Why does Python KafkaConsumer not read all messages from a topic?


For test purposes I have to read all messages in some Kafka topics. Before the test I remove all messages using /kafka-delete-records.sh, then I run tests that fill the Kafka topics. After the test I want to analyse all messages.

My code to read all messages from all topic partitions:

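import kafka
from kafka import TopicPartition

# msg_process(topic, i, record) is my own helper that handles a single record
def show_messageges_from_topic(kafka_server, topic):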
    i = 0
    consumer = kafka.KafkaConsumer(bootstrap_servers=kafka_server, auto_offset_reset='earliest')
    try:
        partitions = consumer.partitions_for_topic(topic)
        if partitions:
            for partition in partitions:
                tp = TopicPartition(topic, partition)
                consumer.assign([tp])
                consumer.seek(partition=tp, offset=0)
                records = consumer.poll(50)     # timeout in millis
                for _, consumer_records in records.items():
                    for consumer_record in consumer_records:
                        i += 1
                        msg_process(topic, i, consumer_record)
    finally:
        consumer.close()
    return i

For the test I created 1230 messages in each of two topics, one with one partition and the other with 4 partitions:

Counts from Kafka UI (webpage http://169.0.1.77:8090/ui/clusters/local/all-topics?perPage=25&q=fdp)

Topic Name   Partitions  Out of sync replicas  Replication Factor  Number of messages  Size
app.analyse  1           0                     1                   1230                12 MB
app.result   4           0                     1                   1230                8 MB

But it seems that my code does not read all messages from the topic with one partition:

My counts:

topic name {partitions} cnt

app.analyse {0}                  cnt=500
app.result {0, 1, 2, 3}          cnt=1230

What can I do to read all messages from kafka topics?
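
One thing that may be worth checking, offered as a likely explanation rather than a confirmed diagnosis: kafka-python's `max_poll_records` defaults to 500, and the code above calls `poll()` only once per partition, so a partition holding more than 500 records would be cut off at exactly 500. That would also fit the 4-partition topic being read completely, since each of its partitions presumably holds fewer than 500 records. A minimal sketch of polling in a loop until the end offsets are reached is below. The names `read_all_records`, `poll_timeout_ms` and `max_empty_polls` are made up for illustration; `consumer` is assumed to behave like `kafka.KafkaConsumer`, and `topic_partitions` is assumed to be a list of `TopicPartition` objects.

```python
def read_all_records(consumer, topic_partitions, poll_timeout_ms=1000, max_empty_polls=5):
    """Collect every record currently stored in the given partitions.

    Assumptions (not from the original post): `consumer` exposes the
    kafka-python KafkaConsumer methods assign, seek_to_beginning,
    end_offsets, position and poll.
    """
    consumer.assign(topic_partitions)
    # Start from the first offset that still exists; after
    # kafka-delete-records.sh this is no longer necessarily offset 0.
    consumer.seek_to_beginning(*topic_partitions)
    # Snapshot the end offsets once so the loop has a fixed target.
    end_offsets = consumer.end_offsets(topic_partitions)

    collected = []
    empty_polls = 0
    # position(tp) is the offset of the next record that would be fetched.
    while any(consumer.position(tp) < end_offsets[tp] for tp in topic_partitions):
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batches:
            empty_polls += 1
            if empty_polls >= max_empty_polls:
                break  # give up instead of looping forever
            continue
        empty_polls = 0
        # poll() returns a dict: TopicPartition -> list of records.
        for records in batches.values():
            collected.extend(records)
    return collected
```

With the code from the question this could be called as `read_all_records(consumer, [TopicPartition(topic, p) for p in partitions])`, and `msg_process` could then be applied to each returned record. Raising `max_poll_records` on the consumer alone would only move the limit; looping until `position()` reaches the end offset does not depend on how many records a single `poll()` returns.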

My environment:

[mn:~] $ python -V
Python 3.11.4

[mn:~] $ pip3 list | grep kafka
kafka-python       2.0.2
