For test purposes I have to read all messages in some Kafka topics. Before each test run I remove all messages using /kafka-delete-records.sh, then I run the tests, which fill the Kafka topics. After the tests I want to analyse all messages.
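For reference, kafka-delete-records.sh reads the records to delete from a JSON file passed with `--offset-json-file`. Below is a minimal sketch that builds that file for every partition of the test topics. It assumes an offset of -1, which the DeleteRecords API treats as "up to the partition's high watermark", i.e. everything currently in the partition. `delete_all_records_spec` and `write_spec` are hypothetical helper names, not part of any Kafka tooling.

```python
import json


def delete_all_records_spec(topic_partitions):
    """Build the payload for kafka-delete-records.sh --offset-json-file.

    topic_partitions: hypothetical mapping {topic name: iterable of partition ids}.
    An offset of -1 is assumed to mean "delete up to the high watermark".
    """
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": p, "offset": -1}
            for topic, parts in sorted(topic_partitions.items())
            for p in sorted(parts)
        ],
    }


def write_spec(path, topic_partitions):
    # Serialize the payload to the file that is handed to the CLI tool.
    with open(path, "w") as f:
        json.dump(delete_all_records_spec(topic_partitions), f, indent=2)
```

Usage would be `write_spec("delete-all.json", {"app.analyse": [0], "app.result": [0, 1, 2, 3]})` followed by `kafka-delete-records.sh --bootstrap-server <broker:port> --offset-json-file delete-all.json`. A side effect worth keeping in mind: after the deletion each partition's log start offset moves forward, so offset 0 typically no longer exists in these topics.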
My code to read all messages from all topic partitions:
```python
import kafka
from kafka import TopicPartition


def show_messageges_from_topic(kafka_server, topic):
    i = 0  # number of records processed
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, auto_offset_reset='earliest')
    try:
        partitions = consumer.partitions_for_topic(topic)  # set of partition ids, or None
        if partitions:
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                consumer.assign([tp])  # replaces the previous assignment
                consumer.seek(partition=tp, offset=0)
                records = consumer.poll(50)  # timeout in millis; called once per partition
                # poll() returns {TopicPartition: [ConsumerRecord, ...]}
                for _, consumer_records in records.items():
                    for consumer_record in consumer_records:
                        i += 1
                        msg_process(topic, i, consumer_record)  # my own helper, defined elsewhere
    finally:
        consumer.close()
    return i
```
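One possible way to read everything is to keep calling `poll()` until every partition's position reaches the end offset captured at the start, instead of calling `poll()` exactly once per partition. The sketch below assumes a kafka-python `KafkaConsumer` (its `assign`, `seek_to_beginning`, `end_offsets`, `position` and `poll` methods exist in kafka-python 2.0.x); `read_all_records`, `poll_timeout_ms` and `max_empty_polls` are hypothetical names. It uses `seek_to_beginning` rather than `seek(offset=0)` because after kafka-delete-records.sh the first available offset is usually no longer 0 (the original code relies on `auto_offset_reset='earliest'` to recover from that).

```python
def read_all_records(consumer, partitions, poll_timeout_ms=1000, max_empty_polls=10):
    """Read every record currently in `partitions`, starting from the beginning.

    consumer: assumed to be a kafka-python KafkaConsumer (or an object with the
    same methods); partitions: a list of kafka.TopicPartition objects.
    """
    consumer.assign(partitions)              # assign all partitions at once
    consumer.seek_to_beginning(*partitions)  # log start offset, not necessarily 0
    # Snapshot of "one past the last record" for each partition.
    end_offsets = consumer.end_offsets(partitions)
    remaining = {tp for tp in partitions if consumer.position(tp) < end_offsets[tp]}

    records = []
    empty_polls = 0
    # A single poll() returns at most max_poll_records records (500 by default),
    # so keep polling until every partition has reached its end offset.
    while remaining and empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batch:
            empty_polls += 1  # guard against waiting forever on a stalled partition
            continue
        empty_polls = 0
        for tp_records in batch.values():
            records.extend(tp_records)
        remaining = {tp for tp in remaining if consumer.position(tp) < end_offsets[tp]}
    return records
```

Usage would look like `read_all_records(consumer, [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)])`, after which the existing `msg_process` loop can run over the returned list. Returning a list keeps the sketch simple; for very large topics a generator that yields each batch would avoid holding every record in memory.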
For test purposes I created 1230 messages in each of two topics, one with a single partition and the other with 4 partitions.
Counts from Kafka UI (webpage http://169.0.1.77:8090/ui/clusters/local/all-topics?perPage=25&q=fdp):

| Topic Name | Partitions | Out of sync replicas | Replication Factor | Number of messages | Size |
|---|---|---|---|---|---|
| app.analyse | 1 | 0 | 1 | 1230 | 12 MB |
| app.result | 4 | 0 | 1 | 1230 | 8 MB |
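To cross-check these figures from the consumer side, one option is to derive an expected count per partition from offsets: end offset minus beginning offset. This sketch assumes a kafka-python `KafkaConsumer` (`beginning_offsets` and `end_offsets` exist in kafka-python 2.0.x); `expected_counts` is a hypothetical helper name, and it is an assumption, not confirmed here, that Kafka UI computes "Number of messages" the same way. Subtracting the beginning offset instead of assuming 0 matters after kafka-delete-records.sh has moved the log start offset.

```python
def expected_counts(consumer, partitions):
    """Estimate the number of records per partition from offsets.

    consumer: assumed kafka-python KafkaConsumer; partitions: list of TopicPartition.
    Offsets are not guaranteed to be dense (compaction, transaction markers),
    so each value is an upper bound on the records a consumer will actually see.
    """
    begin = consumer.beginning_offsets(partitions)  # first available offset
    end = consumer.end_offsets(partitions)          # one past the last record
    return {tp: end[tp] - begin[tp] for tp in partitions}
```

Summing the values, e.g. `sum(expected_counts(consumer, tps).values())`, gives the per-topic total to compare with the counter in the test.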
But it seems that my code does not read all messages from the topic with one partition.

My counts:

| Topic name | Partitions | cnt |
|---|---|---|
| app.analyse | {0} | 500 |
| app.result | {0, 1, 2, 3} | 1230 |
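The numbers appear consistent with a per-call cap: in kafka-python, the `KafkaConsumer` setting `max_poll_records` defaults to 500, so one `poll()` returns at most 500 records. The sketch below computes the most the code above could see with one `poll()` per partition, assuming each partition's records all arrive in that single call (not guaranteed with a 50 ms timeout). The 4-way split of app.result is hypothetical, since the actual distribution across partitions is not shown.

```python
# kafka-python's default value of the KafkaConsumer max_poll_records setting.
DEFAULT_MAX_POLL_RECORDS = 500


def max_seen_with_one_poll_per_partition(partition_sizes, max_poll_records=DEFAULT_MAX_POLL_RECORDS):
    """Upper bound on records returned when poll() is called exactly once per partition."""
    return sum(min(size, max_poll_records) for size in partition_sizes)
```

With a single partition of 1230 records the bound is 500, matching the app.analyse count; with a split such as 308, 308, 307, 307 every partition fits under the cap, so all 1230 app.result records can be read.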
What can I do to read all messages from Kafka topics?
My environment:

```
[mn:~] $ python -V
Python 3.11.4
[mn:~] $ pip3 list | grep kafka
kafka-python 2.0.2
```