In my Kafka consumer, I want circuit-breaker-like behaviour for transient errors, for which I am using Retry.indefinitely().
That way I can fix the problem and have consumption resume on its own. The issue is that I consume events in batches of 500, and because the processing is reactive (and therefore concurrent), an event with a higher offset can get committed before an event with a lower offset throws the error.
So when consumption resumes after the error, it starts from the latest committed offset and not from the one where the error happened, and I lose events.
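To show what I mean by the reordering, here is a standalone Reactor snippet (not my consumer code, just an illustration): flatMap emits results in completion order, so a "higher offset" that happens to finish faster reaches the downstream (and any commit logic) before a slower "lower offset".

import java.time.Duration;

import reactor.core.publisher.Flux;

public class FlatMapOrderingDemo {
    public static void main(String[] args) throws InterruptedException {
        // Pretend 100..104 are Kafka offsets; lower offsets take longer to process.
        Flux.range(100, 5)
                .flatMap(offset -> Flux.just(offset)
                        .delayElements(Duration.ofMillis(500 - (offset - 100) * 100L)))
                .subscribe(offset -> System.out.println("completed offset " + offset));

        // Prints 104, 103, 102, 101, 100: completion order, not offset order.
        Thread.sleep(1000);
    }
}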
My code:
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

@Slf4j
public class EventConsumer {
    // Class details omitted

    public Disposable consumeMessage() {
        return processRecord()
                .onBackpressureBuffer(BUFFER_SIZE)
                .limitRate(500)
                // retry forever so consumption resumes once the transient problem is fixed
                .retryWhen(Retry.indefinitely())
                .subscribe(record -> {}, error -> log.error("Error while consuming event: {}", error.getMessage()));
    }

    public Flux<EventWrapper> processRecord() {
        Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(inputEventReceiver::receive);
        // flatMap processes records concurrently, so downstream completion order does not match offset order
        return receiverRecord.flatMap(this::processMessage);
    }

    // Additional methods omitted
}
I could not find a solution for this, because the Flux is a continuous stream and events keep arriving, which makes it difficult to combine a synchronous commit with asynchronous processing.
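For context, the offset handling in my pipeline looks roughly like the sketch below (handle() is a hypothetical stand-in for my real asynchronous processing, and AckSketch is not my actual class). As far as I understand reactor-kafka, acknowledging an offset marks it for the periodic commit and implies that every earlier offset on that partition has been processed, so a fast higher offset effectively skips over a slow or failed lower one.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

public class AckSketch {

    private final KafkaReceiver<String, String> receiver; // configured elsewhere

    public AckSketch(KafkaReceiver<String, String> receiver) {
        this.receiver = receiver;
    }

    public Flux<String> run() {
        return receiver.receive()
                // flatMap processes records concurrently, so completions arrive out of offset order
                .flatMap(record -> handle(record)
                        // each record acknowledges its own offset once ITS processing succeeds,
                        // but acknowledging offset N also covers every offset below N on the
                        // partition, so a failed lower offset can still end up "committed"
                        .doOnNext(result -> record.receiverOffset().acknowledge()));
    }

    // Hypothetical stand-in for the real asynchronous processing.
    private Mono<String> handle(ReceiverRecord<String, String> record) {
        return Mono.just(record.value());
    }
}

What I am looking for is a way to keep the concurrent processing but make sure the committed offset never moves past a record whose processing failed, so that after the retry the consumer restarts from the failed record instead of the latest offset.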