I have been looking at the new RxJava 2, and I'm not quite sure I understand the idea of backpressure anymore...
I'm aware that we have Observable, which does not have backpressure support, and Flowable, which does.
So, as an example, let's say I have a Flowable created with interval:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                // do smth
            }
        });
This crashes after around 128 values, which is understandable: I am consuming items more slowly than they are produced.
But then we have the same thing with Observable:
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                // do smth
            }
        });
This does not crash at all; even when I add some delay to the consumer, it still works. To make the Flowable work, let's say I add the onBackpressureDrop operator: the crash is gone, but not all values are emitted either.
So the basic question I cannot currently answer is: why should I care about backpressure when I can use a plain Observable and still receive all values without managing the buffer? Or, from the other side, what advantages does backpressure give me when it comes to managing and handling consumption?
In practice, backpressure manifests as bounded buffers. Flowable.observeOn has a buffer of 128 elements that is drained as fast as the downstream can take them. You can increase this buffer size per operator (observeOn has an overload that takes a bufferSize) to handle a bursty source, and all the backpressure-management practices from 1.x still apply. Observable.observeOn has an unbounded buffer that keeps collecting elements, so your app may run out of memory.
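To make the bounded-versus-unbounded difference concrete, here is a minimal sketch in plain Java with no RxJava dependency. It is a simplified, single-threaded model and not how RxJava is implemented: the class BackpressureModel, the method simulate, and its parameters are hypothetical names chosen for illustration. A fast producer feeds a slow consumer through a queue; with a capacity of 128 the overflow is dropped (roughly what onBackpressureDrop does), while with no capacity the queue simply keeps growing.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackpressureModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final int delivered;    // items the consumer eventually received
        final int dropped;      // items discarded because the buffer was full
        final int peakBuffered; // largest number of items held in memory at once

        Result(int delivered, int dropped, int peakBuffered) {
            this.delivered = delivered;
            this.dropped = dropped;
            this.peakBuffered = peakBuffered;
        }
    }

    /**
     * Simulates a producer that emits {@code produced} items while the consumer
     * only manages to take one item per {@code consumeEvery} emissions.
     * A {@code capacity} of 0 or less means the buffer is unbounded.
     */
    static Result simulate(int produced, int consumeEvery, int capacity) {
        Queue<Integer> buffer = new ArrayDeque<>();
        int delivered = 0;
        int dropped = 0;
        int peak = 0;

        for (int i = 0; i < produced; i++) {
            if (capacity > 0 && buffer.size() >= capacity) {
                dropped++; // bounded buffer is full: drop the new item
            } else {
                buffer.add(i);
                peak = Math.max(peak, buffer.size());
            }
            // The slow consumer takes one item every consumeEvery emissions.
            if ((i + 1) % consumeEvery == 0 && !buffer.isEmpty()) {
                buffer.poll();
                delivered++;
            }
        }
        // After the producer stops, the consumer drains whatever is left.
        delivered += buffer.size();
        return new Result(delivered, dropped, peak);
    }

    public static void main(String[] args) {
        Result bounded = simulate(10_000, 10, 128);
        Result unbounded = simulate(10_000, 10, 0);
        System.out.println("bounded:   delivered=" + bounded.delivered
                + " dropped=" + bounded.dropped + " peak=" + bounded.peakBuffered);
        System.out.println("unbounded: delivered=" + unbounded.delivered
                + " dropped=" + unbounded.dropped + " peak=" + unbounded.peakBuffered);
    }
}
```

The unbounded run delivers every item, but its peak buffer size grows with the length of the stream, which is the memory risk described above. The bounded run never holds more than 128 items and pays for that by losing values.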
You may use Observable, for example:
You may use Flowable, for example: