I am trying to test @RetryableTopic feature of Kafka where after retrying the message 3 times , I would like to push it to Kafka SQS. I am getting call from retry thread 4 times and call to DltHandler 2 times .As far as I know,in Spring Retry, the @DltHandler annotation is used to handle messages that have failed after all retry attempts have been exhausted and have been moved to the Dead Letter Queue (DLQ) so I am expecting that call to processMessage() only once. I am tracking call to consumer() method with thread name to identify which thread is calling this method. Not sure which part I am missing ?
@Bean(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)
public KafkaTemplate<String, String> kafkaTemplateForDlt() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public RetryTopicConfiguration myRetryTopic(@Qualifier(ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY)KafkaTemplate<String, String> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
}
@Slf4j
@Component
public class ProductEventConsumer {
@Autowired
private ProductServiceImpl productServiceImpl;
@Autowired ObjectMapper objectMapper;
@Value("${aws.sqsDLQ}")
private String productdlq;
@Autowired
private SqsTemplate sqsTemplate;
@RetryableTopic(
backoff = @Backoff(delayExpression = "10000", multiplierExpression = "0"),
attempts = "3",
kafkaTemplate = ProductServiceConstants.PRODUCT_KAFKA_DLT_PRODUCER_FACTORY,
include = {SocketTimeoutException.class,ArithmeticException.class})
@KafkaListener(id=ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_ID, idIsGroup=false,
topics="#{'${spring.kafka.product-topic}'}",containerFactory=ProductServiceConstants.PRODUCT_KAFKA_CONSUMER_FACTORY)
public void consumer(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack) {
try{
log.info("START:Received request via kafka:{} thread:{}",consumerRecord.value()
,Thread.currentThread().getName());
int result = 10 / 0;
ack.acknowledge();
}catch( JsonProcessingException e) {
log.error("END:Exception occured while saving item:{}",e.getMessage());
}
}
@DltHandler
public void processMessage(ConsumerRecord<String,String> consumerRecord, Acknowledgment ack) {
try{
log.error("START:Pushing message to SQS DLQ:{}",consumerRecord.key());
sqsTemplate.send(sqsSendOptions -> sqsSendOptions.queue(productdlq).payload(consumerRecord.value()));
}catch(Exception e) {
log.error("END:Failure while pushing msg to sqs dlq:{} key:{}",e.getMessage(),consumerRecord.key());
}
finally {
ack.acknowledge();
}
}
}
Please have a look at this example. I believe that branch
so-78201070does what you are looking for:The test
retryAndDltsends an event that gets retried twice and is then sent to the DLQ/DLT.Hope it helps.