In my Spring Cloud Stream application with a Kafka consumer, I don't want the offset to be committed when processing fails with an unhandled exception.
On my side, no acknowledgment happens for events that throw an exception.
But the current behavior is that the binder retries a fixed number of times and then commits the event anyway. I have tried every config option I could find but have not found a solution.
At the same time, if I don't acknowledge an event that does not throw an exception, it behaves as expected and the offset is not committed.
My config:
spring:
  cloud:
    stream:
      bindings:
        topicConsumer-in-0:
          destination: broker-record.v2
          group: group.v2-local
          consumer:
            multiplex: true
      kafka:
        binder:
          brokers: localhost:9093
          configuration:
            max.poll.interval.ms: 1080000
        bindings:
          topic-in-0:
            consumer:
              auto-commit-offset: false
              auto-commit-on-error: false
              ack-mode: MANUAL
spring:
  cloud:
    stream:
      bindings:
        topic-in-0:
          content-type: application/json
          consumer:
            max-attempts: 5
            back-off-initial-interval: 10000 #10sec
            back-off-max-interval: 900000 #15min
            back-off-multiplier: 1.5
    function:
      definition: myConsumer
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
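For reference, here is a rough sketch of the retry schedule these back-off settings imply. It assumes the binder applies a standard exponential back-off (the first wait equals the initial interval, each later wait is multiplied by the multiplier and capped at the max interval) and that max-attempts counts the first delivery. The class and method names are made up for illustration and are not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;

public class RetrySchedule {

    // Waits (in ms) between consecutive attempts under exponential back-off.
    // Hypothetical helper: it only mirrors the arithmetic, not the binder's code.
    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> result = new ArrayList<>();
        double next = initialMs;
        // maxAttempts includes the first delivery, so there are maxAttempts - 1 waits.
        for (int i = 1; i < maxAttempts; i++) {
            result.add((long) Math.min(next, maxMs));
            next *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the consumer config above.
        List<Long> waits = delays(5, 10_000L, 1.5, 900_000L);
        long total = waits.stream().mapToLong(Long::longValue).sum();
        // prints "[10000, 15000, 22500, 33750] total=81250 ms"
        System.out.println(waits + " total=" + total + " ms");
    }
}
```

Under those assumptions the five attempts finish in roughly 81 seconds of waiting, well below the max.poll.interval.ms of 1080000 (18 minutes), and it is after that last attempt that I see the offset being committed.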
Sample consumer code:
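import java.util.function.Consumer;

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Slf4j
@RequiredArgsConstructor
public class StockEventConsumer implements Consumer<Message<StoreStockMovementEvent>> {

    @Override
    @CircuitBreaker(name = "stockEventCircuitBreaker", fallbackMethod = "onCircuitBreakerOpen")
    public void accept(Message<StoreStockMovementEvent> storeStockMovementEventMessage) {
        Acknowledgment acknowledgment = storeStockMovementEventMessage.getHeaders()
                .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        StoreStockMovementEvent payload = storeStockMovementEventMessage.getPayload();
        try {
            //processing code
        } catch (HandledException ex) {
            //handled exception
        }
        // Only reached when no unhandled exception was thrown above
        acknowledgment.acknowledge();
    }

    public void onCircuitBreakerOpen(Message<StoreStockMovementEvent> storeStockMovementEventMessage, CallNotPermittedException exception) {
        log.error("Circuit breaker is open. Message will not be acknowledged to allow retry {} ", storeStockMovementEventMessage.getPayload().eventHeader().id());
    }
}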