I need a Kafka producer and 4 consumers in Python that balance the work queue.
My topic creation command:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1
For example, when I send messages from the producer, Kafka divides them equally among the consumers. But what I need is: as soon as a consumer finishes its current work, the next message should be assigned to that consumer. That would balance the load and speed up processing.
My consumer code:
import json, time
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    print(f"{message.value}")
    time.sleep(1)  # simulate one second of work per message
My producer code:
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(100):
    data = {'number': e}
    producer.send('numbers', value=data)
    print(f"Sending data : {data}")
    sleep(5)
Concurrent Consumers
To achieve something like this, you need to run 4 consumers with the same group_id. You can use Python threads for that, with one KafkaConsumer instance per thread (a single instance is not thread-safe). Kafka will then assign one of the 4 partitions to each consumer, and they will poll for messages in parallel.
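A minimal sketch of that idea, assuming kafka-python and reusing the topic and group names from your code (the worker_id parameter is only there to label the output):

import json, threading
from kafka import KafkaConsumer

def consume(worker_id):
    # One KafkaConsumer per thread; instances must not be shared across threads.
    consumer = KafkaConsumer(
        'numbers',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        group_id='my-group',  # same group => the 4 partitions are split among members
        value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    for message in consumer:
        print(f"worker {worker_id}: {message.value}")

# One thread per partition; extra threads beyond 4 would sit idle.
threads = [threading.Thread(target=consume, args=(i,), daemon=True) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()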
Warning About Static Keys / No Keys
I also see that your producer sends messages without a key (the key is null). Keys matter for partitioning: when messages share the same non-null key, Kafka hashes that key and routes all of them to the same partition. In that case, even with 4 consumers bound to 4 partitions, a single consumer would end up processing every message one by one. With null keys, the producer distributes messages across partitions itself (round-robin in older clients, sticky per-batch assignment in newer ones), so just make sure you never set a constant key.
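If you want to control partitioning explicitly, here is a sketch of your producer with a varying key, assuming kafka-python (key_serializer and the key argument to send are its standard parameters):

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: dumps(v).encode('utf-8'))

for e in range(100):
    # A varying key hashes messages to different partitions;
    # a constant key here would pin everything to one partition.
    producer.send('numbers', key=str(e), value={'number': e})

producer.flush()  # make sure everything is sent before the script exits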