Need to balance Kafka consumer tasks


I need a Kafka producer and 4 consumers in Python that balance the queue between them.

My topic creation command (bash):

kafka-topics --bootstrap-server localhost:9092 --create  --topic numbers  --partitions 4 --replication-factor 1

For example, when the producer sends messages, Kafka divides them equally between the consumers. But I want a new message to be assigned to a consumer only once that consumer has finished its current work.

That would help me balance the load and increase processing speed.

My consumer code:

import json, time
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    print(f"{message.value}")
    time.sleep(1)

My producer code:

from time import sleep
from json import dumps
from kafka import KafkaProducer
  
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'),
)

for e in range(100):
    data = {'number': e}
    producer.send('numbers', value=data)
    print(f"Sending data : {data}")
    sleep(5)

# make sure any buffered messages are delivered before the script exits
producer.flush()

There is 1 answer below.

Answer by Shuvojit:

Concurrent Consumers

To achieve something like this, you need to create 4 consumers with the same group_id. You can use Python threads for that.

import threading

from kafka import KafkaConsumer


def consumer_thread(consumer):
    for message in consumer:
        print(message)


if __name__ == "__main__":
    consumer_group_id = "my-group"
    bootstrap_servers = ["localhost:29092"]
    topic = "my-topic"

    consumers = []
    # All four consumers share the same group_id, so Kafka assigns
    # each of them its own partition of the topic.
    for i in range(4):
        consumer = KafkaConsumer(
            topic,
            group_id=consumer_group_id,
            bootstrap_servers=bootstrap_servers,
        )
        print("starting consumer")
        thread = threading.Thread(target=consumer_thread, args=(consumer,))
        consumers.append(thread)
        thread.start()

    for thread in consumers:
        thread.join()

Each of those consumers will then be assigned one of the four partitions, and they will poll for messages in parallel.

Single producer producing to a topic with 4 partitions
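
If you want to verify which partition each thread ended up with, the thread function can be extended to print the consumer's assignment. This is only a sketch: poll() and assignment() are kafka-python methods, and the extra name argument is something I added for readable output.

def consumer_thread(consumer, name):
    # A first poll() makes the consumer join the group, so assignment()
    # afterwards reports the partitions this consumer owns.
    records = consumer.poll(timeout_ms=5000)
    print(f"{name} assigned partitions: {consumer.assignment()}")

    # Handle anything that first poll() already fetched ...
    for batch in records.values():
        for message in batch:
            print(f"{name}: {message.value}")

    # ... then keep consuming as before.
    for message in consumer:
        print(f"{name}: {message.value}")

Each thread would then be started with args=(consumer, f"consumer-{i}").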

Warning About Static Keys / No Keys

Also, I see the key in your producer is a static value (null). If every message carries the same key, Kafka hashes them all to the same partition. In that case, even if you have 4 consumers bound to 4 partitions, only one consumer will process the messages one by one, since all of them end up in the same partition.
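
To spread the load across all 4 partitions, one option is to give each message its own key, for example the number itself. Below is a minimal sketch based on the producer from the question; key_serializer and the key argument to send() are standard kafka-python parameters.

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: dumps(v).encode('utf-8'),
)

for e in range(100):
    data = {'number': e}
    # Distinct keys hash to different partitions, so all 4 consumers
    # in the group get a share of the work.
    producer.send('numbers', key=str(e), value=data)

producer.flush()

With per-message keys, records that share a key always land on the same partition, which also preserves ordering per key.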
