How to add retry advice to a Kafka-publishing IntegrationFlow


I'm trying to set up a retry advice for errors that occur while publishing to Kafka in my publisherFlow, but I can't find the correct configuration to get it working. Any help or guidance would be much appreciated.

    @Bean
    public IntegrationFlow publisherFlow(CsvToEmailInteractionConverter converter, KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from("publisherChannel")
                              .transform(converter, "transform")
                              .transform(Transformers.toJson())
                              .log(LoggingHandler.Level.DEBUG, Constants.LOG_FLOW_CATEGORY, m -> "Payload: " + m.getPayload())
                              .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                                           .topic(Constants.KAFKA_TOPIC), e -> e.id(Constants.ENGAGE_ACOUSTIC_ID))
                              // .routeByException(r -> r.defaultOutputChannel(MessageChannels.direct("standardFlowExceptionChannel").getObject()))
                              .get();
    }

I think the advice needs to go before the handle()?

Answer by Artem Bilan:

Your question doesn't show any retry configuration, but this related question might be exactly what you are looking for: "How to get RetryAdvice working for KafkaProducerMessageHandler".

So, this is essentially what you need:

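    .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                 .topic(Constants.KAFKA_TOPIC)
                 // sync(true): block until the send completes, so a failed send
                 // throws in the calling thread where the advice can retry it
                 .sync(true),
            // The advice is applied to the handler endpoint itself, not placed before handle()
            e -> e.id(Constants.ENGAGE_ACOUSTIC_ID)
                  .advice(retryAdvice))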
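
The snippet above refers to a retryAdvice that the question never defines. A minimal sketch of one possible definition follows, assuming Spring Integration 6.x with Spring Retry on the classpath. The class name RetryAdviceConfig, the three attempts and the one-second back-off are illustrative assumptions, not values from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical configuration class; the name is an assumption for illustration.
@Configuration
public class RetryAdviceConfig {

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        // Illustrative policy: up to 3 attempts in total, 1000 ms between attempts.
        advice.setRetryTemplate(RetryTemplate.builder()
                .maxAttempts(3)
                .fixedBackoff(1000)
                .build());
        return advice;
    }
}
```

To use it, the bean could be injected into publisherFlow as an extra method parameter (RequestHandlerRetryAdvice retryAdvice) and passed to .advice(...) as shown above. Without a recovery callback, the last exception propagates to the caller once the attempts are exhausted. If the failure should go to an error channel instead, setRecoveryCallback(...) on the advice is the place to configure that.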