I'm working with a Kafka Streams application where we use dynamic topic determination based on message headers. In our setup, it's normal for topics to be deleted while the application is running. Messages for a deleted topic might still occasionally arrive, but I want to simply ignore them. However, even after receiving just one message for a non-existent topic, I encounter an infinite loop of errors:
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.
[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}
This loop effectively stops the application from working. How can I configure my Kafka Streams application to ignore messages for deleted topics instead of retrying forever? Here's a simplified example of my application code:
StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
// Pick the output topic per record; the real application reads it from a header.
builder.stream("source_topic")
    .to((key, value, recordContext) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size())));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Automatic topic creation is disabled.
I tried the following to handle and ignore the error:
- Use KafkaAdmin: a topic can still be deleted between the existence check and the send, so this doesn't solve the issue.
- Set UncaughtExceptionHandler:
streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
    @Override
    public StreamThreadExceptionResponse handle(Throwable throwable) {
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});
But the code doesn't even reach this handler.
- Set ProductionExceptionHandler:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    CustomProductionExceptionHandler.class.getName());
Again, the code doesn't reach this handler.
- Set Producer Interceptor:
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());
The code reaches this interceptor, but I'm unable to resolve the issue from here.
- Configure Producer Properties:
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);
I tried adjusting these producer properties, but Kafka Streams still attempts to handle the error indefinitely.
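To make the KafkaAdmin problem from the first attempt concrete, here is a minimal sketch of the check-then-send race in plain Java. It does not call Kafka at all: `TopicCheckRace`, `topics`, `topicExists`, `send` and `checkThenSend` are hypothetical stand-ins for the broker's topic list, the admin lookup and the producer send, so treat it as an illustration of the timing problem rather than of any real API.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class TopicCheckRace {
    // Hypothetical stand-in for the broker's topic list (not a Kafka API).
    static final Set<String> topics = ConcurrentHashMap.newKeySet();

    // Stand-in for an admin lookup performed before routing a record.
    static boolean topicExists(String topic) {
        return topics.contains(topic);
    }

    // Stand-in for the producer send: it only succeeds if the topic still
    // exists at send time. In the real application this is the point where
    // the producer waits for metadata and the errors start.
    static boolean send(String topic, String value) {
        return topics.contains(topic);
    }

    // Check, then send. The Runnable models whatever happens in the gap
    // between the two steps, e.g. another client deleting the topic.
    static boolean checkThenSend(String topic, String value, Runnable gap) {
        if (!topicExists(topic)) {
            return false; // topic already gone: the record is skipped
        }
        gap.run();
        return send(topic, value);
    }

    public static void main(String[] args) {
        topics.add("test1");
        boolean quiet = checkThenSend("test1", "v", () -> { });
        boolean raced = checkThenSend("test1", "v", () -> topics.remove("test1"));
        System.out.println("no deletion in the gap: delivered=" + quiet);
        System.out.println("deletion in the gap:    delivered=" + raced);
    }
}
```

The second call passes the existence check and still fails at send time, which is the case an admin lookup cannot rule out no matter how often I run it.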