Take the following RxJava code:
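// Imports assume RxJava 2.x and JUnit 4; with RxJava 3.x, Flowable lives in io.reactivex.rxjava3.core.
import io.reactivex.Flowable;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class IntervalObserverTest {

    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);
        IntervalPublisher o = new IntervalPublisher();
        o.getPublisher().subscribe(new Subscriber<String>() {
            volatile Subscription s;

            @Override
            public void onSubscribe(Subscription s) {
                this.s = s;
                s.request(10); // ask for 10 items up front
            }

            @Override
            public void onNext(String s) {
                // Print the name of the thread that delivers each item
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            }

            @Override
            public void onError(Throwable t) {
                s.cancel();
            }

            @Override
            public void onComplete() {
            }
        });
        // Fail if the 10 requested items do not arrive within 5 seconds
        if (!latch.await(5, TimeUnit.SECONDS)) {
            throw new RuntimeException();
        }
    }
}

class IntervalPublisher {
    public Publisher<String> getPublisher() {
        return Flowable.range(1, 100)
                .map(String::valueOf)
                .delay(50, TimeUnit.MILLISECONDS)
                .delaySubscription(100, TimeUnit.MILLISECONDS);
    }
}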
It simply prints the thread name on each onNext() call.
Here onNext() is called by the same thread that runs the Publisher (the Flowable):
RxComputationThreadPool-2
This means the thread that runs the Publisher is also the thread that runs the Subscriber, as expected.
My question, then, is: what use is the backpressure setting?
The Subscriber cannot be overloaded, since there is just ONE thread doing all the work. Correct me if I am wrong.
In general, what is the backpressure setting for?
The only use I can think of is when the Publisher and the Subscriber are on different threads: the Subscriber sees that it has a lot of backed-up items in its buffer and tells the Publisher to stop producing.
This is the classic producer-consumer problem with a bounded queue. But it seems to have no meaning when the Publisher and the Subscriber are on the same thread.
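To make the analogy concrete, here is a rough sketch of what I mean by a bounded queue between two threads. It uses only java.util.concurrent, not RxJava, and the class name BoundedQueueDemo, the method run and the capacity and item counts are made up for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a fast producer and a slow consumer on separate
// threads, connected by a queue with a fixed capacity.
class BoundedQueueDemo {

    // Returns {number of items consumed, largest queue size the consumer observed}.
    static int[] run(int capacity, int items) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    // Record how many items are waiting before taking one
                    maxSeen.accumulateAndGet(queue.size(), Math::max);
                    queue.take(); // blocks while the queue is empty
                    consumed.incrementAndGet();
                    Thread.sleep(1); // simulate a slow consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return new int[] { consumed.get(), maxSeen.get() };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(4, 50);
        System.out.println("consumed=" + result[0] + ", max queue size=" + result[1]);
    }
}
```

Here the producer is throttled because put() blocks once the queue holds its full capacity of items, so the backlog never grows beyond that bound. That throttling only seems to matter because the two sides run on different threads.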
Am I correct?