C# BlockingCollection loop on IsCompleted instead of GetConsumingEnumerable

331 Views Asked by At

In my application, many actions need to be logged to the database, but the logging must not slow down the request. So the writes should go through an asynchronous queue or something similar.

This is my big picture:

big picture

And this is my example implementation:

Model:

/// <summary>
/// One audit/activity record: who did what, from which address, and when.
/// Instances are created by the request path and persisted later by the log worker.
/// </summary>
public class ActivityLog
{
    /// <summary>
    /// Identifier whose value is marked as generated by the database on insert.
    /// NOTE(review): presumably the primary key by EF naming convention - confirm in the model configuration.
    /// </summary>
    [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
    public string Id { get; set; }

    /// <summary>Network address the action originated from.</summary>
    public IPAddress IPAddress { get; set; }

    /// <summary>Short description of the action that was performed.</summary>
    public string Action { get; set; }

    /// <summary>Optional free-form details about the action; may be null.</summary>
    public string? Metadata { get; set; }

    /// <summary>Optional identifier of the user who performed the action; may be null.</summary>
    public string? UserId { get; set; }

    /// <summary>Moment the record was created.</summary>
    public DateTime CreationTime { get; set; }
}

Queue:

/// <summary>
/// Bounded, thread-safe producer/consumer queue of <see cref="ActivityLog"/> entries.
/// Producers call <see cref="Add"/>; a consumer drains the queue either item by item via
/// <see cref="GetConsumingEnumerable"/> or in batches via the <c>TryTake</c> overloads.
/// </summary>
public class LogQueue
{
    private const int QueueCapacity = 1_000_000; // is one million enough?

    private readonly BlockingCollection<ActivityLog> logs = new(QueueCapacity);

    /// <summary>True once <see cref="Complete"/> has been called and every queued item has been taken.</summary>
    public bool IsCompleted => logs.IsCompleted;

    /// <summary>
    /// Enqueues an entry. Blocks while the queue is at capacity and throws
    /// <see cref="InvalidOperationException"/> if called after <see cref="Complete"/>.
    /// </summary>
    public void Add(ActivityLog log) => logs.Add(log);

    /// <summary>
    /// Returns a blocking enumerable that removes items as it yields them and only ends
    /// after <see cref="Complete"/> has been called and the queue is empty.
    /// </summary>
    public IEnumerable<ActivityLog> GetConsumingEnumerable() => logs.GetConsumingEnumerable();

    /// <summary>Marks the queue as closed for new entries; already queued entries can still be taken.</summary>
    public void Complete() => logs.CompleteAdding();

    /// <summary>
    /// Removes an entry if one is available right now, without waiting.
    /// </summary>
    /// <param name="item">The removed entry, or null when the method returns false.</param>
    /// <returns>True if an entry was removed; false if the queue was empty.</returns>
    public bool TryTake([System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out ActivityLog item)
        => logs.TryTake(out item);

    /// <summary>
    /// Removes an entry, waiting up to <paramref name="millisecondsTimeout"/> for one to arrive.
    /// </summary>
    /// <param name="item">The removed entry, or null when the method returns false.</param>
    /// <param name="millisecondsTimeout">Maximum wait in milliseconds; -1 waits indefinitely.</param>
    /// <returns>
    /// True if an entry was removed; false if the wait timed out or the queue has been
    /// completed and is empty.
    /// </returns>
    public bool TryTake(
        [System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out ActivityLog item,
        int millisecondsTimeout)
        => logs.TryTake(out item, millisecondsTimeout);
}

Worker (the problem is here):

/// <summary>
/// Hosted service that consumes <see cref="LogQueue"/> on a long-running task and writes
/// each dequeued <see cref="ActivityLog"/> to <c>ApplicationDbContext</c>.
/// </summary>
public class DbLogWorker : IHostedService
{
    private readonly LogQueue queue;
    private readonly IServiceScopeFactory scf;
    // Created (not started) in the constructor; started in StartAsync and waited on in StopAsync.
    private Task jobTask;

    public DbLogWorker(LogQueue queue, IServiceScopeFactory scf)
    {
        this.queue = queue;
        this.scf = scf;

        jobTask = new Task(Job, TaskCreationOptions.LongRunning);
    }
  
    /// <summary>
    /// Consumer loop. Resolves one scope and one DbContext for the whole lifetime of the job
    /// and saves every dequeued item with its own SaveChanges call.
    /// </summary>
    private void Job()
    {
        using var scope = scf.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        // The following code does not work
        // My intention was to reduce DB trips
        // NOTE(review): GetConsumingEnumerable() yields a blocking sequence that, per the
        // BlockingCollection documentation, only ends after CompleteAdding(); AddRange(items)
        // would therefore enumerate until shutdown and SaveChanges() would not run before then.
        //while(!queue.IsCompleted)
        //{
        //    var items = queue.GetConsumingEnumerable();
        //    dbContext.AddRange(items);
        //    dbContext.SaveChanges();
        //}

        // But this works
        // If I have 10 items available, I'll have 10 DB trips (not good), right?
        foreach (var item in queue.GetConsumingEnumerable())
        {
            dbContext.Add(item);
            dbContext.SaveChanges();
        }
    }

    /// <summary>Starts the previously created job task and returns immediately.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        jobTask.Start();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes the queue for new entries and blocks the calling thread until the job task
    /// has finished. The supplied cancellation token is not observed.
    /// </summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        queue.Complete();
        jobTask.Wait(); // or 'await jobTask' ?
        return Task.CompletedTask; // unnecessary if I use 'await jobTask'
    }
}

Dependency Injection:

// One shared queue for the whole application, plus the hosted worker that drains it.
builder.Services
    .AddSingleton<LogQueue>()
    .AddHostedService<DbLogWorker>();

Controller:

/// <summary>
/// Home page endpoint: records a "Home page visit" activity entry on the log queue and
/// replies with a greeting. The entry is only enqueued here, not written to the database.
/// </summary>
/// <param name="name">Optional visitor name echoed into the entry's metadata.</param>
[HttpGet("/")]
public IActionResult Get(string? name = "N/A")
{
    var visitedAt = DateTime.UtcNow;

    // Fall back to the "any" address when the connection exposes no remote address.
    var clientAddress = HttpContext.Connection.RemoteIpAddress;
    clientAddress ??= IPAddress.Any;

    var details = $"{{ name: {name} }}";
    var currentUserId = User.FindFirstValue(ClaimTypes.NameIdentifier);

    ActivityLog entry = new();
    entry.CreationTime = visitedAt;
    entry.Action = "Home page visit";
    entry.IPAddress = clientAddress;
    entry.Metadata = details;
    entry.UserId = currentUserId;

    queue.Add(entry);

    return Ok("Welcome!");
}

As I explained in the comments, I want to take as many items as are currently available and save them in a single DB trip. My attempt was the following code, but it doesn't work:

// Attempted batching loop (does not work as intended).
// NOTE(review): GetConsumingEnumerable() returns a blocking sequence that, per the
// BlockingCollection documentation, only ends once adding is completed, so AddRange(items)
// keeps enumerating and SaveChanges() is not reached while the queue is still open.
while (!queue.IsCompleted)
{
    var items = queue.GetConsumingEnumerable();
    dbContext.AddRange(items);
    dbContext.SaveChanges();
}

So instead, I'm using this, which makes one DB trip per row:

// Working fallback: every dequeued item is added and saved on its own,
// i.e. one SaveChanges() call per item.
foreach (var item in queue.GetConsumingEnumerable())
{
    dbContext.Add(item);
    dbContext.SaveChanges();
}

I also have two side questions: How can I increase the number of workers? And how can I scale the number of workers dynamically based on the queue length?

I created an example project for this question:

enter image description here

2

There are 2 best solutions below

0
IKomarovLeonid On

I recommend writing your worker as follows; it should help:

/// <summary>
/// Background worker that owns its own queue of <see cref="ActivityLog"/> items.
/// Callers <see cref="Push"/> items from any thread; a single long-running task takes
/// them off the queue one at a time and hands each to <see cref="ProcessTask"/>.
/// </summary>
public class ActivityWorker : IWorker
{
    // logs
    private static readonly Logger Logger = LogManager.GetLogger(nameof(ActivityWorker));

    // services (if necessary)

    private readonly BlockingCollection<ActivityLog> _tasks = new BlockingCollection<ActivityLog>();
    private Task _processingTask;
    private bool _disposed;

    public ActivityWorker(/* your services if required */)
    {
    }

    /// <summary>Starts the consumer loop on a long-running task.</summary>
    public void Start()
    {
        _processingTask = Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Closes the queue for new items and blocks until every already queued item has been
    /// processed. Safe to call when the worker was never started.
    /// </summary>
    public void Stop()
    {
        _tasks.CompleteAdding();
        _processingTask?.Wait();
    }

    /// <summary>
    /// Queues an activity log for processing.
    /// Throws <see cref="InvalidOperationException"/> once the worker has been stopped.
    /// </summary>
    // push your activity log here to collection
    public void Push(ActivityLog task)
    {
        _tasks.Add(task);
    }

    /// <summary>
    /// Consumer loop. The consuming enumerable blocks while the queue is empty and ends
    /// normally after <see cref="Stop"/>, so no exception is needed to signal shutdown.
    /// </summary>
    private void Process()
    {
        foreach (var task in _tasks.GetConsumingEnumerable())
        {
            try
            {
                ProcessTask(task);
            }
            catch (Exception ex)
            {
                // A single failing item must not stop the worker; log it and keep draining.
                Logger.Error(ex);
            }
        }

        Logger.Warn("ActivityLog tasks worker have been stopped");
    }

    private void ProcessTask(ActivityLog task)
    {
        // YOUR logic here
    }

    /// <summary>
    /// Stops the worker (draining remaining items) and releases the task and the queue.
    /// Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        Stop();

        // The task has completed at this point (Stop waited for it), so disposing it is valid.
        _processingTask?.Dispose();
        _tasks.Dispose();
    }
}
5
Parsa99 On

This is what I ended up with in case anyone needs it (thanks to Theodor Zoulias's hint):

/// <summary>
/// Hosted service that drains <see cref="LogQueue"/> on a dedicated thread and writes the
/// entries to the database in batches, so that a burst of N log entries costs far fewer
/// than N database round trips.
/// </summary>
/// <remarks>
/// A batch is flushed when it reaches <see cref="BufferThreshold"/> items, when
/// <see cref="BufferTimeLimit"/> has elapsed since the previous flush, or when the queue
/// has been completed and is empty - whichever happens first.
/// NOTE(review): an exception thrown while saving a batch still faults the job task, as
/// before; add logging/retry here once a logger is available to this class.
/// </remarks>
public class DbLogWorker : IHostedService
{
    private const int BufferThreshold = 100;
    private static readonly TimeSpan BufferTimeLimit = TimeSpan.FromSeconds(20);

    private readonly LogQueue queue;
    private readonly IServiceScopeFactory scf;

    // Only read and written by the job thread.
    private DateTime lastDbUpdate = DateTime.UtcNow;

    // Null until StartAsync has run.
    private Task? jobTask;

    public DbLogWorker(LogQueue queue, IServiceScopeFactory scf)
    {
        this.queue = queue;
        this.scf = scf;
    }

    /// <summary>
    /// Consumer loop: waits for the first item of a batch, tops the batch up, saves it,
    /// and repeats until the queue is completed and empty.
    /// </summary>
    private void Job()
    {
        try
        {
            // Blocks without a timeout; returns false only when the queue is completed and empty.
            while (queue.TryTake(out var first, Timeout.Infinite))
            {
                var buffer = new List<ActivityLog>(BufferThreshold) { first };

                FillBuffer(buffer);
                SaveBatch(buffer);

                lastDbUpdate = DateTime.UtcNow;
            }
        }
        catch (ObjectDisposedException)
        {
            // Completion is reported by TryTake returning false, not by this exception.
            // This only happens if the queue or the service provider was disposed while
            // the job was still running (i.e. during teardown), so just exit.
        }
    }

    /// <summary>
    /// Adds items to <paramref name="buffer"/> until it is full, the flush deadline passes,
    /// or the queue is completed and empty. Waits inside TryTake instead of polling, so an
    /// empty queue does not burn CPU.
    /// </summary>
    private void FillBuffer(List<ActivityLog> buffer)
    {
        while (buffer.Count < BufferThreshold)
        {
            var remaining = BufferTimeLimit - (DateTime.UtcNow - lastDbUpdate);
            if (remaining <= TimeSpan.Zero)
            {
                return;
            }

            // Guard against the wall clock having been moved backwards.
            if (remaining > BufferTimeLimit)
            {
                remaining = BufferTimeLimit;
            }

            // False means the wait timed out or the queue is completed and empty;
            // in both cases the batch should be flushed now.
            if (!queue.TryTake(out var next, (int)remaining.TotalMilliseconds))
            {
                return;
            }

            buffer.Add(next);
        }
    }

    /// <summary>
    /// Writes one batch with a single SaveChanges call. A fresh scope (and DbContext) is
    /// used per batch so saved entities do not stay tracked for the lifetime of the worker.
    /// </summary>
    private void SaveBatch(List<ActivityLog> batch)
    {
        using var scope = scf.CreateScope();
        var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

        dbContext.AddRange(batch);
        dbContext.SaveChanges();
    }

    /// <summary>Starts the consumer loop on a dedicated long-running task.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        jobTask = Task.Factory.StartNew(
            Job,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Closes the queue for new entries and waits, without blocking a thread, until the
    /// remaining entries have been saved or the host's shutdown token fires.
    /// </summary>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        queue.Complete();

        if (jobTask is null)
        {
            // StartAsync never ran, so there is nothing to wait for.
            return;
        }

        var shutdownDeadline = Task.Delay(Timeout.Infinite, cancellationToken);
        var finished = await Task.WhenAny(jobTask, shutdownDeadline);

        if (finished == jobTask)
        {
            // Surface a failure of the job to the host instead of hiding it.
            await jobTask;
        }
    }
}