I've been testing some of the Reactor backpressure stuff, and one common structure seems to be:
package com.example.backpressure;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

@SpringBootTest
class BackPressureApplicationTests {

    @Test
    public void backPressureApplicationTest1() {
        Flux<Integer> request = Flux.range(1, 20).log();
        request.subscribe(new BackPressureSubscriber<>());
    }
}

class BackPressureSubscriber<T> extends BaseSubscriber<T> {

    public void hookOnSubscribe(Subscription subscription) {
        request(1);
    }

    public void hookOnNext(T value) {
        System.out.println("Value is: " + value);
        request(1);
    }
}
The idea here is to apply backpressure by allowing a new value to be received only after the current one has been processed (request(1)). It seems to work fine, and I am seeing output like this:
2022-10-03 20:11:51.908 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:11:51.909 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.910 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:11:51.911 INFO 2380 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
But what would it mean if I, for example, use request(3) in hookOnNext?

    public void hookOnNext(T value) {
        System.out.println("Value is: " + value);
        request(3);
    }
When examining the output, it seems that the structure is identical (except for 1 => 3):
2022-10-03 20:10:37.701 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:10:37.703 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(1)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:10:37.704 INFO 19484 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
Question 1: If I understand the code correctly, it will, for each value received, tell the producer that it can send 3 more values. But will this "quota" then be aggregated somehow on the producer side, since I request more values than I process? Does it make sense at all to request more than 1 value in the hookOnNext handler?
I have seen people do things like this:
class BackPressureSubscriber<T> extends BaseSubscriber<T> {

    int cnt = 0;
    int numEachTime = 3;

    public void hookOnSubscribe(Subscription subscription) {
        request(3);
    }

    public void hookOnNext(T value) {
        System.out.println("Value is: " + value);
        cnt++;
        if (cnt % numEachTime == 0) {
            request(3);
        }
    }
}
Which then gives output:
2022-10-03 20:21:59.681 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(1)
Value is: 1
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(2)
Value is: 2
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(3)
Value is: 3
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | request(3)
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(4)
Value is: 4
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(5)
Value is: 5
2022-10-03 20:21:59.683 INFO 11836 --- [ main] reactor.Flux.Range.1 : | onNext(6)
Value is: 6
Question 2: This processes 3 values before it requests 3 new values, but does this really give an advantage over requesting 1-by-1? Maybe somewhat lower latency because there are fewer request calls?
Calling request(n) from within a Subscriber S tells a Publisher P that it is allowed to call the onNext method of S at most n times. Calling request(n) again does not override a previous request: it tells P that it is allowed to produce another n elements (i.e. 2*n in total).

If n is equal to 1, P will call onNext at most once and then wait until S requests more elements. This results in stop-and-wait, which is usually not what you want. Subscribers should, therefore, request the maximum number of elements they are able to process. In fact, if S can process elements faster than P can produce them, S might request an unbounded number of elements by calling request(Long.MAX_VALUE) (ideally from its onSubscribe method).

So, to answer your questions:

Yes, it makes sense to request multiple elements in order to reduce delays between S and P. While it may not make sense to call request(3) unconditionally from within your onNext method, imagine you have a buffer of, let's say, 100 elements. If you request 100 elements at the beginning (i.e. in onSubscribe) and S processes the elements more slowly than P can produce them, your buffer might fill up to some number of elements. Then, let's say that after some processing the number of elements remaining in the buffer drops to 10. At this point you might request another 90: with a strategy like this, your S might never have to wait for P to generate elements.

For further details, you might want to have a look at the specification, from which much of this information is taken.
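
To make the additive demand and the "top up when running low" strategy concrete, here is a minimal sketch. It deliberately uses the JDK's java.util.concurrent.Flow interfaces rather than Reactor, so it runs without extra dependencies. RangePublisher is a hand-written, single-threaded stand-in for Flux.range and not Reactor's implementation; WatermarkDemo, WatermarkSubscriber, capacity and lowWatermark are names made up for this illustration. As a simplification, the subscriber tracks outstanding demand instead of a real buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class WatermarkDemo {

    /** Hypothetical synchronous stand-in for Flux.range(start, count); single-threaded only. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;       // requested but not yet emitted
                private int next = start;
                private boolean emitting;  // true while the emission loop is running
                private boolean done;

                @Override
                public void request(long n) {
                    // The spec requires an onError signal for n <= 0; omitted in this sketch.
                    if (done || n <= 0) return;
                    // Additive: a new request adds to the old demand (capped at Long.MAX_VALUE).
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return;  // re-entrant call: the running loop sees the new demand
                    emitting = true;
                    while (demand > 0 && next < start + count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == start + count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests capacity up front and tops demand back up once it drops to lowWatermark. */
    static final class WatermarkSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int capacity;
        private final int lowWatermark;
        private Flow.Subscription subscription;
        private int outstanding;  // requested but not yet received

        WatermarkSubscriber(int capacity, int lowWatermark) {
            this.capacity = capacity;
            this.lowWatermark = lowWatermark;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = capacity;  // set before request(), which may emit synchronously
            log.add("request(" + capacity + ")");
            s.request(capacity);
        }

        @Override
        public void onNext(Integer item) {
            outstanding--;
            log.add("onNext(" + item + ")");
            if (outstanding <= lowWatermark) {
                int refill = capacity - outstanding;  // e.g. 100 - 10 = 90 in the text above
                outstanding = capacity;
                log.add("request(" + refill + ")");
                subscription.request(refill);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.add("onError(" + t + ")");
        }

        @Override
        public void onComplete() {
            log.add("onComplete");
        }
    }

    /** Runs the subscriber against count elements and returns the signal log. */
    public static List<String> run(int count, int capacity, int lowWatermark) {
        WatermarkSubscriber subscriber = new WatermarkSubscriber(capacity, lowWatermark);
        new RangePublisher(1, count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(10, 5, 2).forEach(System.out::println);
    }
}
```

With run(10, 5, 2) the log starts with request(5), then onNext(1) to onNext(3), then request(3), and that refill pattern repeats until onComplete. The same bookkeeping explains the unconditional request(3) in onNext: in this stand-in each element consumes one unit of demand and adds three, so the publisher's pending demand grows by two per element rather than being reset.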