Enabling Await and Exception Capture with TPL Dataflow

I have designed a simple JobProcessor using TPL Dataflow (my first time using it). I want to be able to create jobs and have them placed on a priority queue (the PriorityBufferBlock) and invoked. My code structure is as follows:

public interface IJob<TInput>
{
    Task Execute(TInput input);

    Priority Priority { get; }
}

where

public enum Priority
{
    High,
    Medium,
    Low
}

and I have a custom version of the PriorityBufferBlock (adapted from PriorityBufferBlock) with a custom refreshing-cache implementation to ensure clean-up of messages that are "reserved". It looks like this:

class PriorityBufferBlock<T> : ISourceBlock<T>, IReceivableSourceBlock<T>
{
    private readonly BufferBlock<T> _highPriorityBuffer;
    private readonly BufferBlock<T> _mediumPriorityBuffer;
    private readonly BufferBlock<T> _lowPriorityBuffer;
    private readonly RefreshingInMemoryCache<DataflowMessageHeader, ISourceBlock<T>> _messagesCache;
    // ... More code here 
}
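
The processor below also calls SendAsync(task, task.Priority) on this block, which is not part of the standard dataflow interfaces. Roughly, the priority-aware send just routes to the matching internal buffer - a sketch only; the signature matches the call site further down, but the body here is an assumption:

// Sketch of the priority-aware send on PriorityBufferBlock<T>; the real
// implementation may differ, only the signature is implied by the call site.
public Task<bool> SendAsync(T item, Priority priority)
{
    var target = priority switch
    {
        Priority.High   => _highPriorityBuffer,
        Priority.Medium => _mediumPriorityBuffer,
        _               => _lowPriorityBuffer
    };

    return target.SendAsync(item);
}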

The JobProcessor interface is

public interface IJobProcessor<TInput> 
{
    void RegisterHandler<TTask>(TInput input) where TTask : IJob<TInput>;

    Task Enqueue(IJob<TInput> task);
}

with implementation as

public class JobProcessor<TInput> : IJobProcessor<TInput>
{
    private readonly PriorityBufferBlock<IJob<TInput>> _priorityBufferBlock;
    private readonly IOptions<AzureOptions> _options;
    private readonly ILogger<IJobProcessor<TInput>> _logger;
    private readonly CancellationToken _token;

    public JobProcessor(
        IClock clock,
        IOptions<AzureOptions> options,
        ILogger<IJobProcessor<TInput>> logger,
        CancellationToken token)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger;
        _token = token;

        var dataflowBlockOptions = new DataflowBlockOptions { CancellationToken = token };
        _priorityBufferBlock = new PriorityBufferBlock<IJob<TInput>>(dataflowBlockOptions, clock, options, logger);

        _logger?.LogInformation($"{nameof(JobProcessor<TInput>)} initialized and configured successfully");
    }

    public void RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
    {
        var actionBlock = new ActionBlock<IJob<TInput>>(
            (job) => job.Execute(input),
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = _token,
                MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
            });

        _priorityBufferBlock.LinkTo(
            actionBlock,
            new DataflowLinkOptions
            {
                PropagateCompletion = true
            },
            (task) => task is IJob<TInput>);

        _logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
    }

    public async Task Enqueue(IJob<TInput> task)
    {
        await _priorityBufferBlock.SendAsync(task, task.Priority);
        _logger?.LogInformation($"Successfully enqueued {task.GetType().Name} for processing");
    }
}

and this is proving to work well for everything except exception handling. Because I am not awaiting the actionBlock, any exceptions thrown inside a running IJob are swallowed, as you would expect (I am essentially doing Task.Run(() => throw new Exception()); and never observing the resulting task).
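
To illustrate what I mean by "swallowed", here is a minimal sketch in isolation (an ActionBlock<int>, not my production code): the exception does not vanish, it faults the block's Completion task, but nothing ever observes that task:

// Minimal sketch: the exception faults the block, but no one awaits Completion.
var block = new ActionBlock<int>(async _ =>
{
    await Task.Yield();
    throw new InvalidOperationException("boom");
});

block.Post(1);

await Task.Delay(100);                           // give the block time to fault
Console.WriteLine(block.Completion.IsFaulted);   // True
// await block.Completion;                       // would rethrow the exception here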

My question is, what is the best way to change the above so I can await and bubble the exceptions upwards to allow error responses from this API?

I have tried

public async Task RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
{
    var actionBlock = new ActionBlock<IJob<TInput>>(
        (job) => job.Execute(input),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = _token,
            MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
        });
    await actionBlock.Completion;

    _priorityBufferBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions
        {
            PropagateCompletion = true
        },
        (job) => job is IJob<TInput>);

    _logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
}

but this does not work and, on reflection, makes no sense: awaiting actionBlock.Completion before the block is even linked means nothing ever posts to it and nothing ever calls Complete() on it, so RegisterHandler simply hangs. I am a bit lost; any help is hugely appreciated. Thanks for your time.


NOTE:

An example of the working code via unit test is

[TestCase(1)]
[TestCase(3)]
[TestCase(6)]
public async Task EnsureConcurrencyLimitsAreNotExceeded(int maxDegreeOfParallelism)
{
    var waitHandle = new ManualResetEventSlim(false);
    var priorityBuffer = new ConcurrentQueue<Priority>();

    _mockGeneralOptions.SetReturnsDefault(new AzureOptions()
    {
        PriorityBufferBlockExpiryMilliseconds = 10,
        TaskProcessorMaxDegreeOfParallelism = maxDegreeOfParallelism
    });

    var jobProcessor = new JobProcessor<OptionalPair<ViewformTour, ViewformTour>>(
        _mockClock.Object,
        _mockGeneralOptions.Object,
        _mockLogger.Object,
        CancellationToken.None
    );

    var tour = new ViewformTour
    {
        Id = "_id"
    };

    jobProcessor.RegisterHandler<HighPriorityJob>(
        new OptionalPair<ViewformTour, ViewformTour>(
            Optional.From(tour),
            Optional.None<ViewformTour>()
    ));
    jobProcessor.RegisterHandler<LowPriorityJob>(
        new OptionalPair<ViewformTour, ViewformTour>(
            Optional.From(tour),
            Optional.None<ViewformTour>()
    ));

    var tasks = new List<Task>();

    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
    tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));

    await Task.WhenAll(tasks.ToArray());

    Thread.Sleep(150);
    waitHandle.Set();

    Assert.That(priorityBuffer, Is.Not.Null);
    Assert.That(priorityBuffer.Count, Is.EqualTo(maxDegreeOfParallelism));
}

with

private class HighPriorityJob : IJob<OptionalPair<ViewformTour, ViewformTour>>
{
    private ConcurrentQueue<Priority> _priorityBuffer;

    public HighPriorityJob(ConcurrentQueue<Priority> priorityBuffer)
    {
        _priorityBuffer = priorityBuffer;
    }

    public Task Execute(OptionalPair<ViewformTour, ViewformTour> input)
    {
        _priorityBuffer.Enqueue(Priority);
        return Task.CompletedTask;
    }

    public Priority Priority => Priority.High;
}

private class LowPriorityJob : IJob<OptionalPair<ViewformTour, ViewformTour>>
{
    private ManualResetEventSlim _waitHandle;
    private ConcurrentQueue<Priority> _priorityBuffer;
    private int _delayMilliseconds;
    private bool _setWaitHandle = false;

    public LowPriorityJob(
        ManualResetEventSlim waitHandle,
        ConcurrentQueue<Priority> priorityBuffer,
        bool setWaitHandle = false,
        int delayMilliseconds = 100)
    {
        _waitHandle = waitHandle;
        _priorityBuffer = priorityBuffer;
        _delayMilliseconds = delayMilliseconds;
        _setWaitHandle = setWaitHandle;
    }

    public async Task Execute(OptionalPair<ViewformTour, ViewformTour> input)
    {
        await Task.Delay(_delayMilliseconds);

        _priorityBuffer.Enqueue(Priority);
        if (_setWaitHandle)
        {
            _waitHandle.Set();
        }
    }

    public Priority Priority => Priority.Low;
}

EDIT II:

Okay, so I have now tried the following: RegisterHandler now returns the ActionBlock on registration, and I then set up a continuation on its Completion task

public ActionBlock<IJob<TInput>> RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
{
    var actionBlock = new ActionBlock<IJob<TInput>>(
        (job) => job.Execute(input),
        new ExecutionDataflowBlockOptions
        {
            CancellationToken = _token,
            MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
        });

    _priorityBufferBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions
        {
            PropagateCompletion = true
        },
        (job) => job is IJob<TInput>);

    _logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");

    return actionBlock;
}

Then in my test

[Test]
public async Task DoesHandleExceptionGracefully()
{
    var priorityBuffer = new ConcurrentQueue<Priority>();
    var jobProcessor = new JobProcessor<OptionalPair<ViewformTour, ViewformTour>>(
        _mockClock.Object,
        _mockGeneralOptions.Object,
        _mockLogger.Object,
        CancellationToken.None
    );

    var tour = new ViewformTour
    {
        Id = "_id"
    };

    try
    {
        var handler = jobProcessor.RegisterHandler<ThrowingHighPriorityJob>(
            new OptionalPair<ViewformTour, ViewformTour>(
                Optional.From(tour),
                Optional.None<ViewformTour>()
        ));
        
        await jobProcessor.Enqueue(new ThrowingHighPriorityJob());
        await handler.Completion.ContinueWith(ant =>
        {
            throw new Exception("... From Continuation");
        }, TaskContinuationOptions.OnlyOnFaulted);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"External capture{ex.Message}");
    }
}

This outputs "External capture... From Continuation", so we have externalized the exception handling. HOWEVER, awaiting the continuation is now blocking: the caller sits on handler.Completion until the handler faults or completes. In production I want to enqueue jobs dynamically, and this prevents that. :'[
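
(For reference, a detached, fire-and-forget observation of the fault would not block, but then the exception only surfaces in a callback or log rather than bubbling up through Enqueue/RegisterHandler - a minimal sketch of what I mean, assuming handler is the returned ActionBlock:)

// Sketch only: observe the handler's fault without awaiting it inline.
// This does not block, but the exception surfaces in a callback/logger
// rather than bubbling up to the caller of Enqueue.
_ = handler.Completion.ContinueWith(
    t => Console.WriteLine($"Handler faulted: {t.Exception?.GetBaseException().Message}"),
    TaskContinuationOptions.OnlyOnFaulted);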

There is 1 answer below.

Answer from FluffMaster4000:

In the end, there was no way to do this cleanly without breaking the control flow and going against the "design principles" of the library. So I simply embraced the pipeline/dataflow mantra and created an IPoisonQueue:

public class PoisonQueue<TInput> : IPoisonQueue<TInput>
{
    private readonly IPriorityBufferBlock<IJob<TInput>> _priorityBufferBlock;

    public PoisonQueue(IPriorityBufferBlock<IJob<TInput>> priorityBufferBlock)
    {
        _priorityBufferBlock = priorityBufferBlock ?? throw new ArgumentNullException(nameof(priorityBufferBlock));
    }

    public async Task Enqueue(IJob<TInput> job)
    {
        // Push the PoisonJob onto the buffer block...
    }
}
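
The Enqueue body is elided above; the idea is simply to wrap the failed job and send it back through the same priority block (a sketch only - the PoisonJob wrapper shown here is the hypothetical type described at the end, not my exact code):

// Sketch of the elided body: wrap the failed job in a PoisonJob (assumed
// wrapper type) and push it back onto the same priority block.
public async Task Enqueue(IJob<TInput> job)
{
    var poisonJob = new PoisonJob<TInput>(job);
    await _priorityBufferBlock.SendAsync(poisonJob, poisonJob.Priority);
}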

Then I went further with the DI for the JobProcessor and unit tested it to high heaven; the final implementation is:

public class JobProcessor<TInput> : IJobProcessor<TInput>
{
    private readonly IPriorityBufferBlock<IJob<TInput>> _priorityBufferBlock;
    private readonly IPoisonQueue<TInput> _poisonQueue;

    private readonly IOptions<GeneralOptions> _options;
    private readonly ILogger<IJobProcessor<TInput>> _logger;

    public JobProcessor(
        IPriorityBufferBlock<IJob<TInput>> priorityBufferBlock,
        IPoisonQueue<TInput> poisonQueue,
        IClock clock,
        IOptions<GeneralOptions> options,
        ILogger<IJobProcessor<TInput>> logger)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger;

        _priorityBufferBlock = priorityBufferBlock ?? throw new ArgumentNullException(nameof(priorityBufferBlock));
        _priorityBufferBlock.Initialize();

        _poisonQueue = poisonQueue ?? throw new ArgumentNullException(nameof(poisonQueue));

        _logger?.LogInformation($"{nameof(JobProcessor<TInput>)} initialized and configured successfully");
    }

    public void RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
    {
        var actionBlock = new ActionBlock<IJob<TInput>>(
            async (job) =>
            {
                var jobType = job.GetType().Name;

                var retryCount = _options.Value.JobProcessorRetryCount;
                var retryIntervalMilliseconds = _options.Value.JobProcessorRetryIntervalMilliseconds;

                var retryPolicy = Policy
                    .Handle<Exception>()
                    .WaitAndRetryAsync(
                        retryCount,
                        retryAttempt => TimeSpan.FromMilliseconds(Math.Pow(retryIntervalMilliseconds, retryAttempt)));

                try
                {
                    await retryPolicy.ExecuteAsync(async () =>
                    {
                        _logger?.LogInformation("Starting execution of job {JobType}...", jobType);
                        await job.Execute(input);
                    });
                }
                catch (Exception ex)
                {
                    _logger?.LogError(ex, $"{jobType} failed");
                    _logger?.LogWarning("Adding job {JobType} to poison queue...", jobType);

                    await _poisonQueue.Enqueue(job);
                }
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = _options.Value.JobProcessorMaxDegreeOfParallelism
            });

        _priorityBufferBlock.LinkTo(
            actionBlock,
            new DataflowLinkOptions
            {
                PropagateCompletion = true
            },
            (job) => job is IJob<TInput>);

        _logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
    }

    public async Task Enqueue(IJob<TInput> job)
    {
        await _priorityBufferBlock.SendAsync(job, job.Priority);
        _logger?.LogInformation($"Successfully enqueued {job.GetType().Name} for processing");
    }
}

Here I have used Polly for the retry and backoff, and once the retries are exhausted (and we still have a failure), I push a PoisonJob onto the PriorityBufferBlock, so errors are handled in the same way as other jobs. It seems to work well; fingers crossed my test coverage is complete and correct.
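
For completeness, the PoisonJob mentioned above is just another IJob<TInput>; a sketch of the shape I mean (the wrapper's name, its Priority, and the Execute body are illustrative, not my exact implementation):

// Illustrative shape only: a job that wraps the failed job so the pipeline
// handles the failure like any other job (dead-letter it, alert, etc.).
public class PoisonJob<TInput> : IJob<TInput>
{
    private readonly IJob<TInput> _failedJob;

    public PoisonJob(IJob<TInput> failedJob)
    {
        _failedJob = failedJob ?? throw new ArgumentNullException(nameof(failedJob));
    }

    public Priority Priority => Priority.Low;   // assumption: handle failures at low priority

    public Task Execute(TInput input)
    {
        // e.g. write the failed job to a dead-letter store or emit a metric
        return Task.CompletedTask;
    }
}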