For a multithreaded application I want to await until a BlockingCollection is completed and empty (IsCompleted = true). I implemented the below and this seems to be working.
Since it's multithreading I don't even trust my own shadow. Would this be a robust implementation?
public class BlockingCollectionEx<T> : BlockingCollection<T>
{
public Task WaitCompleted => completedManualResetEvent.Task;
private readonly TaskCompletionSource completedManualResetEvent = new();
public new void CompleteAdding()
{
base.CompleteAdding();
lock (completedManualResetEvent)
{
if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable()
{
foreach (var item in base.GetConsumingEnumerable())
yield return item;
lock (completedManualResetEvent) //if GetConsumingEnumerable is used by multiple threads, the 2nd one would throw an InvalidOperationException
{
if (!completedManualResetEvent.Task.IsCompleted)
completedManualResetEvent.SetResult();
}
}
public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();
public new T Take() => throw new NotImplementedException();
public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}
usage:
var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 1
});
Task.Run(() =>
{
foreach (var item in x.GetConsumingEnumerable())
// do stuff in Task 2
});
await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is emtpy
Your implementation is not robust for general usage, but it may be good enough for an application that honors the following contract:
CompleteAddingmethod is invoked, theWaitCompletedtask will never complete.InvalidOperationExceptionfor all but one consumers.TakeorTryTakemethods, theWaitCompletedtask will never complete.Without knowing your specific use case, I couldn't say whether you have a legitimate reason for requesting this functionality. In general though, waiting for the exact moment that a
BlockingCollection<T>becomes empty and completed is usually unimportant. What is important is the exact moment that the processing of all consumed items is completed, which happens after the completion of the collection.Note: this answer targets the Revision 1 version of this question.