I recently built a producer/consumer system using `ConcurrentQueue<T>` and `SemaphoreSlim`, then built an alternative system using the `Channel<T>` class from `System.Threading.Channels`.
After benchmarking both systems with BenchmarkDotNet (writing 1000 items 1000 times into each system and waiting for the reader to finish), I get the following results:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
| MyChannel | 1000 | 1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us | 72.11 KB |
The `ConcurrentQueue<T>` implementation appears to be significantly faster than the `Channel<T>` one.
I tried setting `SingleReader` and `SingleWriter` to `true` when creating the `Channel<T>`, but the results ended up being worse:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 18,578.7 us | 1,238.46 us | 3,493.10 us | 18,192.7 us | 8236.31 KB |
| MyChannel | 1000 | 1000 | 50,506.9 us | 1,383.73 us | 3,857.28 us | 49,635.8 us | 170.73 KB |
I'm not sure whether there is a flaw in my implementation or in the benchmark itself. If not, and these results are valid, when should `Channel<T>` be preferred over a plain `ConcurrentQueue<T>`?
The simplified code for both classes looks like this:
```csharp
public class MyQueue
{
    ConcurrentQueue<Item> _queue = null!;
    SemaphoreSlim _readerFinishedSemaphore = null!;
    SemaphoreSlim _readSemaphore = null!;
    bool _completed = false;

    public void Setup()
    {
        _queue = new();
        _readerFinishedSemaphore = new(0);
        _readSemaphore = new(0);
        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        while (true)
        {
            await _readSemaphore.WaitAsync();
            while (_queue.TryDequeue(out var item))
            {
                // do stuff ...
            }
            if (_completed) break;
        }
        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _queue.Enqueue(i);
        }
        // Note: a single release covers the whole batch of items.
        _readSemaphore.Release();
    }

    public void CompleteAndWaitForReader()
    {
        _completed = true;
        _readSemaphore.Release();
        _readerFinishedSemaphore.Wait();
    }
}
```
And for the `Channel<T>` version:
```csharp
public class MyChannel
{
    Channel<Item> _channel = null!;
    SemaphoreSlim _readerFinishedSemaphore = null!;

    public void Setup()
    {
        _readerFinishedSemaphore = new(0);
        _channel = Channel.CreateUnbounded<Item>();
        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        var reader = _channel.Reader;
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                // do stuff ...
            }
        }
        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _channel.Writer.TryWrite(i);
        }
    }

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerFinishedSemaphore.Wait();
    }
}
```
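For the `SingleReader`/`SingleWriter` run mentioned above, these hints are passed via `UnboundedChannelOptions` at creation time rather than set on the channel afterwards. A minimal sketch of the variant `Setup` line (the rest of the class is unchanged):

```csharp
// Variant used for the second benchmark run: hint that exactly one
// task reads and one thread writes, so the channel can take a
// lock-free fast path internally.
_channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions
{
    SingleReader = true, // only the Reader task consumes
    SingleWriter = true  // the benchmark writes from a single thread
});
```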
The benchmark code looks like this:
```csharp
// items are generated in [GlobalSetup] using a fixed-seed Random instance

[IterationSetup]
public void IterationSetup()
{
    myChannel = new MyChannel();
    myQueue = new MyQueue();
    myChannel.Setup();
    myQueue.Setup();
}

[Benchmark]
public void MyQueue()
{
    for (int i = 0; i < Iterations; i++)
        myQueue.Write(items);
    myQueue.CompleteAndWaitForReader();
}

// same for MyChannel
```
It should be noted that I am running this on .NET 8.0.0-preview.6.23329.4.
The main reason `ConcurrentQueue<T>` was performing faster was that it was signalling the reader only once per 1000 added items, while `Channel<T>` was doing so for every item. When I adjusted the benchmark to instead add the 1000 items one by one, to make the comparison fairer, the results were practically identical.
And at higher item counts, the difference became more obviously in favor of the `Channel<T>` implementation, notably also in terms of allocations. So I will be sticking with `Channel<T>` for general producer/consumer scenarios.
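The adjusted, per-item write path for the queue version might look like the sketch below (`WriteOne` is a name I'm using here for illustration, not part of the original benchmark code). It releases the semaphore once per item, matching the per-item wakeup that `Channel<T>` performs on every `TryWrite`:

```csharp
// Fairer variant: signal the reader once per enqueued item instead of
// once per 1000-item batch, mirroring the channel's behavior.
public void WriteOne(Item item)
{
    _queue.Enqueue(item);
    _readSemaphore.Release(); // one release per item
}
```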