Allow user to cancel task running in BackgroundService


I created a background service to process items in a queue as demonstrated here:

Queued Background Task

Unlike the example, items are added to the queue via a Web API method that looks something like this:

[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
        foreach (var item in collection) {
           //do work here
        }
    });
    return Ok();
}

I would like to provide a way for a caller to cancel a queued task in a subsequent request using a reference returned by QueueWorkItem. I tried the following, but CancellationTokenSource is not serializable. Is there some way to accomplish this?

[HttpPut("queueworkitem")]
public async Task<ActionResult> QueueWorkItem() {
    var collection = GetALargeCollection();
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;

    await _taskQueue.QueueBackgroundWorkItemAsync(async token => {
       foreach (var item in collection) {
           cancellationToken.ThrowIfCancellationRequested();
           //do work here
       }
    });
    return Ok(source);
}

[HttpPut("cancelworkitem")]
public ActionResult CancelWorkItem(CancellationTokenSource source) {
    source.Cancel();
    return Ok();
}

AliReza Sabouri On BEST ANSWER

You can keep the CancellationTokenSource inside your task queue and look it up later by a unique key. For example:

Create a WorkItem record to keep everything in one place:

public record WorkItem(Func<CancellationToken, ValueTask> Func, string Key, CancellationTokenSource TokenSource);

Then store the key and its token source in a dictionary, for example:

using System.Collections.Concurrent;
using System.Threading.Channels;

// Assumes IBackgroundTaskQueue has been updated to use WorkItem
// and to declare CancelWorkItem.
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _tokenSources;
    private readonly Channel<WorkItem> _queue;

    public BackgroundTaskQueue(int capacity = 100)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            // Writers wait asynchronously when the queue is full.
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<WorkItem>(options);
        _tokenSources = new ConcurrentDictionary<string, CancellationTokenSource>();
    }

    public bool CancelWorkItem(string key)
    {
        // Removing the entry means a second cancel request for the same key reports "not found".
        if (!_tokenSources.TryRemove(key, out var tokenSource)) return false;

        tokenSource.Cancel();
        return true;
    }

    public async ValueTask QueueBackgroundWorkItemAsync(WorkItem workItem)
    {
        // Register the token source first so the item can be cancelled
        // even while it is still waiting in the queue.
        _tokenSources.TryAdd(workItem.Key, workItem.TokenSource);
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<WorkItem> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}
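One thing the queue above does not show is what happens to a token source once its work item finishes without being cancelled: the entry stays in the dictionary. The following is a minimal sketch of one way the consumer side could clean up, not part of the original answer. `TokenSourceRegistry`, `ConsumerSketch`, and `RunItemAsync` are hypothetical names introduced for illustration, and the sketch assumes the consumer is the only place that removes and disposes entries.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: tracks one CancellationTokenSource per work item key.
public sealed class TokenSourceRegistry
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _sources =
        new ConcurrentDictionary<string, CancellationTokenSource>();

    public int Count => _sources.Count;

    // Returns false if the key is already registered.
    public bool Register(string key, CancellationTokenSource source) =>
        _sources.TryAdd(key, source);

    // Requests cancellation but leaves the entry in place; the consumer
    // removes and disposes it once the work item has actually stopped.
    public bool Cancel(string key)
    {
        if (!_sources.TryGetValue(key, out var source)) return false;
        try
        {
            source.Cancel();
            return true;
        }
        catch (ObjectDisposedException)
        {
            // The item finished and was disposed between lookup and Cancel.
            return false;
        }
    }

    // Called by the consumer when a work item ends, for any reason.
    public void Complete(string key)
    {
        if (_sources.TryRemove(key, out var source)) source.Dispose();
    }
}

public static class ConsumerSketch
{
    // Runs one work item and always removes its token source afterwards.
    public static async Task RunItemAsync(
        TokenSourceRegistry registry,
        string key,
        Func<CancellationToken, ValueTask> work,
        CancellationTokenSource source)
    {
        try
        {
            await work(source.Token);
        }
        catch (OperationCanceledException)
        {
            // Cancelled by the caller; treated as a normal stop here.
        }
        finally
        {
            registry.Complete(key);
        }
    }
}
```

In a hosted service, the dequeue loop would call something like `RunItemAsync` for each item, so that completed, failed, and cancelled items all release their token source.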

Finally, you can use it from your API like this:

[HttpGet("queueworkitem")]
public async Task<ActionResult> QueueWorkItem()
{
    var key = Guid.NewGuid().ToString("N");
    var tokenSource = new CancellationTokenSource();
    var cancellationToken = tokenSource.Token;

    // `token` is whatever the background service passes in when it runs the item
    // (the host's stopping token in the linked sample); `cancellationToken` is per item.
    await _taskQueue.QueueBackgroundWorkItemAsync(new WorkItem(async (token) =>
    {
        while (!cancellationToken.IsCancellationRequested && !token.IsCancellationRequested)
        {
            Console.WriteLine($"Processing ... {key}");
            // Throws TaskCanceledException if the item is cancelled during the delay.
            await Task.Delay(1000, cancellationToken);
        }
    }, key, tokenSource));

    // The caller passes this key back to cancelworkitem.
    return Ok(key);
}

[HttpGet("cancelworkitem")]
public ActionResult CancelWorkItem(string token_key)
{
    if (_taskQueue.CancelWorkItem(token_key))
    {
        return Ok();
    }

    return BadRequest("Task not found");
}
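The loop in QueueWorkItem checks two tokens by hand. As an alternative, the two can be combined with `CancellationTokenSource.CreateLinkedTokenSource`, so the work only has to observe a single token. This is a minimal sketch rather than the answer's own code; `LinkedTokenSketch` and `RunUntilCancelledAsync` are hypothetical names, and the short delay stands in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedTokenSketch
{
    // Loops until either the host shuts down (stoppingToken) or the caller
    // cancels this specific item (itemToken). Returns the number of
    // iterations that completed before cancellation.
    public static async Task<int> RunUntilCancelledAsync(
        CancellationToken stoppingToken,
        CancellationToken itemToken)
    {
        // The linked source is cancelled as soon as either input token is cancelled.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, itemToken);
        var iterations = 0;
        try
        {
            while (true)
            {
                linked.Token.ThrowIfCancellationRequested();
                await Task.Delay(10, linked.Token); // stand-in for real work
                iterations++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when either token fires; treated as a normal stop.
        }
        return iterations;
    }
}
```

Inside a work item this would be called with the token supplied by the background service and the token from the item's own CancellationTokenSource.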

This works fine if you have a single server and don't need to serialize and persist your tasks in storage, since the token sources live only in that process's memory. If you need to persist tasks or run them later, I would suggest checking out the Hangfire library, which provides similar queueing and cancellation functionality backed by persistent storage.