Our Spring application uses Reactor Kafka to consume messages.
Question 1: Is there a standard convention for pausing message consumption and finishing in-flight messages during application shutdown?
The current implementation uses reactiveKafkaConsumerTemplate to receive messages. In a @PreDestroy method, we then pause the consumer with reactiveKafkaConsumerTemplate.pause.
Current Implementation (simplified)
    reactiveKafkaConsumerTemplate
        .receiveAutoAck()
        .publishOn(Schedulers.boundedElastic())
        // Delay each element by 300 ms, with a concurrency of 5
        .flatMap(x -> Mono.just(x)
            .delayElement(Duration.ofMillis(300)), 5)
        // Process each message; errors are dropped so the stream keeps running
        .flatMap(message -> Mono.just(message)
            .flatMap(processMessageImp::processMessage)
            .onErrorResume(t -> Mono.empty())
        );
    public void pauseKafkaMessageConsumer() {
        reactiveKafkaConsumerTemplate
            .assignment()
            .flatMap(topicParts -> reactiveKafkaConsumerTemplate.pause(topicParts))
            .subscribe();
    }
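    @PreDestroy
    public void onExit() {
        pauseKafkaMessageConsumer();
        try {
            // Give in-flight messages a fixed 5 seconds to finish
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            log.error("onExit: interrupted during @PreDestroy", e);
        }
    }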
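One alternative to a fixed Thread.sleep is to count in-flight messages and wait until that count drops to zero, with a timeout as a safety net. The following is only a minimal sketch using the plain JDK. InFlightTracker and its methods are hypothetical names, not part of Reactor Kafka or Spring, and wiring it into the pipeline (for example calling tryStart before processMessage and finish in a doFinally) is an assumption about how it could be used.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: counts in-flight messages so shutdown can wait for them. */
class InFlightTracker {
    private int inFlight = 0;
    private boolean closed = false;

    /** Registers one message; returns false once shutdown has begun. */
    synchronized boolean tryStart() {
        if (closed) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Call when a message finishes, whether it succeeded or failed. */
    synchronized void finish() {
        inFlight--;
        notifyAll();
    }

    /**
     * Stops admitting new work, then blocks until no message is in flight.
     * Returns false if the timeout elapses or the thread is interrupted first.
     */
    synchronized boolean closeAndAwait(Duration timeout) {
        closed = true;
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (inFlight > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

In a @PreDestroy method, closeAndAwait(Duration.ofSeconds(5)) would return as soon as the last message completes instead of always blocking for the full five seconds.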
Question 2: How can we prevent WebClientRequestException when using @PreDestroy?
During shutdown, any messages still in flight after the Thread.sleep in @PreDestroy returns fail with a WebClientRequestException. Messages processed during the Thread.sleep complete successfully.
WebClientRequestException: executor not accepting a task; nested exception is java.lang.IllegalStateException: executor not accepting a task
The WebClient implementation uses Spring's injected builder:
    private final WebClient.Builder builder;
    public WebClient createWebclient() {
        ...
        return builder.build();
    }
The WebClientRequestException seems to happen only when using Spring's injected WebClient.Builder.
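One possible reading of the error, offered as an assumption rather than a confirmed diagnosis: the injected WebClient.Builder may be backed by resources that the Spring context manages and shuts down on close, so requests issued late in shutdown are refused. The JDK-only sketch below is a stand-in for that behavior. It uses neither Netty nor WebClient, and ShutdownRejectionDemo is a hypothetical name. It shows that an executor that has begun shutting down still finishes work it already accepted but rejects anything submitted afterwards.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownRejectionDemo {
    /**
     * Returns {acceptedTaskCompleted, lateTaskRejected} for an executor
     * that is shut down while one task is still running.
     */
    static boolean[] run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean completed = new AtomicBoolean(false);

        // Accepted before shutdown: allowed to run to completion.
        executor.execute(() -> {
            try {
                Thread.sleep(100);
                completed.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.shutdown(); // no new tasks from here on

        boolean rejected = false;
        try {
            // Submitted after shutdown: refused by the executor.
            executor.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] { completed.get(), rejected };
    }
}
```

If the same ordering problem applies here, the in-flight work would need to finish before the HTTP client's resources are released, rather than merely before the @PreDestroy method returns.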
I took a similar approach that seems to work locally during testing. As part of processing, it consumes from and publishes to the same topic, which lets me check that all messages are consumed before the application shuts down.
Now to be fair, I am not sure if this is a great approach. I'm still kinda new to Kafka, and could use some feedback!