Confluent Kafka AdminClient create_topics method returns KafkaException error but still creates topics

49 Views Asked by At

In a spring boot application when a command is entered in the CLI we create topics with names grabbed from a json file. Below is the code. For each topic it creates it returns an error KafkaError{code=_BAD_MSG,val=-199,str="CREATETOPICS worker failed to parse response: CreateTopics response protocol parse failure: Local: Bad message format"}

import json
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})

# read the json file and extract the topic names
with open('../resources/kafkaSubscriptions.json') as f:
    data = json.load(f)
topic_names = [NewTopic(topic['type'], num_partitions=1, replication_factor=1) for topic in data['data']]
fs = admin_client.create_topics(topic_names, validate_only=False)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result() 
        print("Topic {} created\n".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}\n".format(topic, e))

As mentioned in the title it does in fact create the topics as expected but we get the "Failed to created topic" message in output because of exception. Searched all over and couldn't find anyone else having this issue. I have tried using different versions of kafka and the confluent-kafka package and changes to both did not resolve the issue.

Kafka version: 3.3

confluent-kafka version: 2.1

0

There are 0 best solutions below