I have designed a simple JobProcessor using TPL Data flow (my first time using it). I want to be able to create jobs, and have them invoked and placed on a priority queue (the PriorityBufferBlock). My code structure is as follows
public interface IJob<TInput>
{
Task Execute(TInput input);
Priority Priority { get; }
}
where
public enum Priority
{
High,
Medium,
Low
}
and I have a custom version of the PriorityBufferBlock (taken from PriorityBufferBlock) with a custom refreshing cache implementation, to ensure clean up of the messages that are "reserved", this looks like
class PriorityBufferBlock<T> : ISourceBlock<T>, IReceivableSourceBlock<T>
{
private readonly BufferBlock<T> _highPriorityBuffer;
private readonly BufferBlock<T> _mediumPriorityBuffer;
private readonly BufferBlock<T> _lowPriorityBuffer;
private readonly RefreshingInMemoryCache<DataflowMessageHeader, ISourceBlock<T>> _messagesCache;
// ... More code here
}
The JobProcessor interface is
public interface IJobProcessor<TInput>
{
void RegisterHandler<TTask>(TInput input) where TTask : IJob<TInput>;
Task Enqueue(IJob<TInput> task);
}
with implementation as
public class JobProcessor<TInput> : IJobProcessor<TInput>
{
private readonly PriorityBufferBlock<IJob<TInput>> _priorityBufferBlock;
private readonly IOptions<AzureOptions> _options;
private readonly ILogger<IJobProcessor<TInput>> _logger;
private readonly CancellationToken _token;
public JobProcessor(
IClock clock,
IOptions<AzureOptions> options,
ILogger<IJobProcessor<TInput>> logger,
CancellationToken token)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger;
_token = token;
var dataflowBlockOptions = new DataflowBlockOptions { CancellationToken = token };
_priorityBufferBlock = new PriorityBufferBlock<IJob<TInput>>(dataflowBlockOptions, clock, options, logger);
_logger?.LogInformation($"{nameof(JobProcessor<TInput>)} initialized and configured successfully");
}
public void RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
{
var actionBlock = new ActionBlock<IJob<TInput>>(
(job) => job.Execute(input),
new ExecutionDataflowBlockOptions
{
CancellationToken = _token,
MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
});
_priorityBufferBlock.LinkTo(
actionBlock,
new DataflowLinkOptions
{
PropagateCompletion = true
},
(task) => task is IJob<TInput>);
_logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
}
public async Task Enqueue(IJob<TInput> task)
{
await _priorityBufferBlock.SendAsync(task, task.Priority);
_logger?.LogInformation($"Successfully enqueued {task.GetType().Name} for processing");
}
}
and this is proving to work well for everything except Exception handling. Because I am not awaiting the actionBlock, any exception from inside the running IJob are swallowed, as you would expect (here I am essentially doing Task.Run(() => throw new Exception());.
My question is, what is the best way to change the above so I can await and bubble the exceptions upwards to allow error responses from this API?
I have tried
public async Task RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
{
var actionBlock = new ActionBlock<IJob<TInput>>(
(job) => job.Execute(input),
new ExecutionDataflowBlockOptions
{
CancellationToken = _token,
MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
});
await actionBlock.Completion;
_priorityBufferBlock.LinkTo(
actionBlock,
new DataflowLinkOptions
{
PropagateCompletion = true
},
(job) => job is IJob<TInput>);
_logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
}
but this is not working and also makes no sense, I am a bit lost. Any help hugely appreciated. Thanks for your time.
NOTE:
An example of the working code via unit test is
[TestCase(1)]
[TestCase(3)]
[TestCase(6)]
public async Task EnsureConcurrencyLimitsAreNotExceeded(int maxDegreeOfParallelism)
{
var waitHandle = new ManualResetEventSlim(false);
var priorityBuffer = new ConcurrentQueue<Priority>();
_mockGeneralOptions.SetReturnsDefault(new AzureOptions()
{
PriorityBufferBlockExpiryMilliseconds = 10,
TaskProcessorMaxDegreeOfParallelism = maxDegreeOfParallelism
});
var jobProcessor = new JobProcessor<OptionalPair<ViewformTour, ViewformTour>>(
_mockClock.Object,
_mockGeneralOptions.Object,
_mockLogger.Object,
CancellationToken.None
);
var tour = new ViewformTour
{
Id = "_id"
};
jobProcessor.RegisterHandler<HighPriorityJob>(
new OptionalPair<ViewformTour, ViewformTour>(
Optional.From(tour),
Optional.None<ViewformTour>()
));
jobProcessor.RegisterHandler<LowPriorityJob>(
new OptionalPair<ViewformTour, ViewformTour>(
Optional.From(tour),
Optional.None<ViewformTour>()
));
var tasks = new List<Task>();
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
tasks.Add(jobProcessor.Enqueue(new LowPriorityJob(waitHandle, priorityBuffer)));
await Task.WhenAll(tasks.ToArray());
Thread.Sleep(150);
waitHandle.Set();
Assert.That(priorityBuffer, Is.Not.Null);
Assert.That(priorityBuffer.Count, Is.EqualTo(maxDegreeOfParallelism));
}
with
private class HighPriorityJob : IJob<OptionalPair<ViewformTour, ViewformTour>>
{
private ConcurrentQueue<Priority> _priorityBuffer;
public HighPriorityJob(ConcurrentQueue<Priority> priorityBuffer)
{
_priorityBuffer = priorityBuffer;
}
public Task Execute(OptionalPair<ViewformTour, ViewformTour> input)
{
_priorityBuffer.Enqueue(Priority);
return Task.CompletedTask;
}
public Priority Priority => Priority.High;
}
private class LowPriorityJob : IJob<OptionalPair<ViewformTour, ViewformTour>>
{
private ManualResetEventSlim _waitHandle;
private ConcurrentQueue<Priority> _priorityBuffer;
private int _delayMilliseconds;
private bool _setWaitHandle = false;
public LowPriorityJob(
ManualResetEventSlim waitHandle,
ConcurrentQueue<Priority> priorityBuffer,
bool setWaitHandle = false,
int delayMilliseconds = 100)
{
_waitHandle = waitHandle;
_priorityBuffer = priorityBuffer;
_delayMilliseconds = delayMilliseconds;
_setWaitHandle = setWaitHandle;
}
public async Task Execute(OptionalPair<ViewformTour, ViewformTour> input)
{
await Task.Delay(_delayMilliseconds);
_priorityBuffer.Enqueue(Priority);
if (_setWaitHandle)
{
_waitHandle.Set();
}
}
public Priority Priority => Priority.Low;
}
EDIT II:
Okay, so I have now tried the following - the RegisterHandler now returns the ActionBlock on registration, I then set up a continuation
public ActionBlock<IJob<TInput>> RegisterHandler<TJob>(TInput input) where TJob : IJob<TInput>
{
var actionBlock = new ActionBlock<IJob<TInput>>(
(job) => job.Execute(input),
new ExecutionDataflowBlockOptions
{
CancellationToken = _token,
MaxDegreeOfParallelism = _options.Value.TaskProcessorMaxDegreeOfParallelism
});
_priorityBufferBlock.LinkTo(
actionBlock,
new DataflowLinkOptions
{
PropagateCompletion = true
},
(job) => job is IJob<TInput>);
_logger?.LogInformation($"Handler for {typeof(TJob).Name} registered successfully");
return actionBlock;
}
Then in my test
[Test]
public async Task DoesHandleExceptionGracefully()
{
var priorityBuffer = new ConcurrentQueue<Priority>();
var jobProcessor = new JobProcessor<OptionalPair<ViewformTour, ViewformTour>>(
_mockClock.Object,
_mockGeneralOptions.Object,
_mockLogger.Object,
CancellationToken.None
);
var tour = new ViewformTour
{
Id = "_id"
};
try
{
var handler = jobProcessor.RegisterHandler<ThrowingHighPriorityJob>(
new OptionalPair<ViewformTour, ViewformTour>(
Optional.From(tour),
Optional.None<ViewformTour>()
));
await jobProcessor.Enqueue(new ThrowingHighPriorityJob());
await handler.Completion.ContinueWith(ant =>
{
throw new Exception("... From Continuation");
}, TaskContinuationOptions.OnlyOnFaulted);
}
catch (Exception ex)
{
Console.WriteLine($"External capture{ex.Message}");
}
}
This outputs "External capture... From continuation", we have externalized the exception handling. HOWEVER, the continuation setup is now blocking. In production, I want to enqueue jobs dynamically, and this prevents that. :'[
In the end, there was no way to do this cleanly without breaking the control flow and go against the "design principles" of the library. So, to do this, I merely embraced the pipeline/dataflow mantra and created a
IPoisonQueueThen I have gone further with the DI for the
JobProcessorand unit tested this to high heaven, the final implementation isHere I have use "Polly" for the retry and backoff, and once the retries are complete (and we still have failure), I push a
PoisonJobon to thePriorityBufferBlockand the errors are handled in the same way other jobs are. It seems to work well, fingers crossed my test coverage is complete and correct.