Handling and ignoring UNKNOWN_TOPIC_OR_PARTITION errors in Kafka Streams


I'm working with a Kafka Streams application where we use dynamic topic determination based on message headers. In our setup, it's normal for topics to be deleted while the application is running. Messages for a deleted topic might still occasionally arrive, but I want to simply ignore them. However, even after receiving just one message for a non-existent topic, I encounter an infinite loop of errors:

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 74 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

org.apache.kafka.common.errors.TimeoutException: Topic test1 not present in metadata after 60000 ms.

[kafka-producer-network-thread | stream-example-producer] WARN org.apache.kafka.clients.NetworkClient -- [Producer clientId=stream-example-producer] Error while fetching metadata with correlation id 79 : {test1=UNKNOWN_TOPIC_OR_PARTITION}

This infinite loop of errors essentially causes the application to stop working. How can I configure my Kafka Streams application to ignore messages for deleted topics without entering an infinite loop of errors? Is there a way to handle this situation? Here's a simplified example of my application code:

StreamsBuilder builder = new StreamsBuilder();
List<String> dynamicTopics = List.of("good_topic", "deleted_topic");
builder.stream("source_topic").to((k, v, c) -> dynamicTopics.get(new Random().nextInt(dynamicTopics.size()))); //in real application from header
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Automatic topic creation is disabled.

I tried the following to handle and ignore the error:

  1. Check for existing topics with KafkaAdmin: However, a topic can still be deleted between two checks, so this doesn't solve the issue.

  2. Set UncaughtExceptionHandler:

streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler() {
    @Override
    public StreamThreadExceptionResponse handle(Throwable throwable) {
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
});

But the code doesn't even reach this handler.

  3. Set ProductionExceptionHandler:
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
          CustomProductionExceptionHandler.class.getName());

Again, the code doesn't reach this handler.

  4. Set Producer Interceptor:
props.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG), ErrorInterceptor.class.getName());

The code reaches this interceptor, but I'm unable to resolve the issue from here.

  5. Configure Producer Properties:
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, "5000"); 
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), "8000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "0");
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), "10000");
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 0);

I tried adjusting these producer properties, but Kafka Streams still attempts to handle the error indefinitely.
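
For reference, the existence check from the first attempt above can be sketched roughly as follows. This is a minimal, library-free illustration, not the actual application code: `TopicGuard`, `topicLister`, and the fallback topic name are hypothetical. In a real setup `topicLister` would presumably wrap an admin-client call that lists topics. The sketch also shows why the approach is not sufficient on its own: a topic deleted inside the cache window is still returned as valid.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical helper: caches the set of existing topics for a short time and
 * redirects records for unknown topics to a fallback topic.
 */
final class TopicGuard {
    // Assumption: supplies the current set of topic names (e.g. backed by an admin client).
    private final Supplier<Set<String>> topicLister;
    // Injected clock (milliseconds) so the cache expiry can be tested deterministically.
    private final LongSupplier clockMillis;
    private final long ttlMillis;
    // Assumption: a topic that always exists and collects records we want to ignore.
    private final String fallbackTopic;

    private Set<String> cached = Set.of();
    private long loadedAt;
    private boolean loaded;

    TopicGuard(Supplier<Set<String>> topicLister, LongSupplier clockMillis,
               Duration ttl, String fallbackTopic) {
        this.topicLister = topicLister;
        this.clockMillis = clockMillis;
        this.ttlMillis = ttl.toMillis();
        this.fallbackTopic = fallbackTopic;
    }

    /** Returns the requested topic if it was present at the last refresh, else the fallback. */
    synchronized String resolve(String requestedTopic) {
        long now = clockMillis.getAsLong();
        if (!loaded || now - loadedAt >= ttlMillis) {
            // Refresh the snapshot once the cached one has expired.
            cached = Set.copyOf(topicLister.get());
            loadedAt = now;
            loaded = true;
        }
        return cached.contains(requestedTopic) ? requestedTopic : fallbackTopic;
    }
}
```

The topic name extractor passed to `to(...)` would then call something like `guard.resolve(topicFromHeader)`. A shorter TTL narrows the window in which a deleted topic is still treated as existing, but it cannot close it completely, which is the race described in the first attempt.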
