Why does my Kafka consumer only receive messages some of the time?

I'm trying to publish messages to, and read messages from, a Kafka topic on a Kafka server deployed by a colleague.

The problem:

The consumer only receives messages some of the time.

  • I started the consumer (and left it running), and it received 5 old messages.
  • I ran the producer, but the consumer didn't receive any messages.
  • I ran the producer again, changing the number of messages sent from 5 to 10. This time the consumer received all 10 messages.
  • I ran the producer again to send 10 messages, but this time the consumer received nothing.
  • I ran the producer again to send 3 messages, and the consumer received all 3.
  • Then I waited a moment before sending more messages. When I sent a number of messages I had already tried, the consumer received nothing; when I sent a number I hadn't tried yet, it worked.

This behavior seems very strange and unpredictable. What could be causing it?

Technologies

  • Python version: 3.11
  • Kafka version: 3.6.1
  • Kafka client: kafka-python

The code:

kafka_admin.py to create topics:

from os import getenv
from dotenv import load_dotenv
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

# take environment variables from .env.
load_dotenv()

kafka_server = getenv('KAFKA_HOST')

admin_client = KafkaAdminClient(bootstrap_servers=kafka_server)

# Define the topic configuration: one partition, one replica
topic_name = "santiago_test1"
partitions = 1
replication_factor = 1
new_topic = NewTopic(name=topic_name,
                     num_partitions=partitions,
                     replication_factor=replication_factor)

# Create the topic; re-running the script should not crash if it already exists
try:
    admin_client.create_topics([new_topic])
except TopicAlreadyExistsError:
    print(f"Topic {topic_name} already exists")

# Ensure topic was created
created_topics = admin_client.list_topics()
topics_details = admin_client.describe_topics()

print(created_topics)
print(topics_details)

admin_client.close()

The santiago_test1 entry in topics_details:

{
  'error_code': 0,
  'topic': 'santiago_test1',
  'is_internal': False,
  'partitions': [
    {
      'error_code': 0,
      'partition': 0,
      'leader': 1,
      'replicas': [1],
      'isr': [1],
      'offline_replicas': []
    }
  ]
}
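
Rather than reading this structure by eye, the check can be scripted. A minimal sketch, assuming the describe_topics() entries have exactly the keys printed above; partition_health is a hypothetical helper name, not part of kafka-python. For each partition it reports the leader broker and whether every replica is in the in-sync replica (ISR) list:

```python
def partition_health(topic_description):
    """Return (partition, leader, fully_in_sync) for each partition.

    Assumes the dict layout printed above: each partition entry has
    'partition', 'leader', 'replicas' and 'isr' keys.
    """
    report = []
    for p in topic_description["partitions"]:
        # A partition is fully in sync when every replica appears in the ISR.
        in_sync = set(p["replicas"]) == set(p["isr"])
        report.append((p["partition"], p["leader"], in_sync))
    return report


# The description printed above, copied as-is.
santiago_test1 = {
    "error_code": 0,
    "topic": "santiago_test1",
    "is_internal": False,
    "partitions": [
        {"error_code": 0, "partition": 0, "leader": 1,
         "replicas": [1], "isr": [1], "offline_replicas": []},
    ],
}

print(partition_health(santiago_test1))  # [(0, 1, True)]
```

For this topic the single partition has broker 1 as leader and its only replica in sync, so the topic metadata itself looks healthy.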

kafka_producer.py to publish messages:

from time import sleep
from os import getenv
from dotenv import load_dotenv
from kafka import KafkaProducer

# take environment variables from .env.
load_dotenv()

kafka_server = getenv('KAFKA_HOST')
client_id = "santiago_producer_1"
topic_name = "santiago_test1"

print(f"Kafka server: {kafka_server}")

producer = KafkaProducer(client_id=client_id,
                         bootstrap_servers=kafka_server)

for i in range(3):
    value = f"hello world {i}".encode()
    print(f"sending message: {value.decode('utf-8')}")
    # send() is asynchronous: it queues the record and returns a future
    producer.send(topic_name, value)

    print("sleeping...")
    sleep(1)

# close() waits for queued records to be sent before shutting down
producer.close()
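
KafkaProducer.send() is asynchronous, so the script above never confirms that the broker actually stored each record. One way to rule out the producer side is to block on each returned future and print the partition and offset the broker assigned. A minimal sketch, assuming kafka-python's FutureRecordMetadata.get(timeout=...) and the RecordMetadata fields topic, partition and offset; send_and_confirm is a hypothetical helper name, and the producer is passed in so it can be the one created above:

```python
def send_and_confirm(producer, topic, values, timeout=10):
    """Send each value to `topic` and wait for the broker to acknowledge it.

    Returns a list of (partition, offset) pairs. A delivery failure, such as
    a timeout, is raised by future.get() instead of passing silently.
    """
    acks = []
    for value in values:
        future = producer.send(topic, value)
        # Blocks until the broker acknowledges the record or `timeout` seconds pass.
        metadata = future.get(timeout=timeout)
        print(f"{value!r} -> {metadata.topic}[{metadata.partition}] "
              f"at offset {metadata.offset}")
        acks.append((metadata.partition, metadata.offset))
    return acks
```

For example, calling send_and_confirm(producer, topic_name, [f"hello world {i}".encode() for i in range(3)]) before producer.close() prints one acknowledged offset per message. If those offsets keep increasing on every run, the records are reaching the partition and the problem is more likely on the consumer side.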

kafka_consumer.py to read the topic:

from dotenv import load_dotenv
from os import getenv

from kafka import KafkaConsumer

# take environment variables from .env.
load_dotenv()

kafka_server = getenv('KAFKA_HOST')
client_id = "santiago_consumer_1"
group_id = "santiago"  # defined but not used: group_id=None is passed below
topic_name = "santiago_test1"

print(f"Kafka server: {kafka_server}")

# group_id=None: no consumer group, so no offsets are committed and
# auto_offset_reset='earliest' makes every run start from the oldest retained message
consumer = KafkaConsumer(topic_name,
                         client_id=client_id,
                         group_id=None,
                         auto_offset_reset='earliest',
                         bootstrap_servers=kafka_server)

try:
    for msg in consumer:
        print(f"Message: {msg.value.decode('utf-8')}")
except KeyboardInterrupt:
    pass  # Ctrl+C ends the loop; cleanup happens in finally
finally:
    print("finally statement")
    consumer.close()
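
Independently of the iterator loop, it can help to check how many records the partition actually holds. If the next offset grows by N after each producer run but the loop prints nothing, the records were stored and the problem lies in how the consumer fetches them. A minimal sketch, assuming kafka-python's KafkaConsumer.beginning_offsets() and end_offsets(), TopicPartition, the same KAFKA_HOST variable, and the single partition 0 created above; offset_report and print_offsets are hypothetical helper names:

```python
from os import getenv


def offset_report(beginning, end):
    """Map each partition to (first_offset, next_offset, records_in_range).

    records_in_range = end - beginning; it equals the number of stored
    messages only if the topic is not compacted or written transactionally.
    """
    return {tp: (beginning[tp], end[tp], end[tp] - beginning[tp]) for tp in end}


def print_offsets(topic="santiago_test1", partition=0):
    """Print the retained offset range of one partition (partition 0 assumed)."""
    # Imported here so offset_report() can be used without these packages.
    from dotenv import load_dotenv
    from kafka import KafkaConsumer, TopicPartition

    load_dotenv()  # same .env / KAFKA_HOST setup as the scripts above
    tp = TopicPartition(topic, partition)
    # No subscription and no group: this consumer only queries offsets.
    consumer = KafkaConsumer(bootstrap_servers=getenv("KAFKA_HOST"), group_id=None)
    try:
        report = offset_report(consumer.beginning_offsets([tp]),
                               consumer.end_offsets([tp]))
        for part, (first, nxt, count) in report.items():
            print(f"{part.topic}[{part.partition}]: first offset {first}, "
                  f"next offset {nxt}, {count} records in range")
    finally:
        consumer.close()
```

Running print_offsets() before and after a producer run shows whether the next offset advanced by the number of messages sent.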