publishOn vs subscribeOn in Project Reactor 3

KayV picture KayV · Jan 3, 2018 · Viewed 9k times · Source

I am using publishOn vs subscribeOn both on the same flux as follows:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

Although, when i use both, nothing is printed in logs. But when i use only publishOn, i got the following info logs:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

Is that publishOn is more recommended than subscribeOn? Or it has more preference than subscribeOn? What is the difference between the two and when to use which?

Answer

Ismail Marmoush picture Ismail Marmoush · Dec 25, 2019

It took me sometime to understand it, maybe because publishOn is usually explained before subscribeOn, here's a hopefully more simple layman explanation.

subscribeOn means running the initial source emission e.g subscribe(), onSubscribe() and request() on a specified scheduler worker (other thread), and also the same for any subsequent operations like for example onNext/onError/onComplete, map etc and no matter the position of subscribeOn(), this behavior would happen

And if you didn't do any publishOn in the fluent calls then that's it, everything would run on such thread.

But as soon as you call publishOn() let's say in the middle, then any subsequent operator call will be run on the supplied scheduler worker to such publishOn().

here's an example

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

The result would be


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

As you can see the first doOnNext() and the following map() is running on the thread called subscribeOn_thread , that happens till any publishOn() called, then any subsequent call would run on the supplied scheduler to that publishOn() and again this will happen for any subsequent call till anyone calls another publishOn().