I have one thread responsible for enqueuing and one thread responsible for dequeuing. However, data is enqueued far faster than it can be dequeued and processed. With the following approach, I ended up with a huge delay in data processing:
public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                ProcessData(item);
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}
Next I tried processing the data in separate tasks, but this ended up affecting other tasks in the project: the processing consumed most of the resources allocated to the application and generated a high thread count.
public void HandleData()
{
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                Task.Run(() => ProcessData(item));
            }
            else
            {
                Thread.Sleep(10);
            }
        }
        catch (Exception e)
        {
            //...
        }
    }
}
Next, I tried the following:
public void HandleData()
{
    List<Task> taskList = new List<Task>();
    while (true)
    {
        try
        {
            if (Queue.TryDequeue(out var item))
            {
                if (taskList.Count <= 20)
                {
                    Task t = Task.Run(() => ProcessData(item));
                    taskList.Add(t);
                }
                else
                {
                    ProcessData(item);
                }
            }
            else
            {
                Thread.Sleep(10);
            }
            taskList.RemoveAll(x => x.IsCompleted);
        }
        catch (Exception e)
        {
            //...
        }
    }
}
This seems to have solved the problem, but I wonder if there is a cleaner way to do it. Is there a way to set a maximum number of concurrent threads while dequeuing?
ConcurrentQueue isn't the proper container here, especially since it doesn't provide asynchronous operations. A better option would be an ActionBlock, or a Channel combined with Parallel.ForEachAsync.
Using an ActionBlock
An ActionBlock combines both an input queue and workers to process the data asynchronously. The workers process the data as soon as it's available. Using an ActionBlock, you can create a block with a set number of workers and start posting data to it. The block will use only the configured number of worker tasks to process the data:
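For example, a minimal sketch, where `Item` and `ProcessData` stand in for the question's data type and processing method, and the degree of parallelism is an illustrative value:

```csharp
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow NuGet package

var block = new ActionBlock<Item>(
    item => ProcessData(item),          // placeholder for the question's method
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4      // at most 4 items processed concurrently
    });
```

The block queues posted items internally, so no separate ConcurrentQueue is needed.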
Data/messages are posted to the block using either the Post or SendAsync method. When there's no more data, the Complete method tells the block to shut down after processing any pending items. You can wait for the pending items to finish by awaiting the Completion property.
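A sketch of the producer side, assuming an already-created `ActionBlock<Item>` named `block` and an item source named `items` (both illustrative names):

```csharp
foreach (var item in items)
{
    block.Post(item);        // or: await block.SendAsync(item);
}
block.Complete();            // signal that no more data will arrive
await block.Completion;      // wait until all pending items are processed
```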
Using Channel
Another option is to use a Channel instead of a ConcurrentQueue. This class is essentially an asynchronous ConcurrentQueue that exposes an IAsyncEnumerable<T> stream which can be iterated with await foreach. You can create a specific number of workers that read either from the channel itself or from the IAsyncEnumerable<T> stream. In .NET 6, the latter is much easier using Parallel.ForEachAsync with a fixed degree-of-parallelism option:
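A minimal sketch, again using the hypothetical `Item` type and `ProcessData` method from the question, with an illustrative parallelism cap:

```csharp
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<Item>();

// Producer side: write items, then mark the channel complete when done, e.g.:
//   await channel.Writer.WriteAsync(item);
//   channel.Writer.Complete();

// Consumer side (.NET 6+): process the stream with a fixed degree of parallelism.
await Parallel.ForEachAsync(
    channel.Reader.ReadAllAsync(),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (item, ct) =>
    {
        ProcessData(item);          // placeholder for the question's method
        await Task.CompletedTask;
    });
```

ForEachAsync completes once the writer calls Complete and all remaining items have been processed.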