I have three consumers and one producer in Kafka.
When the producer sends all the messages (there are 100 messages in my simple code), they are divided among the three consumers, and my main problem is with how this division happens.
Sometimes a message takes a long time to process, so one consumer may not be able to get through its messages quickly, while another consumer that finishes its messages quickly becomes idle and has nothing to do.
How can I keep all the messages in a queue so that whenever a consumer is done with its current work, it receives the next message? (Of course, I don't know whether consumers receive messages from producers or from topics; I am a beginner in this field.)
Thank you for any guidance.
I recorded a video of the process, please watch it. In the video, one consumer has finished its work and is idle, but the other two consumers are still running.
Movie link.
My code:
Topic:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1
producer.py:
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(100):
    data = {'number': e}
    producer.send('numbers', value=data)
    print(f"Sending data : {data}")

producer.flush()  # send() only queues records; flush before exiting so none are lost
consumer0.py:
import json
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    print(f"{message.value}")
consumer1.py:
import json
import time
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    time.sleep(1)  # simulate one second of work per message
    print(f"{message.value}")
consumer2.py:
import json
import time
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    time.sleep(2)  # simulate two seconds of work per message
    print(f"{message.value}")
Regarding "these messages are divided among three consumers": this isn't guaranteed. Kafka batches records, and depending on the size of a batch, all of the records could end up in one partition.
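If you want to see where each record actually lands, kafka-python's send() returns a future whose RecordMetadata includes the partition. A minimal sketch (the 10-record loop is just for illustration):

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(10):
    # block on the send future to get the RecordMetadata for this record
    metadata = producer.send('numbers', value={'number': e}).get(timeout=10)
    print(f"record {e} -> partition {metadata.partition}, offset {metadata.offset}")

producer.flush()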
You can send data with keys to steer records onto different partitions, assuming each key hashes (modulo the number of partitions) to a distinct value. But a producer cannot force a message to be broadcast to all consumers of the same group without duplicating the event to every partition.
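For example, a keyed variant of your producer (a sketch; keying by the record number is an assumption, any reasonably distributed key works):

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda k: k.encode('utf-8'),
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for e in range(100):
    # the key is hashed and modulo'd by the partition count, so distinct
    # keys spread the records across the topic's four partitions
    producer.send('numbers', key=str(e), value={'number': e})

producer.flush()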
Regarding "whenever the consumers are done with their work, then receive the next message from the producer": consumers don't fetch from the producer, they poll the topic's partitions from the brokers. So if you want them to keep receiving work, produce more data. If you only want to send 100 records once, in a single batch, and never send new events, then maybe you don't really need stream processing.
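A continuously running producer is just an unbounded loop; a sketch (the one-second interval is an arbitrary choice):

from time import sleep
from json import dumps
from itertools import count
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: dumps(v).encode('utf-8'))

# emit an unbounded stream of events instead of a one-shot batch of 100
for e in count():
    producer.send('numbers', value={'number': e})
    sleep(1)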
As for receiving the next message once a consumer is free: that's exactly what your for loops are doing. Iterating over a KafkaConsumer polls the brokers in an endless loop and yields each record as it arrives.
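The iterator form is roughly equivalent to an explicit poll loop; a sketch using kafka-python's poll():

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

while True:
    # poll() blocks up to timeout_ms and returns a dict of
    # TopicPartition -> list of records fetched from that partition
    batches = consumer.poll(timeout_ms=1000)
    for tp, records in batches.items():
        for record in records:
            print(f"partition {tp.partition}: {record.value}")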
As for one consumer sitting idle while the other two are still busy: partitions, not individual records, are assigned to consumers, so an idle consumer won't take over work already assigned to a busy one. You can use consumer lag metric monitoring tools to detect this kind of imbalance.
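For example, the consumer-groups CLI that ships with Kafka reports per-partition lag for your group:

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group

The LAG column shows how far each member is behind the end of its assigned partitions.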