Use IAsyncEnumerable to replace Task and Action delegate

386 Views Asked by At

I should implement a method that can return a list of devices. The goal is to return a device as soon as one is found without waiting for the search to finish.

To do this I think I use IAsyncEnumerable.

The Discover method of my implementation has an Action that takes care of adding an item to the list as soon as one is found.

How could I modify the Action and make sure that the new device is not added to a list but returned via "yield return"?

Below is the code I used:

public async IAsyncEnumerable<Device> ScanAsync(
    CancellationToken cancellationToken = default(CancellationToken))
{
    var devices = new List<DiscoveryDevice>();

    await Discover(TimeoutSeconds, d => devices.Add(d), cancellationToken);

    yield return new Device("", ""); //ERROR: return always last
}

private async Task Discover(int timeout, Action<DiscoveryDevice> onDeviceDiscovered,
    CancellationToken cancellationToken = default(CancellationToken))
{
    IEnumerable<IOnvifUdpClient> foreachInterface = clientFactory
        .CreateClientForeachInterface();

    if (!foreachInterface.Any())
        throw new Exception(
            "Missing valid NetworkInterfaces, UdpClients could not be created");

    await Task.WhenAll(foreachInterface.Select(
        client => Discover(timeout, client, onDeviceDiscovered,
            cancellationToken)).ToArray());
}

private async Task Discover(int timeout, IOnvifUdpClient client,
    Action<DiscoveryDevice> onDeviceDiscovered,
    CancellationToken cancellationToken = default(CancellationToken))
{
    var messageId = Guid.NewGuid();
    var responses = new List<UdpReceiveResult>();
    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(timeout));

    try
    {
        await SendProbe(client, messageId);

        while (true)
        {
            if (cts.IsCancellationRequested ||
                cancellationToken.IsCancellationRequested)
                break;

            try
            {
                var response = await client.ReceiveAsync()
                                .WithCancellation(cancellationToken)
                                .WithCancellation(cts.Token);

                if (IsAlreadyDiscovered(response, responses)) continue;

                responses.Add(response);
                var discoveredDevice = ProcessResponse(response, messageId);

                if (discoveredDevice != null)
                {
                    Task.Run(() => onDeviceDiscovered(discoveredDevice));
                }
            }
            catch (Exception)
            { }
        }
    }
    finally
    {
        client.Close();
    }
}
1

There are 1 best solutions below

4
Theodor Zoulias On

You need a buffer to store the discovered devices, because the work of discovering the devices is not driven by the enumeration of the resulting IAsyncEnumerable<Device> sequence. In other words you have a push system, hidden behind a pull interface. When this happens you need a buffer, and a suitable buffer for your case is the Channel<T> class. You can find a synopsis of the features of this class here. Usage example:

public IAsyncEnumerable<Device> ScanAsync(
    CancellationToken cancellationToken = default)
{
    Channel<Device> devices = Channel.CreateUnbounded<Device>();

    Task task = Discover(TimeoutSeconds,
        dd => devices.Writer.TryWrite(new Device(dd.X, dd.Y)),
        cancellationToken);

    _ = task.ContinueWith(t => devices.Writer.TryComplete(t.Exception), default,
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

    return devices.Reader.ReadAllAsync(cancellationToken);
}

There are a couple of rough corners in the above code:

  1. There is a fire-and-forget ContinueWith continuation.
  2. The cancellationToken does not cancel gracefully. The resulting sequence might be canceled before the completion of the Discover asynchronous method. That's a second case of fire-and-forget.

It might be possible to improve the code by ditching the Action<DiscoveryDevice> parameter, and passing instead the devices.Writer as argument (a parameter of type ChannelWriter<Device>).