Retrying and dropping erroneous elements of a Flux without pausing downstream demand for data during retry backoff period


I have a Flux of elements and need to attempt to process each element while retrying the processing of each element if an error occurs. After a maximum number of retries, I need to drop erroneous elements. I also need to only process N elements at a time (back pressure), for which I use the concurrency parameter of flatMap.

The issue is, if all N in-flight elements are in their retry delay period, flatMap stops requesting more data from upstream until one of the delays elapses. This is because we have reached the maximum number of in-flight inner sequences in the flatMap (specified by the concurrency parameter), and a slot is not released until its inner sequence terminates.

Here is a toy example illustrating the issue:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import java.time.Duration;

class Scratch {

    public static void main(String[] args) {
        Flux.range(0, 10)
                .flatMap(Scratch::process, 1) // Note: concurrency set to 1 to illustrate issue more easily
                .blockLast();
    }

    private static Mono<Integer> process(Integer i) {
        return Mono.just(i)
                .doOnNext(Scratch::throwErrorOn4)

                // Issue here: element 4 will trigger a retry delay of 1 hour, which will prevent elements 5 through 9
                // from being requested until 1 hour has passed. I would like to continue processing elements 5 through
                // 9 while element 4 is being delayed for retry. (Generally, my flow would be a hot Flux of infinite
                // size, so I don't want retry delays to slow down processing of subsequent elements.)
                .retryWhen(Retry.fixedDelay(1, Duration.ofHours(1)))


                .onErrorResume(throwable -> Mono.empty());
    }

    private static void throwErrorOn4(Integer i) {
        if (i == 4) {
            throw new RuntimeException(); // Throw an exception for element 4 to trigger retry
        }
        System.out.println(i);
    }

}

I understand I could increase the concurrency parameter of the flatMap to allow more concurrent inner publishers, but then I lose the back pressure limit; raising it far enough to hide the delays effectively requests unbounded demand.

How can I fix my processing pattern so that it keeps requesting data even when the maximum number of in-flight inner publishers are all sitting in their retry delay, while still capping concurrent processing and dropping elements that exhaust their retries?

I can handle this by queuing up retries on my own (outside the reactive chain), but it feels wrong.
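For illustration, here is a hedged sketch of that "queue retries yourself" idea kept inside the reactive chain: failed elements are re-emitted (with an attempt counter) through a `Sinks.Many` merged back into the source, so the backoff never occupies a flatMap slot. The class and method names (`RetrySketch`, `processAll`) are mine, the 1-second backoff is a demo value, and this assumes Reactor 3.4+ for the `Sinks` API; it is a sketch, not a definitive answer.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Map;

class RetrySketch {

    private static final int MAX_RETRIES = 1;
    private static final Duration BACKOFF = Duration.ofSeconds(1);

    // Failed elements are re-emitted through a sink merged back into the
    // source, paired with an attempt count, so a retry's backoff period
    // never holds one of the flatMap's concurrency slots.
    static Flux<Integer> processAll(Flux<Integer> source) {
        Sinks.Many<Map.Entry<Integer, Integer>> retrySink =
                Sinks.many().unicast().onBackpressureBuffer();

        Flux<Map.Entry<Integer, Integer>> firstAttempts =
                source.map(i -> Map.entry(i, 0));

        return Flux.merge(firstAttempts, retrySink.asFlux())
                .flatMap(attempt -> process(attempt.getKey())
                        .onErrorResume(t -> {
                            if (attempt.getValue() < MAX_RETRIES) {
                                // Schedule the re-emission off-slot; the
                                // flatMap slot is released immediately.
                                Mono.delay(BACKOFF).subscribe(x -> retrySink.tryEmitNext(
                                        Map.entry(attempt.getKey(), attempt.getValue() + 1)));
                            }
                            return Mono.empty(); // retries exhausted: drop the element
                        }), 1);
    }

    private static Mono<Integer> process(Integer i) {
        return Mono.just(i).doOnNext(n -> {
            if (n == 4) {
                throw new RuntimeException("failing element " + n); // 4 always fails, as in the toy example
            }
        });
    }

    public static void main(String[] args) {
        // The merged flux never completes on its own (the sink stays open),
        // so this demo just takes the 9 elements that can succeed.
        processAll(Flux.range(0, 10))
                .take(9)
                .doOnNext(System.out::println)
                .blockLast(Duration.ofSeconds(10));
    }
}
```

The main caveat of this shape is that the merged flux has no natural completion signal while the sink is open, and the fire-and-forget `Mono.delay(...).subscribe(...)` sits outside the subscriber's cancellation scope, which is roughly the same discomfort as queuing retries externally.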
