I'm trying to publish messages to, and read messages from, a Kafka topic on a Kafka server deployed by a colleague.
The problem:
The consumer sometimes receives the messages and sometimes doesn't:
- I ran the consumer (and kept it on) and it got 5 old messages
- I ran the producer but the consumer didn't receive any message
- I ran the producer again but changed the number of messages sent (from 5 to 10). This time the consumer got the 10 messages sent by the producer
- I ran the producer again to send 10 messages, but this time the consumer got nothing
- I ran the producer again to send 3 messages, and the consumer got the 3 messages
- Then I waited a moment before sending more messages to the topic. When I sent a number of messages I had already tried before, the consumer got nothing; when I sent a new number, it worked
This is very strange and unpredictable behavior. What could be the issue?
Technologies
- Python version: 3.11
- Kafka version: 3.6.1
- Kafka client: kafka-python
The code:
kafka_admin.py to create topics:
from os import getenv

from dotenv import load_dotenv
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
# Take environment variables (KAFKA_HOST) from a local .env file.
load_dotenv()
kafka_server = getenv('KAFKA_HOST')
if not kafka_server:
    # Fail with a clear message instead of an obscure client error later on.
    raise SystemExit("KAFKA_HOST is not set (check the environment or .env file)")

# Topic configuration: one partition with a single replica.
topic_name = "santiago_test1"
partitions = 1
replication_factor = 1
new_topic = NewTopic(topic_name, partitions, replication_factor)

admin_client = KafkaAdminClient(bootstrap_servers=kafka_server)
try:
    # create_topics raises when the topic already exists; treat that as
    # success so this script can be re-run safely.
    try:
        admin_client.create_topics([new_topic])
    except TopicAlreadyExistsError:
        print(f"Topic {topic_name!r} already exists")

    # Ensure the topic exists: list all topics, then describe only ours.
    created_topics = admin_client.list_topics()
    topics_details = admin_client.describe_topics([topic_name])
    print(created_topics)
    print(topics_details)
finally:
    # Release the admin client's broker connections.
    admin_client.close()
`topics_details` output:
{
'error_code': 0,
'topic': 'santiago_test1',
'is_internal': False,
'partitions': [
{
'error_code': 0,
'partition': 0,
'leader': 1,
'replicas': [1],
'isr': [1],
'offline_replicas': []
}
]
}
kafka_producer.py to publish messages:
from time import sleep
from os import getenv
from dotenv import load_dotenv
from kafka import KafkaProducer
# Take environment variables (KAFKA_HOST) from a local .env file.
load_dotenv()
kafka_server = getenv('KAFKA_HOST')
client_id = "santiago_producer_1"
topic_name = "santiago_test1"
print(f"Kafka server: {kafka_server}")

producer = KafkaProducer(client_id=client_id,
                         bootstrap_servers=kafka_server)
try:
    for i in range(3):
        text = f"hello world {i}"
        print(f"sending message: {text}")
        # send() is asynchronous and only enqueues the record. Blocking on the
        # returned future raises on delivery failure (e.g. timeout, unreachable
        # partition leader) instead of dropping the error silently, and tells
        # us exactly where the record landed.
        metadata = producer.send(topic_name, text.encode()).get(timeout=10)
        print(f"delivered to partition {metadata.partition} "
              f"at offset {metadata.offset}")
        print("sleeping...")
        sleep(1)
finally:
    # close() flushes any still-buffered records before shutting down, and
    # runs even if a send fails.
    producer.close()
kafka_consumer.py to read the topic:
from dotenv import load_dotenv
from os import getenv
from kafka import KafkaConsumer
# Take environment variables (KAFKA_HOST) from a local .env file.
load_dotenv()
kafka_server = getenv('KAFKA_HOST')
client_id = "santiago_consumer_1"
# NOTE: not passed to KafkaConsumer below. With group_id=None, offset commits
# are disabled, so every run starts again according to auto_offset_reset
# ('earliest'). Pass group_id=group_id instead to resume from the last
# committed offset between runs.
group_id = "santiago"
topic_name = "santiago_test1"
print(f"Kafka server: {kafka_server}")

consumer = KafkaConsumer(topic_name,
                         client_id=client_id,
                         group_id=None,
                         auto_offset_reset='earliest',
                         bootstrap_servers=kafka_server)
try:
    for msg in consumer:
        # value is None for records without a payload; decode defensively so a
        # single odd record does not kill the consumer loop.
        if msg.value is None:
            text = None
        else:
            text = msg.value.decode('utf-8', errors='replace')
        # Partition/offset make it visible whether new records really arrive.
        print(f"Message: {text} (partition={msg.partition}, offset={msg.offset})")
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop; cleanup happens once, in finally.
    pass
finally:
    print("finally statement")
    consumer.close()