I have an asynchronous sequence (stream) of messages that arrive sometimes in large numbers and sometimes sporadically, and I would like to process them in batches of 10 messages per batch. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since receiving the first message of the batch. I found that I can solve the first part of the problem by using the `Buffer` operator from the System.Interactive.Async package:
```csharp
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}
```
The signature of the `Buffer` operator:
```csharp
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);
```
Unfortunately the `Buffer` operator has no overload with a `TimeSpan` parameter, so I can't solve the second part of the problem as easily. I'll have to implement a batching operator with a timer myself somehow. My question is: how can I implement a variant of the `Buffer` operator that has the signature below?
```csharp
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
```
The `timeSpan` parameter should affect the behavior of the `Buffer` operator as follows:
- A batch must be emitted when the `timeSpan` has elapsed since emitting the previous batch (or, initially, since the invocation of the `Buffer` method).
- An empty batch must be emitted if the `timeSpan` has elapsed since emitting the previous batch and no messages have been received during this time.
- Emitting batches more frequently than every `timeSpan` implies that the batches are full. Emitting a batch with fewer than `count` messages before the `timeSpan` has elapsed is not desirable.
I am OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.
P.S. This question was inspired by a recent question related to channels and memory leaks.
The solution uses the `PeriodicTimer` class (.NET 6) for receiving timer notifications, and the `Task.WhenAny` method for coordinating the timer and enumeration tasks. The `PeriodicTimer` class is more convenient than the `Task.Delay` method for this purpose, because it can be disposed directly instead of requiring an accompanying `CancellationTokenSource`.
The timer is restarted each time a chunk is emitted, after the consumer has finished consuming the chunk.
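Below is a minimal sketch of that approach. It is written for illustration and is not necessarily identical to the answer's own code. It assumes .NET 6 or later. The following are all assumptions of this sketch: the class names `AsyncEnumerableExtensions` and `Program`, the hypothetical `Produce` source, the optional `[EnumeratorCancellation]` token parameter, and the choice to emit the remaining partial batch when the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Sketch: emits a batch when `count` items are buffered, or when `timeSpan`
    // has elapsed since the previous batch was emitted (possibly an empty batch).
    public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
        this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        IAsyncEnumerator<TSource> enumerator = source.GetAsyncEnumerator(cts.Token);
        var buffer = new List<TSource>(count);
        Task<bool>? moveNext = null; // at most one pending MoveNextAsync at a time
        PeriodicTimer? timer = null;
        try
        {
            timer = new PeriodicTimer(timeSpan);
            Task<bool> tick = timer.WaitForNextTickAsync().AsTask();
            while (true)
            {
                moveNext ??= enumerator.MoveNextAsync().AsTask();
                Task completed = await Task.WhenAny(moveNext, tick).ConfigureAwait(false);
                if (completed == moveNext)
                {
                    bool hasNext = await moveNext.ConfigureAwait(false); // rethrows source errors
                    moveNext = null;
                    if (!hasNext) break;
                    buffer.Add(enumerator.Current);
                    if (buffer.Count < count) continue; // batch not full yet
                }
                // Either the batch is full or the timer ticked: emit what we have.
                IList<TSource> batch = buffer.ToArray();
                buffer.Clear();
                timer.Dispose(); // stop the timer while the consumer processes the batch
                timer = null;
                yield return batch;
                // The consumer has finished with the batch: restart the timer.
                timer = new PeriodicTimer(timeSpan);
                tick = timer.WaitForNextTickAsync().AsTask();
            }
            // The source completed: emit the remaining items, if any.
            if (buffer.Count > 0) yield return buffer.ToArray();
        }
        finally
        {
            timer?.Dispose();
            if (moveNext is not null)
            {
                // Don't leave a MoveNextAsync running in the background:
                // cancel it and wait for it before disposing the enumerator.
                cts.Cancel();
                try { await moveNext.ConfigureAwait(false); } catch { }
            }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}

public static class Program
{
    // Hypothetical source: a burst of 25 messages, a quiet period, then one more.
    public static async IAsyncEnumerable<int> Produce(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; i < 25; i++) yield return i;
        await Task.Delay(TimeSpan.FromSeconds(1.5), token);
        yield return 25;
    }

    public static async Task Main()
    {
        await foreach (IList<int> batch in Produce().Buffer(TimeSpan.FromSeconds(1), 10))
            Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} batch of {batch.Count}");
    }
}
```

With this hypothetical source, the output should be as follows, though exact timings depend on the thread pool:
- two full batches of 10;
- a partial batch when the timer elapses during the quiet period;
- finally, the last message when the source completes.

The `finally` block cancels and awaits any pending `MoveNextAsync` before disposing the enumerator. This ends promptly only if the source observes its cancellation token.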
This implementation is destructive. If the `source` sequence fails or the enumeration is canceled, any elements that have already been consumed from the `source` and are buffered will be lost. See this question for ideas about how to inject a non-destructive behavior.
Care has been taken to avoid leaking fire-and-forget `MoveNextAsync` operations or timers.
The 7th revision of this answer has an implementation that uses the `Task.Delay` method instead of the `PeriodicTimer` class, so it works with .NET versions earlier than 6.0. That revision also includes a tempting but flawed Rx-based implementation.
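For .NET versions earlier than 6.0, the `PeriodicTimer` can be replaced with a one-shot timer built on `Task.Delay` plus the `CancellationTokenSource` that `PeriodicTimer` makes unnecessary. The `DelayTimer` class below is a hypothetical helper written for illustration, not the code of the 7th revision. It only shows how that extra `CancellationTokenSource` can be wrapped so the timer is stopped with a single `Dispose` call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: a one-shot timer built on Task.Delay (works before .NET 6).
public sealed class DelayTimer : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private bool _disposed;

    public DelayTimer(TimeSpan dueTime)
    {
        // Completes after dueTime, or becomes Canceled when the timer is disposed.
        Tick = Task.Delay(dueTime, _cts.Token);
    }

    public Task Tick { get; }

    // Cancelling ends the pending delay right away instead of at its due time.
    // Calling Dispose more than once is harmless.
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _cts.Cancel();
        _cts.Dispose();
    }
}
```

In a batching loop like the one described above, you would use it as follows:
- create a new `DelayTimer` each time a batch is emitted;
- race its `Tick` task against the pending `MoveNextAsync` task with `Task.WhenAny`;
- dispose the timer before the batch is yielded.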