I have a long running process and Kafka is timing out assuming that the consumer is no longer there and the message is placed back on the topic. I need a way to process my long running message but also continue to acknowledge that the consumer is still there so it does not give up on the message and reassign it.
Here is what I have tried
var consumer = _consumerAccessor[context.ConsumerContext.ConsumerName];
var partitionId = context.ConsumerContext.Partition;
var partition = new Partition(partitionId);
var topicPartition = new TopicPartition(context.ConsumerContext.Topic, partition);
var topicPartitionList = new List<TopicPartition>()
{
topicPartition
};
//consumer.Pause();
consumer.Pause(topicPartitionList);
var pausedPartitions = consumer.PausedPartitions;
var processTask = Task.Run(() => _processFileService.ProcessFile(fileReceivedMsg));
while (!processTask.IsCompleted)
{
consumer.Resume(topicPartitionList);
//_logger.LogInformation("{FileName} still processing", fileReceivedMsg.FileName);
Thread.Sleep(10000);
consumer.Pause(topicPartitionList);
}
consumer.Resume(topicPartitionList);
Has anyone ever done this using KafkaFlow?
The issue should be related to the max.poll.interval.ms. By default the time is 5 minutes, meaning that if the consumer does not call poll after that time the consumer is considered failed and a rebalance will occur.
Consider tweaking this value in order to give time for your messages to process (the max time for this setting is 24h). This way you can also avoid having to pause and resume the consumer. Additionally, you can use the cancelation token
context.ConsumerContext.WorkerStoppedto stop the process in case a rebalance occurs.In order to configure the
max.poll.interval.msyou can do something like this: