I need to receive messages from a queue broker and, after some processing, store them in a database. To save records in blocks, I use a BatchBlock. All of this runs inside a BackgroundService.
Code:
public class Worker : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly BatchBlock<Comment> _batchBlock;
    private readonly ActionBlock<Comment[]> _importer;
    private readonly ConnectionFactory _factory;
    private readonly IModel _channel;
    private const string _queueName = "Comment";
    private static int count;

    public Worker(IConfiguration configuration, IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
        _factory = new ConnectionFactory
        {
            Uri = new Uri(configuration.GetSection("RabbitMqConnection").Value),
            DispatchConsumersAsync = true
        };
        var connection = _factory.CreateConnection();
        _channel = connection.CreateModel();
        _channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        _batchBlock = new BatchBlock<Comment>(50);
        _importer = new ActionBlock<Comment[]>(SaveCommentsToDb);
        _batchBlock.LinkTo(_importer, new DataflowLinkOptions { PropagateCompletion = true });
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (_, ea) =>
            await StartCommentHandlerGroup(ea);
        _channel.BasicConsume(_queueName, false, consumer);
        return Task.CompletedTask;
    }

    private async Task StartCommentHandlerGroup(BasicDeliverEventArgs message)
    {
        try
        {
            var content = Encoding.UTF8.GetString(message.Body.ToArray());
            var comment = JsonConvert.DeserializeObject<Comment>(content);
            await _batchBlock.SendAsync(comment);
        }
        catch (Exception e)
        {
            Log.Error($"!!!Handler error!!! {e.Message}{Environment.NewLine} !!!Message!!!{message.Body}");
            throw;
        }
    }

    private async Task SaveCommentsToDb(Comment[] comments)
    {
        using var scope = _serviceProvider.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<Context>();
        await dbContext.AddMessagesToDb(comments);
    }
}
The behavior is very strange. If I set the BatchSize parameter to 10, about 200 records reach the database; if I set it to 2, about 300 do. If I set the BatchSize to 50 or higher, nothing reaches the database at all. In every case the queue is consumed correctly. SendAsync on the BatchBlock works correctly: the returned Task yields true. After part of the processing is done, the _importer reports InputCount = 0 and IsCompleted = true.
No matter how many new messages I receive from the broker after that, the situation does not change and the ActionBlock is not called.
await dbContext.AddMessagesToDb(comments) contains:
foreach (var model in models)
    await DbContext.AddAsync(model);
await DbContext.SaveChangesAsync();
I also tried adding the BoundedCapacity parameter, but it did not help:
batchBlock = new BatchBlock<Comment>(10, new GroupingDataflowBlockOptions { BoundedCapacity = 50 });
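Since the _importer shows IsCompleted = true even though Complete() is never called in the code above, one check that may narrow this down is to look at how its Completion task ended. The following is only a minimal sketch of that check, not a confirmed diagnosis: CompletionProbe and DescribeAsync are hypothetical helper names, and the failing delegate merely simulates a save that throws, on the assumption that an exception inside SaveCommentsToDb could be what completes the block.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not part of the original code): reports how a block's
// Completion task ended.
public static class CompletionProbe
{
    public static async Task<string> DescribeAsync(IDataflowBlock block)
    {
        try
        {
            await block.Completion;          // rethrows if the block faulted
            return "RanToCompletion";
        }
        catch (OperationCanceledException)
        {
            return "Canceled";
        }
        catch (Exception e)
        {
            return $"Faulted: {e.GetType().Name}: {e.Message}";
        }
    }

    public static async Task Main()
    {
        // Assumption for illustration: the save delegate fails on its first batch.
        Func<int[], Task> failingSave =
            _ => Task.FromException(new InvalidOperationException("simulated save failure"));

        var batch = new BatchBlock<int>(2);
        var importer = new ActionBlock<int[]>(failingSave);
        batch.LinkTo(importer, new DataflowLinkOptions { PropagateCompletion = true });

        await batch.SendAsync(1);
        await batch.SendAsync(2);            // completes the first batch of two

        // An unhandled exception in the delegate faults the ActionBlock.
        Console.WriteLine(await DescribeAsync(importer));

        // The upstream BatchBlock is not completed by a downstream fault,
        // so it can keep accepting items.
        Console.WriteLine(await batch.SendAsync(3));
    }
}
```

In the Worker above, the equivalent check would be to log _importer.Completion.Exception (or to wrap the body of SaveCommentsToDb in a try/catch that logs), which would show whether the database call is throwing.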