Spring Cloud Stream Kafka commits the event on error

In my Spring Cloud Stream application with a Kafka consumer, I don't want the event to be committed when processing fails with an unhandled exception. My code never acknowledges an event that throws an exception.

However, the current behavior is that it retries a fixed number of times and then commits the event anyway. I have tried every configuration option I could find, but none of them changed this.

At the same time, if I don't acknowledge an event that does not throw an exception, it behaves as expected and the event is not committed.
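
The difference between the two outcomes can be modelled roughly as follows. This is a simplified, standard-library-only sketch and not the binder's actual code: the class RetryThenSkipModel, the method committedOffset and the flag skipWhenExhausted are hypothetical names, and the assumption is that once retries are exhausted the failed record is treated as handled, so the offset moves past it.

```java
import java.util.List;
import java.util.function.Predicate;

public class RetryThenSkipModel {

    /**
     * Simplified model only (an assumption, not real binder code).
     * Returns the offset that would be committed after one pass over the records,
     * i.e. the position a restarted consumer would resume from.
     */
    public static int committedOffset(List<String> records, Predicate<String> fails,
                                      int maxAttempts, boolean skipWhenExhausted) {
        int committed = 0;
        for (int offset = 0; offset < records.size(); offset++) {
            String record = records.get(offset);
            boolean processed = false;
            // Retry the same record up to maxAttempts times.
            for (int attempt = 1; attempt <= maxAttempts && !processed; attempt++) {
                processed = !fails.test(record);
            }
            if (!processed && !skipWhenExhausted) {
                // Desired behavior: stop here, the failed record stays uncommitted.
                return committed;
            }
            // Either the record succeeded, or the failure was treated as handled.
            committed = offset + 1;
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> records = List.of("ok-1", "bad", "ok-2");
        Predicate<String> fails = r -> r.equals("bad");
        // Observed behavior: retries, then moves past the failing record.
        System.out.println(committedOffset(records, fails, 5, true));  // prints 3
        // Desired behavior: the failing record is never committed.
        System.out.println(committedOffset(records, fails, 5, false)); // prints 1
    }
}
```

With skipWhenExhausted set to true the model reproduces what I am seeing (the offset ends up past the failing record); with false it shows what I want, where a restart would resume at the failing record.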

My config:

spring:
  cloud:
    stream:
      bindings:
        topicConsumer-in-0:
          destination: broker-record.v2
          group: group.v2-local
          consumer:
              multiplex: true
      kafka:
        binder:
          brokers: localhost:9093
          configuration:
              max.poll.interval.ms: 1080000
        bindings:
          topic-in-0:
            consumer:
              auto-commit-offset: false
              auto-commit-on-error: false
              ack-mode: MANUAL


spring:
  cloud:
    stream:
      bindings:
        topic-in-0:
          content-type: application/json
          consumer:
              max-attempts: 5
              back-off-initial-interval: 10000 #10sec
              back-off-max-interval: 900000 #15min
              back-off-multiplier: 1.5
      function:
        definition: myConsumer
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration

Sample consumer code:

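import java.util.function.Consumer;

import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

// StoreStockMovementEvent and HandledException are application classes (imports omitted).
@Slf4j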
@RequiredArgsConstructor
public class StockEventConsumer implements Consumer<Message<StoreStockMovementEvent>> {

    @Override
    @CircuitBreaker(name="stockEventCircuitBreaker", fallbackMethod = "onCircuitBreakerOpen")
    public void accept(Message<StoreStockMovementEvent> storeStockMovementEventMessage) {
        Acknowledgment acknowledgment = storeStockMovementEventMessage.getHeaders()
            .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        StoreStockMovementEvent payload = storeStockMovementEventMessage.getPayload();

        try {
            //processing code
        } catch(HandledException ex) {
            //handled exception
        }
        acknowledgment.acknowledge();
    }

    public void onCircuitBreakerOpen(Message<StoreStockMovementEvent> storeStockMovementEventMessage, CallNotPermittedException exception) {
        log.error("Circuit breaker is open. Message will not be acknowledged to allow retry {} ", storeStockMovementEventMessage.getPayload().eventHeader().id());
    }
}