Cancellation token in Parallel.ForEach loop not working


I can't seem to stop the Parallel.ForEach tasks below on Ctrl+C. Am I not catching the exceptions correctly? Each task runs a long-running operation in a while loop.

Can anyone tell me what I'm not doing correctly here?

public static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    StartAllReaders(cts.Token);
    Console.CancelKeyPress += (s, e) =>
    {
        cts.Cancel();
        e.Cancel = true;
    };
}

public static void StartAllReaders(CancellationToken cts)
{
    try
    {
        Parallel.ForEach(
            _readers,
            new ParallelOptions { MaxDegreeOfParallelism = 10, CancellationToken = cts },
            (reader, state) =>
            {
                reader.Start();
                while (!cts.IsCancellationRequested)
                {
                    cts.ThrowIfCancellationRequested();
                    try
                    {
                        reader.ReadTelemetry();
                        Thread.Sleep(reader.TimeDelay);
                    }
                    catch (Exception ex)
                    {
                        // log something
                        continue;
                    }
                }
            });
    }
    catch (OperationCanceledException e)
    {
        Logger.Info("Readers are stopping.");
    }
}

There is 1 best solution below

Magnus

I'm guessing a little here, but I believe something like this is what you are after. It runs all the readers in parallel (with a staggered startup delay) and stops them when cancellation is signaled.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource(5000); // Cancel after 5 seconds.
        var ct = cts.Token;
        var _readers = new List<Reader> { new(), new() };
        try
        {
            // Start every reader and wait until all of them finish or are cancelled.
            await Task.WhenAll(_readers.Select((r, i) => StartReader(r, i, ct)));
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            Console.WriteLine("stop");
        }
    }

    public static async Task StartReader(Reader reader, int index, CancellationToken ct)
    {
        // Optional startup delay so the readers do not all start at the same time.
        await Task.Delay(index * 1000, ct);
        while (true)
        {
            ct.ThrowIfCancellationRequested();
            reader.ReadTelemetry();
            await Task.Delay(reader.TimeDelay, ct); // Throws on cancellation.
        }
    }
}

public class Reader
{
    public int TimeDelay { get; set; } = 100000; // Milliseconds between reads.
    internal void ReadTelemetry()
    {
    }
}
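
The code above cancels on a five-second timer rather than on Ctrl+C. If Ctrl+C is what you need, a minimal sketch of wiring Console.CancelKeyPress to the same kind of token could look like the following. As far as I can tell, the important difference from the question's code is the order: there, StartAllReaders blocks inside Parallel.ForEach, so the line that subscribes to CancelKeyPress is only reached after the loop has already finished. Here the handler is subscribed before any reader starts. RunReaderAsync, the Reads counter, and the one-second TimeDelay are illustrative assumptions, not part of the original post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        var cts = new CancellationTokenSource();

        // Subscribe before any reader starts, so the handler is in place while they run.
        Console.CancelKeyPress += (sender, e) =>
        {
            e.Cancel = true; // Keep the process alive so the readers can stop cleanly.
            cts.Cancel();    // Signal every reader loop to stop.
        };

        var readers = new List<Reader> { new(), new() };
        try
        {
            await Task.WhenAll(readers.Select(r => RunReaderAsync(r, cts.Token)));
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Readers are stopping.");
        }
    }

    // Hypothetical helper: reads until the token is cancelled, then throws.
    public static async Task RunReaderAsync(Reader reader, CancellationToken ct)
    {
        while (true)
        {
            ct.ThrowIfCancellationRequested();
            reader.ReadTelemetry();
            // Unlike Thread.Sleep, this wait ends as soon as the token is cancelled.
            await Task.Delay(reader.TimeDelay, ct);
        }
    }
}

// Stand-in for the real reader type; Reads exists only to make the sketch observable.
public class Reader
{
    public int TimeDelay { get; set; } = 1000; // Milliseconds between reads (assumed value).
    public int Reads { get; private set; }
    public void ReadTelemetry() => Reads++;
}
```

Setting e.Cancel = true stops the runtime from terminating the process immediately, which gives the loops a chance to observe the token and exit through the catch block. Passing the token to Task.Delay matters as well: with a long TimeDelay, a plain Thread.Sleep would keep each reader waiting out its full delay before it noticed the cancellation.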