I've below code which programmatically generates Flux using generate. Since the result is Flux<Mono<T>> I'm using concatMap to map result to Flux<T>. The producer of Mono<T> works based on throttling. If too many requests are sent in short period of time it results in exception. To deal with this I've provided a subscriber that calls request(n) in hookOnSubscribe() and hookOnNext(). I want remote API to be called only request(n) times. This is like pull based request. But concatMap request upstream publisher even when there is no demand from downstream and it results in throttling exception. How can I make sure that remote API is called only request(n) times?
Flux<Mono<Result>> generate = Flux.generate(
() -> queryRequest,
(state, sink) -> {
if(!state.isPrepared() || !state.isDone()) {
Mono<Result> nextPage = execute(state).doOnNext(result -> {
QueryResult result1 = (QueryResult) result;
state.setContKey(result1.getContinuationKey());
});
sink.next(nextPage);
} else {
sink.complete();
}
return state;
}
);
generate.concatMap(resultMono -> resultMono);