How to implement a parallel task consumer for a Kafka application?


I want to consume my Kafka topic using multiple tasks with the Task Parallel Library.

var tasks = new List<Task>();
consumer.Subscribe(topicName);
var capacity = 5;

while (true)
{
    var task = Task.Run(() =>
    {
        var consumed = consumer.Consume();
        var item = deserialize(consumed);
        process(item);
        consumer.Commit();
    });

    tasks.Add(task);

    if (tasks.Count == capacity)
    {
        await Task.WhenAll(tasks);
        tasks.Clear();
    }
}

The process() method calls a service whose capacity is 5 requests per minute.


But with this solution, if I get items 0,1,2,3,4 from the topic and run Task.WhenAll(tasks), I do not know which task will finish first, so when I commit the consumer inside a task, I do not know which item's offset will be committed. For example, the tasks might complete in the order 3,2,0,4,1. Will the offset be 0 in that situation? I am confused about whether this is the right way to use a Kafka consumer. Also, process() may throw an exception for task 2. What will be committed in that situation?

Answered by Marco Merola (accepted answer):

If you prefer not to partition your topic and assign each partition to a different consumer, you can still process Kafka records in parallel. However, it is essential to ensure that you handle the consumer offset correctly.

Handling the offset

Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.

By default, the .NET Consumer will commit offsets automatically. This is done periodically by a background thread at an interval specified by the AutoCommitIntervalMs config property. An offset becomes eligible to be committed immediately prior to being delivered to the application via the Consume method.

However, this strategy introduces the potential for messages to be missed in the case of an application failure!

The C# client also allows you to commit offsets explicitly via the Commit method (from your code segment, it seems you are following this approach).

Like in the following example:

var config = new ConsumerConfig
{
    ...
    // Disable auto-committing of offsets.
    EnableAutoCommit = false
};

...

while (!cancelled)
{
    var consumeResult = consumer.Consume(cancellationToken);

    // Process the message here
    Process(consumeResult);

    // Commit the offset
    consumer.Commit(consumeResult);
}

If your processing fails (the offset commit could fail too), your application should implement retry logic, potentially with increasing backoff intervals.

It's crucial for your application to be designed with idempotence in mind, enabling it to handle the same record multiple times while disregarding duplicates. This ensures compliance with the at-least-once delivery principle.
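As a rough illustration (not part of the original answer), idempotence can be approximated by tracking the keys of already-processed records; the class and member names here are assumptions, and a real implementation would typically persist the seen keys in a durable store rather than in memory.

    using System;
    using System.Collections.Concurrent;

    // Hypothetical sketch: skip records whose key has already been handled,
    // so re-delivered messages (after a restart from the last committed
    // offset) are processed only once.
    public class IdempotentProcessor
    {
        // In-memory set of processed keys; a production system would persist
        // this (e.g. in a database) so it survives restarts.
        private readonly ConcurrentDictionary<string, bool> _seen = new();

        public void Process(string key, Action process)
        {
            // TryAdd returns false if the key was already recorded,
            // i.e. the record is a duplicate and can be skipped.
            if (_seen.TryAdd(key, true))
                process();
        }
    }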

If an exception bubbles up, make sure to dispose of your consumer properly. Then, when you recreate it after a little break, it should pick up from where it left off, processing from the last committed offset.

Process messages in parallel within the consumer

When processing records in parallel within your consumer you have two options:

  1. Ensure that you commit the highest processed offset without leaving any 'holes' behind. Otherwise, in case of failure, your consumer will skip some records without processing them
  2. Write the failed records in a dead letter topic and keep processing the others
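The second option could be sketched like this (not from the original answer; the dead letter topic name "my-topic.dlt" and the error header are assumptions, and the snippet assumes a Confluent.Kafka producer):

    using System;
    using System.Text;
    using Confluent.Kafka;

    // Hypothetical sketch: forward a failed record to a dead letter topic
    // instead of blocking the consumer, preserving the original payload
    // so it can be inspected or replayed later.
    void SendToDeadLetter(IProducer<string, string> producer,
                          ConsumeResult<string, string> failed,
                          Exception error)
    {
        producer.Produce("my-topic.dlt", new Message<string, string>
        {
            Key = failed.Message.Key,
            Value = failed.Message.Value,
            // Attach the failure reason as a header for later diagnosis.
            Headers = new Headers
            {
                { "error", Encoding.UTF8.GetBytes(error.Message) }
            }
        });
    }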

Coming back to your example, you would likely do something like this:

while (!cancelled)
{
    var records = new List<ConsumeResult<TKey, TValue>>();
    var errorQueue = new ConcurrentQueue<Exception>();
    var processed = new ConcurrentQueue<ConsumeResult<TKey, TValue>>();

    // Consume timeout
    int consumeTimeoutInMs = 1000;

    // Suppose you process 5 messages at a time
    for (int i = 0; i < 5; i++)
    {
        var r = consumer.Consume(consumeTimeoutInMs);
        if (r != null && !r.IsPartitionEOF)
            records.Add(r);
    }

    if (!records.Any())
    {
        // It would make sense to wait a bit here before polling again.
        // For example you may add: await Task.Delay(TimeSpan.FromSeconds(1));
        continue;
    }

    // Process the messages in parallel
    Parallel.ForEach(records, record =>
    {
        try
        {
            // Suppose your service is designed to process messages one at a time
            Process(record);
            processed.Enqueue(record);
        }
        catch (Exception e)
        {
            errorQueue.Enqueue(e);
        }
    });

    // Retrieve the latest processed offset, without leaving holes behind
    ConsumeResult<TKey, TValue> latestProcessedRecord = null;
    foreach (var record in records)
    {
        if (processed.Contains(record))
            latestProcessedRecord = record;
        else
            break;
    }

    // Commit the offset
    if (latestProcessedRecord != null)
        consumer.Commit(latestProcessedRecord);

    // Halt the execution in case of errors
    if (errorQueue.Any())
        throw new AggregateException(errorQueue);
}

It's also a good practice to design your process to handle records in batches.
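For example (a hypothetical sketch, not from the original answer; ProcessBatch is an assumed batch API of your downstream service):

    // Hypothetical sketch: group consumed records into fixed-size batches,
    // so the downstream service is called once per batch rather than once
    // per record. Requires System.Linq.
    var batchSize = 5;
    for (var i = 0; i < records.Count; i += batchSize)
    {
        var batch = records.Skip(i).Take(batchSize).ToList();
        ProcessBatch(batch); // assumed batch endpoint of your service
    }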

Examples

  • ITERATION 1
READ(MESSAGE_AT_1)
READ(MESSAGE_AT_2)
READ(MESSAGE_AT_3)
READ(MESSAGE_AT_4)
READ(MESSAGE_AT_5)

// In Parallel
PROCESS(MESSAGE_AT_1) - SUCCEEDS
PROCESS(MESSAGE_AT_2) - SUCCEEDS
PROCESS(MESSAGE_AT_3) - FAILS
PROCESS(MESSAGE_AT_4) - SUCCEEDS
PROCESS(MESSAGE_AT_5) - SUCCEEDS

COMMIT(INDEX_2)

THROWS EXCEPTION
  • ITERATION 2
READ(MESSAGE_AT_3)
READ(MESSAGE_AT_4)
READ(MESSAGE_AT_5)
READ(MESSAGE_AT_6)
READ(MESSAGE_AT_7)

// In Parallel
PROCESS(MESSAGE_AT_3) - SUCCEEDS
PROCESS(MESSAGE_AT_4) - SUCCEEDS
PROCESS(MESSAGE_AT_5) - SUCCEEDS
PROCESS(MESSAGE_AT_6) - FAILS
PROCESS(MESSAGE_AT_7) - SUCCEEDS

COMMIT(INDEX_5)

THROWS EXCEPTION

...

If you need more advanced features, I suggest taking a look at some examples on GitHub:

Or, eventually, trying this wrapper (kafka-flow):