Split Rx Observable into multiple streams and process individually

Brandon Bil · Mar 4, 2015 · Viewed 24.1k times

Here is a picture of what I am attempting to accomplish.

--a-b-c-a--bbb--a

split into

--a-----a-------a --> a stream

----b------bbb--- --> b stream

------c---------- --> c stream

Then, be able to

a.subscribe()
b.subscribe()
c.subscribe()

So far, everything I have found splits the stream using groupBy(), but then collapses everything back into a single stream and processes all the groups in the same function. What I want to do is process each derived stream in a different way.

Right now I am doing this with a bunch of filters. Is there a better way?

Answer

Tomáš Dvořák · Mar 5, 2015

Easy as pie: just use filter.

An example in Scala:

import rx.lang.scala.Observable

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
// share (publish + refCount) lets multiple subscribers use one subscription to the source
val hotO: Observable[String] = o.share

// One filtered stream per kind of element
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")

// Each stream gets its own onNext, onError and onCompleted handlers
aSource.subscribe(x ⇒ println("A: " + x), println, () ⇒ println("A Completed"))
bSource.subscribe(x ⇒ println("B: " + x), println, () ⇒ println("B Completed"))
cSource.subscribe(x ⇒ println("C: " + x), println, () ⇒ println("C Completed"))

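You just need to make sure that the source observable is hot, so that all three filtered streams observe the same emissions instead of each starting its own run of the source. The easiest way is to share it.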
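
One caveat with a synchronous cold source such as Observable.just: share connects as soon as the first subscriber arrives, so that subscriber can receive the whole sequence before the others have subscribed. A minimal sketch of an alternative, assuming RxScala's publish and connect (the object name SplitWithPublish and the log buffer are made up for illustration), attaches all three subscribers first and only then starts the source:

```scala
import rx.lang.scala.Observable
import rx.lang.scala.observables.ConnectableObservable
import scala.collection.mutable.ListBuffer

// Hypothetical wrapper object, used here only to keep the sketch self-contained.
object SplitWithPublish {
  // Records every delivery so the interleaving can be inspected afterwards.
  val log: ListBuffer[String] = ListBuffer.empty[String]

  def run(): Unit = {
    val source: Observable[String] =
      Observable.just("a", "b", "c", "a", "b", "b", "b", "a")

    // publish defers the subscription to the source until connect is called.
    val published: ConnectableObservable[String] = source.publish

    // Each branch is an ordinary Observable that can be processed on its own.
    published.filter(_ == "a").subscribe(x => log += "A: " + x)
    published.filter(_ == "b").subscribe(x => log += "B: " + x)
    published.filter(_ == "c").subscribe(x => log += "C: " + x)

    // All three subscribers are attached; start a single run of the source.
    published.connect
  }
}
```

Calling SplitWithPublish.run() should leave log holding the deliveries in source order: A, B, C, A, B, B, B, A. For a source that is already hot (UI events, for example), share is usually enough.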