Suppose Kafka, 1 partition, 2 consumers (the 2nd consumer is idle).
Suppose the 1st one consumes a message, goes to handle it with 3 other services, suddenly gets stuck on one of them, and misses Kafka's timeout.
Will Kafka reassign the partition to the 2nd consumer, so that the message is handled twice (assuming the 1st one eventually succeeds)?
Yes, that's correct. If a Kafka consumer takes too long to handle a message and the subsequent poll() is delayed, Kafka will reassign the partition to another consumer and the message will be processed again (and again).
For more clarity, we first need to decide and define 'How long is too long?'.
This is defined by the property max.poll.interval.ms. From the docs, the consumer group is rebalanced if there are no calls to poll() within this time.
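A toy model of this rule, in Python for brevity. It is not the Kafka client API, only the arithmetic of comparing the gaps between poll() calls against max.poll.interval.ms. The function names are hypothetical; 300000 ms (5 minutes) is the Java client's documented default for max.poll.interval.ms.

```python
# Toy model (not the Kafka client API): a consumer is removed from the group
# when the gap between two consecutive poll() calls exceeds max.poll.interval.ms.


def gaps_exceeding(poll_times_ms, max_poll_interval_ms):
    """Return the (previous, next) poll timestamps whose gap is too long."""
    violations = []
    for prev, curr in zip(poll_times_ms, poll_times_ms[1:]):
        if curr - prev > max_poll_interval_ms:
            violations.append((prev, curr))
    return violations


def triggers_rebalance(poll_times_ms, max_poll_interval_ms=300_000):
    """True if any gap between polls would get the consumer kicked out of the group."""
    return bool(gaps_exceeding(poll_times_ms, max_poll_interval_ms))


# The consumer polls at t=0, then handling the message (3 downstream services,
# one of them stuck) takes 7 minutes before the next poll().
polls = [0, 420_000]
print(triggers_rebalance(polls))                                # True with the 5-minute default
print(triggers_rebalance(polls, max_poll_interval_ms=600_000))  # False with a 10-minute interval
```

With a 7 minute stall the default interval leads to a rebalance, while a 10 minute interval does not; that is the trade-off of raising max.poll.interval.ms discussed further on.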
There is one more property, auto.commit.interval.ms. When auto-commit is enabled (enable.auto.commit=true), the auto-commit check runs only during poll(): it checks whether the time elapsed since the last commit has reached the configured auto-commit interval and, if so, commits the offsets. If the Kafka consumer takes too long to process the records, the subsequent poll() call is also delayed and the offsets returned by the last poll() are not committed. If a rebalance happens at this point, the new consumer client assigned to this partition will start processing those messages again.
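A toy sketch of why the offsets stay uncommitted, assuming the simplified behaviour described above: the auto-commit check runs only inside poll() and commits the position reached by the previous poll(). ToyAutoCommitConsumer and its fields are hypothetical; the real client also commits in a few other places (for example, close() commits when auto-commit is enabled).

```python
# Toy model (not the Kafka client API) of auto-commit happening only inside poll().


class ToyAutoCommitConsumer:
    def __init__(self, log, auto_commit_interval_ms=5_000, batch_size=2):
        self.log = log                    # messages in the single partition
        self.auto_commit_interval_ms = auto_commit_interval_ms
        self.batch_size = batch_size
        self.position = 0                 # next offset to fetch
        self.committed = 0                # offset stored for the consumer group
        self.last_commit_ms = 0

    def poll(self, now_ms):
        # The commit check happens here and only here. It commits the position
        # reached by the previous poll(), i.e. records already handed out.
        if now_ms - self.last_commit_ms >= self.auto_commit_interval_ms:
            self.committed = self.position
            self.last_commit_ms = now_ms
        batch = self.log[self.position:self.position + self.batch_size]
        self.position += len(batch)
        return batch


log = ["m0", "m1", "m2", "m3"]
slow = ToyAutoCommitConsumer(log)
batch = slow.poll(now_ms=0)  # hands out m0, m1; nothing committed yet

# Handling m0/m1 stalls past max.poll.interval.ms, so no further poll() happens.
# The partition is reassigned and the new owner resumes from the committed offset.
replayed = log[slow.committed:]
print(batch)     # ['m0', 'm1']
print(replayed)  # ['m0', 'm1', 'm2', 'm3'] -> m0 and m1 are handled twice
```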
Consumer group rebalancing, and the resulting partition reassignment, can be avoided by increasing max.poll.interval.ms. This increases the allowed interval between polls and gives consumers more time to handle the record(s) returned from poll(). Consumers only join a rebalance inside the call to poll(), so increasing the max poll interval will also delay group rebalances.
There is one more problem with increasing max.poll.interval.ms to a large value. If the consumer stops polling for some other reason (for example, it hangs), it can take up to the configured max.poll.interval.ms to detect the failure. session.timeout.ms and heartbeat.interval.ms are available to detect a total failure, such as the consumer process dying, as early as possible. For more details about these parameters:
Please note that the value configured for session.timeout.ms must be within the allowable range set by the broker configuration properties. Otherwise, an exception will be thrown while starting the consumer client.
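A rough sketch of these timeouts, assuming a Java client new enough (0.10.1+) to send heartbeats from a background thread, so a stuck but still alive consumer keeps heartbeating and is only caught by max.poll.interval.ms. The broker-side bounds are passed in as hypothetical parameters (broker_min_ms, broker_max_ms) rather than the broker property names, InvalidSessionTimeout is a stand-in rather than the exception the Kafka client throws, and all numbers are illustrative, not recommendations. The one-third rule for heartbeat.interval.ms comes from the consumer configuration docs.

```python
# Toy model (not the Kafka client API) of session.timeout.ms, heartbeat.interval.ms
# and max.poll.interval.ms. Assumes heartbeats are sent from a background thread.


class InvalidSessionTimeout(Exception):
    """Hypothetical stand-in for the error raised when the broker rejects the value."""


def check_session_timeout(session_timeout_ms, broker_min_ms, broker_max_ms):
    # broker_min_ms / broker_max_ms are hypothetical names for the broker-side bounds.
    if not broker_min_ms <= session_timeout_ms <= broker_max_ms:
        raise InvalidSessionTimeout(
            f"session.timeout.ms={session_timeout_ms} outside [{broker_min_ms}, {broker_max_ms}]"
        )
    return session_timeout_ms


def heartbeat_within_rule_of_thumb(heartbeat_interval_ms, session_timeout_ms):
    # Docs: heartbeat.interval.ms should typically be no higher than 1/3 of session.timeout.ms.
    return 3 * heartbeat_interval_ms <= session_timeout_ms


def detection_delay_ms(failure, session_timeout_ms, max_poll_interval_ms):
    """Roughly how long before the group notices a failed consumer."""
    if failure == "crash":
        # Process is gone and heartbeats stop: caught by session.timeout.ms.
        return session_timeout_ms
    if failure == "stuck":
        # Process alive, heartbeat thread still beating, but no poll(): caught by max.poll.interval.ms.
        return max_poll_interval_ms
    raise ValueError(f"unknown failure kind: {failure!r}")


# Illustrative values only.
session_ms = check_session_timeout(10_000, broker_min_ms=6_000, broker_max_ms=300_000)
for max_poll_ms in (300_000, 1_800_000):
    print(max_poll_ms,
          detection_delay_ms("crash", session_ms, max_poll_ms),
          detection_delay_ms("stuck", session_ms, max_poll_ms))
```

In this model, raising max.poll.interval.ms only lengthens the "stuck" path; a crashed consumer is still noticed after session.timeout.ms.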
Update: To avoid handling the messages again, commit the offsets manually.
Besides commitSync(), the KafkaConsumer class has another method, commitAsync(), to trigger a commit of offsets. For more details on commitSync() and commitAsync(), please check this thread
Committing an offset manually is a way of saying that the offset has been processed, so Kafka won't deliver the committed records of that partition to the group again. When committing manually, note that if offsets are committed before the records are processed and the consumer then dies for any reason, those records won't be processed again.
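A toy sketch contrasting the two orderings when the consumer crashes mid-batch: committing before processing gives at-most-once delivery (records can be lost), while committing after processing gives at-least-once delivery (records can be handled twice). The helper names are hypothetical; in the Java client, calling commitSync() with no arguments commits the offsets returned by the last poll().

```python
# Toy model (not the Kafka client API) of manual commit ordering with a crash.


def consume_batch_then_crash(log, committed, commit_first):
    """Take the batch starting at `committed`, then crash halfway through.

    commit_first=True : commit, then crash before processing   (at-most-once)
    commit_first=False: process, then crash before committing  (at-least-once)
    Returns (records_processed, committed_offset_after_crash).
    """
    batch = log[committed:]
    processed = []
    if commit_first:
        committed += len(batch)   # offsets stored for the group...
        # ...crash here: the batch is never processed
    else:
        processed.extend(batch)   # records handled...
        # ...crash here: the commit never happens
    return processed, committed


def resume_after_rebalance(log, committed):
    # The consumer that takes over the partition starts from the committed offset.
    return log[committed:]


log = ["m0", "m1"]

done, offset = consume_batch_then_crash(log, 0, commit_first=True)
print(done, resume_after_rebalance(log, offset))   # [] [] -> m0, m1 are lost

done, offset = consume_batch_then_crash(log, 0, commit_first=False)
print(done, resume_after_rebalance(log, offset))   # ['m0', 'm1'] ['m0', 'm1'] -> handled twice
```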