I have a .NET 7 backgroundservice uses RabbitMq to consumme messages.
I've used the code sample provided by Microsoft and added the IBackgroundTaskQueue, BackgroundTaskQueue and QueuedHostedService
like described here : [link][1]
Each message is enqueue in channel from BackgroundTaskQueue class in Consumer_ReceivedAsync event, backgroundservice dequeue the
channel and execute the method (Process(DataReceived dataReceived, string routingKey, CancellationToken token)).
Application works fine, but I have some issues when I stop the service with StopAsyncMethod.
Sometimes, application throw AlreadyClosedException in Process method or throw
The channel has been closed
in CleanRabbit method.
I don't know if the handle cancellation of my code is correct or not ? Purpose is to stop the service properly and dispose rabbitmq resources.
private readonly CancellationToken _cancellationToken;
private readonly IBackgroundTaskQueue _taskQueue;
public EventBusRabbitMQ(IHostApplicationLifetime hostApplicationLifetime, IBackgroundTaskQueue taskQueue, /*some others parameters*/)
{
_cancellationToken = hostApplicationLifetime.ApplicationStopping;
_taskQueue = taskQueue;
}
public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
Each messages received, enqueue in _taskQueue with QueueBackgroundWorkItemAsync method and create linked token between EventBusRabbitMQ class and QueueBackgroundWorkItemAsync token:
private async Task Consumer_ReceivedAsync(object sender, DataReceived @event)
{
try
{
if (!_cancellationToken.IsCancellationRequested)
{
await ProcessQueueEvent(@event, @event.KeyName);
}
else
{
_logger.LogInformation("CancellationRequested from Consumer_ReceivedAsync");
CleanRabbit();
}
}
catch(Exception ex)
{
_logger.LogError($"Error from Consummer_ReceivedAsync : {ex.Message}");
}
}
private async ValueTask ProcessQueueEvent(DataReceived dataReceived, string routingKey)
{
try
{
await _taskQueue.QueueBackgroundWorkItemAsync(ct =>
{
var linkedToukenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, ct);
return Process(dataReceived, routingKey, linkedToukenSource.Token);
});
}
catch(Exception ex)
{
_logger.LogError(ex.Message);
}
}
Process method is execute by BackgroundProcessing in BackgroundService, a TokenSource is create in constructor :
private readonly CancellationTokenSource _tokenSource
public QueuedBackgroundService()
{
_tokenSource = new CancellationTokenSource();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await BackgroundProcessing(stoppingToken);
}
private async Task BackgroundProcessing(CancellationToken cancellationToken)
{
while (!_tokenSource.IsCancellationRequested)
{
try
{
//On récupère la tache qui est sotckée en file d'attente dans le Channel
var workItem = await TaskQueue.DequeueAsync(_tokenSource.Token);
await workItem(_tokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException op)
{
_logger.LogWarning($"{nameof(OperationCanceledException)} thrown.");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing {WorkItem}.");
}
}
}
Process method in EventBusRabbitMQ do some work.
private async ValueTask Process(DataReceived dataReceived, string routingKey, CancellationToken token)
{
try
{
token.ThrowIfCancellationRequested();
await DoWork(dataReceived, routingKey);
}
catch (OperationCanceledException op) when (token.IsCancellationRequested)
{
CleanRabbit();
}
catch (AlreadyClosedException ac)
{
_logger.LogError($"AlreadyClosedException {ac.Message}");
}
catch (Exception ex)
{
HandleException(dataReceived, ex);
}
}
CleanRabbit dispose rabbitmq :
private void CleanRabbit()
{
try
{
_basicConsumer.Received -= Consumer_Received;
if (_consumerChannel != null)
{
if (_consumerChannel.IsOpen)
_consumerChannel.Close();
_persistentConnection.Close();
_persistentConnection.Dispose();
}
}
catch (Exception ex)
{
_logger.LogError($"Error from CleanRabbit : {ex.Message}");
}
}
Stop method :
public override async Task StopAsync(CancellationToken stoppingToken)
{
_tokenSource.Cancel();
await base.StopAsync(stoppingToken);
}
[1]: https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-7.0&tabs=visual-studio#queued-background-tasks