Here is a picture of what I am attempting to accomplish.
--a-b-c-a--bbb--a
split into
--a-----a-------a --> a stream
----b------bbb--- --> b stream
------c---------- --> c stream
Then I want to be able to call:
a.subscribe()
b.subscribe()
c.subscribe()
So far, everything I have found splits the stream using groupBy(), but then collapses everything back into a single stream and processes all the groups in the same function. What I want to do is process each derived stream in a different way.
Right now I'm doing it with a bunch of filters. Is there a better way to do this?
Easy as pie: just use filter.
An example in Scala:
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
// share the source so the filtered streams below are derived from one shared observable
val hotO: Observable[String] = o.share
// one derived stream per value
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
// each derived stream gets its own onNext, onError and onCompleted handlers
aSource.subscribe(x ⇒ println("A: " + x), println, () ⇒ println("A Completed"))
bSource.subscribe(x ⇒ println("B: " + x), println, () ⇒ println("B Completed"))
cSource.subscribe(x ⇒ println("C: " + x), println, () ⇒ println("C Completed"))
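You just need to make sure that the source observable is hot, so that the filtered streams are all fed by the same underlying subscription instead of each one restarting the source. The easiest way is to share it.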
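One caveat, as far as I can tell: share is publish followed by refCount, so the first subscribe call starts the source. With a source that emits synchronously, such as Observable.just, that first subscriber can receive everything before the later subscribe calls are reached, and each later subscriber then triggers a fresh subscription to the source. The printed results are the same, but the source is not really shared. A minimal sketch of an alternative, assuming RxScala's publish and connect behave like their RxJava 1.x counterparts; the ListBuffer collectors and all names below are illustrative only and not part of the original answer:

```scala
import rx.lang.scala.Observable
import scala.collection.mutable.ListBuffer

val source: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")

// publish returns a ConnectableObservable: nothing is emitted until connect is called
val published = source.publish

// Illustrative collectors so each stream's handler does something different and checkable
val aSeen = ListBuffer.empty[String]
val bSeen = ListBuffer.empty[String]
val cSeen = ListBuffer.empty[String]

// Attach every subscriber first; each derived stream has its own handler
published.filter(_ == "a").subscribe(x => aSeen += x)
published.filter(_ == "b").subscribe(x => bSeen += x)
published.filter(_ == "c").subscribe(x => cSeen += x)

// Start the single shared subscription to the source only after all subscribers are attached
val connection = published.connect

println("A saw: " + aSeen.mkString(","))
println("B saw: " + bSeen.mkString(","))
println("C saw: " + cSeen.mkString(","))
```

Calling connect last is the point of the sketch: the source is subscribed to once, and no derived stream misses items that were emitted before it subscribed.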