Properly cancelling a BackgroundService with RabbitMQ

I have a .NET 7 BackgroundService that uses RabbitMQ to consume messages.

I used the code sample provided by Microsoft and added IBackgroundTaskQueue, BackgroundTaskQueue, and QueuedHostedService as described here: [link][1]

Each message is enqueued into the channel of the BackgroundTaskQueue class from the Consumer_ReceivedAsync event handler. The background service then dequeues from the channel and executes the method (Process(DataReceived dataReceived, string routingKey, CancellationToken token)).

The application works fine, but I have some issues when I stop the service with the StopAsync method. Sometimes the application throws an AlreadyClosedException in the Process method, or throws

The channel has been closed

in the CleanRabbit method.

I don't know whether my cancellation handling is correct. The goal is to stop the service cleanly and dispose of the RabbitMQ resources.

private readonly CancellationToken _cancellationToken;
private readonly IBackgroundTaskQueue _taskQueue;
public EventBusRabbitMQ(IHostApplicationLifetime hostApplicationLifetime, IBackgroundTaskQueue taskQueue, /*some others parameters*/)
{
    _cancellationToken = hostApplicationLifetime.ApplicationStopping;
    _taskQueue = taskQueue;
}

public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
{
    if (workItem == null)
    {
        throw new ArgumentNullException(nameof(workItem));
    }
   
    await _queue.Writer.WriteAsync(workItem);
}

public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
{
    var workItem = await _queue.Reader.ReadAsync(cancellationToken);

    return workItem;
}

Each received message is enqueued in _taskQueue with the QueueBackgroundWorkItemAsync method, and a linked token is created from the EventBusRabbitMQ class token and the token passed to the work item:

private async Task Consumer_ReceivedAsync(object sender, DataReceived @event)
{
    try
    {
        if (!_cancellationToken.IsCancellationRequested)
        {
            await ProcessQueueEvent(@event, @event.KeyName);
        }
        else
        {
            _logger.LogInformation("CancellationRequested from Consumer_ReceivedAsync");
            CleanRabbit();
        }
    }
    catch(Exception ex)
    {
        _logger.LogError($"Error from Consumer_ReceivedAsync : {ex.Message}");
    }
}

private async ValueTask ProcessQueueEvent(DataReceived dataReceived, string routingKey)
{
    try
    {
        await _taskQueue.QueueBackgroundWorkItemAsync(ct =>
        {
            var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken, ct);

            return Process(dataReceived, routingKey, linkedTokenSource.Token);
        });
    }
    catch(Exception ex)
    {
        _logger.LogError(ex.Message);
    }
}

The Process method is executed by BackgroundProcessing in the BackgroundService. A CancellationTokenSource is created in the constructor:

private readonly CancellationTokenSource _tokenSource;
public QueuedBackgroundService()
{
    _tokenSource = new CancellationTokenSource();
}
    

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await BackgroundProcessing(stoppingToken);
}
    
private async Task BackgroundProcessing(CancellationToken cancellationToken)
{
    while (!_tokenSource.IsCancellationRequested)
    {
        try
        {
            // Retrieve the task that is queued in the Channel
            var workItem = await TaskQueue.DequeueAsync(_tokenSource.Token);

            await workItem(_tokenSource.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException op)
        {
            _logger.LogWarning($"{nameof(OperationCanceledException)} thrown.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred executing work item.");
        }
    }
}

The Process method in EventBusRabbitMQ does some work:

private async ValueTask Process(DataReceived dataReceived, string routingKey, CancellationToken token)
{
    try
    {
        token.ThrowIfCancellationRequested();

        await DoWork(dataReceived, routingKey);
    }
    catch (OperationCanceledException op) when (token.IsCancellationRequested)
    {
        CleanRabbit();
    }
    catch (AlreadyClosedException ac)
    {
        _logger.LogError($"AlreadyClosedException {ac.Message}");
    }
    catch (Exception ex)
    {
        HandleException(dataReceived, ex);
    }
}

CleanRabbit disposes of the RabbitMQ resources:

private void CleanRabbit()
{
    try
    {
        _basicConsumer.Received -= Consumer_Received;
        if (_consumerChannel != null)
        {
            if (_consumerChannel.IsOpen)
                _consumerChannel.Close();

            _persistentConnection.Close();
            _persistentConnection.Dispose();
        }
    }
    catch (Exception ex)
    {
        _logger.LogError($"Error from CleanRabbit : {ex.Message}");
    }
}

The stop method:

public override async Task StopAsync(CancellationToken stoppingToken)
{
    _tokenSource.Cancel();

    await base.StopAsync(stoppingToken);
}


  [1]: https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-7.0&tabs=visual-studio#queued-background-tasks