IAsyncEnumerable converted to IObservable (and Task) doesn't start

I have the following code in my BackgroundService:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    await GetElements(cancellationToken)
        .ToObservable()
        .Buffer(TimeSpan.FromMinutes(1), 100)
        .Select(elements => Observable.FromAsync(() => ProcessElementsAsync(
            elements, cancellationToken)))
        .Concat()
        .ToTask(cancellationToken);
}

private async IAsyncEnumerable<Element> GetElements(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)  // <-- Breakpoint is here
    {
        yield return await _queue.DequeueElementAsync(cancellationToken);
    }
}

When I run the app, the breakpoint is never hit. It looks as if GetElements is never actually executed.

When I replace ExecuteAsync with this code, the breakpoint in GetElements does get hit:

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
    GetElements(cancellationToken)
        .ToObservable()
        .Subscribe(val =>
        {
            _ = 1;
        });

    await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
}

What's wrong with the first implementation?

Answer by ExCrispy

It may be due to how you are consuming GetElements. Try iterating it directly, with something like await foreach (var element in GetElements(cancellationToken)).

I'm running unit tests and trying to hit a breakpoint in code like this:

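    // Async iterator: the body runs only once the result is enumerated.
    public async IAsyncEnumerable<bool> SomeFunc()
    {
        yield return true;
    }

    // Regular async method, for contrast: its body starts running as soon as it is called.
    public async Task SomeFunc2()
    {
        await AwaitableFunc(); // AwaitableFunc is not defined in this snippet
    }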

I can only hit a breakpoint in SomeFunc if I call it like this:

await foreach (var item in SomeFunc()){}

Calling it like this does not hit the breakpoint:

SomeFunc();

The body of a method that returns IAsyncEnumerable doesn't execute when the method is called. The call only creates the enumerable; the code inside starts running when you begin iterating over it, for example with await foreach.
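
For illustration, here is a minimal, self-contained sketch of that deferred start. The names DeferredExecutionDemo, Numbers, Log and RunAsync are made up for this example and do not come from the question. It assumes C# 8 or later on .NET Core 3.0 or newer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DeferredExecutionDemo
{
    // Hypothetical helper: records the order of events so it can be inspected.
    public static readonly List<string> Log = new List<string>();

    // Async iterator: nothing in this body runs until the first MoveNextAsync call.
    public static async IAsyncEnumerable<int> Numbers()
    {
        Log.Add("iterator body started");
        await Task.Yield();
        yield return 1;
        yield return 2;
    }

    public static async Task<List<string>> RunAsync()
    {
        Log.Clear();

        // Calling the method only creates the enumerable; the body has not run yet.
        IAsyncEnumerable<int> sequence = Numbers();
        Log.Add("after call");

        // Enumerating is what actually starts the iterator body.
        await foreach (int n in sequence)
        {
            Log.Add($"got {n}");
        }

        return new List<string>(Log);
    }

    public static async Task Main()
    {
        foreach (string line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

Running it prints "after call" before "iterator body started", followed by "got 1" and "got 2". A breakpoint on the first line of Numbers is therefore reached only once the await foreach loop begins, not when Numbers() is called.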