How do I send a message to every idle consumer in Kafka?


I have three consumers and one producer in Kafka. The producer sends 100 messages in my simple example, and these messages are divided among the three consumers. My main problem is this division of messages: some messages take longer to process, so one consumer may fall behind, while another consumer that processes its messages quickly becomes idle and has nothing to do.

How can I keep all the messages in a queue so that whenever a consumer finishes its current work, it receives the next message? (I don't know whether consumers receive messages from producers or from topics; I am a beginner in this field.) Thank you for your guidance.

I recorded a video of the process, please watch it. In the video, one consumer has finished its work and is idle, but the other two consumers are still running.
Movie link.
My code. Topic:

kafka-topics --bootstrap-server localhost:9092 --create  --topic numbers  --partitions 4 --replication-factor 1
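The topic above has 4 partitions, but the question uses 3 consumers in one group, so one consumer necessarily owns two partitions. A minimal sketch of how such an assignment can come out (the consumer names and the round-robin scheme are illustrative; Kafka's actual assignment strategy is configurable, with range assignment as the kafka-python default):

```python
# Hypothetical illustration: distribute 4 partitions over 3 group members
# round-robin. Kafka's real assignor differs in detail, but any strategy
# must give one member two partitions here, so the load split is uneven.
partitions = [0, 1, 2, 3]
members = ["consumer0", "consumer1", "consumer2"]

assignment = {m: [] for m in members}
for i, p in enumerate(partitions):
    assignment[members[i % len(members)]].append(p)

for member, owned in assignment.items():
    print(member, "owns partitions", owned)
# One member ends up with two partitions, so messages are not split evenly.
```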

producer.py:

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(100):
    data = {'number': e}
    producer.send('numbers', value=data)
    print(f"Sending data : {data}")

# send() is asynchronous; flush() blocks until all buffered records
# have actually been delivered to the broker before the script exits.
producer.flush()

consumer0.py:

import json 
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    print(f"{message.value}")

consumer1.py:

import json 
from kafka import KafkaConsumer
import time

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    time.sleep(1)  # simulate slow processing (1 s per message)
    print(f"{message.value}")

consumer2.py:

import json 
from kafka import KafkaConsumer
import time

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    time.sleep(2)  # simulate even slower processing (2 s per message)
    print(f"{message.value}")

1 Answer

Answer by OneCricketeer:

these messages are divided among three consumers

This isn't guaranteed. Kafka batches records, and depending on the size of a batch, all records could end up in one partition.

You can send data with keys to spread records across partitions, assuming each key hashes (modulo the number of partitions) to a distinct value. However, a producer cannot force a message to be broadcast to all consumers of the same group without duplicating the same event to every partition.
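To see why keyed sends only help when the keys spread out well, here is a standalone sketch of the key-to-partition mapping. Kafka's default partitioner murmur2-hashes the serialized key and takes it modulo the partition count; this illustration substitutes a simpler hash (`zlib.crc32`) so it runs without a broker, and the key names are made up:

```python
# Illustration only: Kafka's default partitioner murmur2-hashes the
# serialized key modulo the partition count. We mimic the idea with
# zlib.crc32 -- the real algorithm differs, but the properties shown hold.
import zlib

NUM_PARTITIONS = 4  # matches the 'numbers' topic above

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    return zlib.crc32(key) % num_partitions

# The same key always lands in the same partition...
assert partition_for(b"sensor-1") == partition_for(b"sensor-1")

# ...but different keys are not guaranteed distinct partitions, which is
# why keyed sends only balance load if the keys themselves spread out.
for key in (b"sensor-1", b"sensor-2", b"sensor-3"):
    print(key.decode(), "-> partition", partition_for(key))

# With kafka-python, a keyed send looks like (key must be bytes unless a
# key_serializer is configured):
#   producer.send('numbers', key=str(e).encode(), value=data)
```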

quickly becomes idle and has nothing to do.

Then produce more data. If you only ever send 100 records in one batch and never send new events, then maybe you don't really need stream processing.

whenever the consumers are done with their work, then receive the next message from the producer

That's exactly what your for loops are doing: each iteration processes one record and only then fetches the next.

don't know whether consumers receive messages from producers or topics

Consumers receive messages from topics (more precisely, from the topic partitions assigned to them), not directly from producers. You can use consumer-lag monitoring tools to detect how far behind each consumer is.
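Consumer lag per partition is simply the latest offset in the log minus the offset the group has committed. A minimal arithmetic sketch, with made-up offset numbers standing in for what a tool such as `kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group` would report:

```python
# Lag = log-end offset minus committed offset, per partition.
# The numbers below are hypothetical stand-ins for real broker metrics.
log_end_offsets = {0: 25, 1: 25, 2: 25, 3: 25}   # latest offset per partition
committed       = {0: 25, 1: 10, 2: 3,  3: 25}   # group's committed offsets

for partition in sorted(log_end_offsets):
    lag = log_end_offsets[partition] - committed[partition]
    status = "caught up (consumer idle)" if lag == 0 else f"{lag} messages behind"
    print(f"partition {partition}: {status}")
```

A partition with zero lag means its consumer has nothing left to read, which matches the idle consumer seen in the video.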