Are there any thread-safety issues with a volatile read of a value written by Interlocked.Exchange in this code?

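using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class TaskTracker
{
    private volatile ConcurrentBag<Task> _bag = new();

    public async Task WaitAllAvailableAsync()
    {
        // Detach the current bag (replacing it with an empty one) and await everything it holds
        await Task.WhenAll(Interlocked.Exchange(ref _bag, new()));
    }

    public void AddTask(Task toWait)
    {
        // should guarantee that the Task will be awaited on at least one WaitAllAvailableAsync call
        ConcurrentBag<Task> oldBag, bag = _bag;
        do
        {
            bag.Add(toWait);
            oldBag = bag;
        } while ((bag = _bag) != oldBag);   // if the bag was swapped out meanwhile, add to the new one too
    }
}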

This class allows its user to add tasks to an internal collection. When WaitAllAvailableAsync is called, all currently added tasks are removed from the collection and awaited. This can be used as a barrier mechanism to wait for all incoming requests that have already started to complete (without blocking the addition of newer requests). Both methods are thread-safe and can be used from any number of threads. WaitAllAvailableAsync is called on a timer.

I'm not sure about the guarantees applied to this line:

} while ((bag = _bag) != oldBag);

Imagine that Interlocked.Exchange has already executed at this point and a new bag instance has been written. Does the volatile read of _bag really guarantee seeing the new value? Would Interlocked.CompareExchange(ref _bag, null, null) provide stronger guarantees?
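For reference, here is a sketch of the three read styles being compared, on a hypothetical `Holder` class (the class and member names are illustrative, not from the question). `Volatile.Read` gives the same acquire ordering as an ordinary read of a `volatile` field, while `Interlocked.CompareExchange(ref x, null, null)` is a full-fence read: it only writes when the field already holds `null`, and then it writes `null` back, so the stored value never changes.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical holder type used only to compare the read styles.
class Holder
{
    // Declared volatile: every ordinary read of this field has acquire semantics.
    public volatile ConcurrentBag<Task> VolatileBag = new();

    // A plain field, read explicitly through Volatile or Interlocked below.
    public ConcurrentBag<Task> PlainBag = new();

    // 1. Ordinary read of a volatile field (what the question's loop condition does).
    public ConcurrentBag<Task> ReadVolatileField() => VolatileBag;

    // 2. Explicit acquire read of a non-volatile field; same ordering as (1).
    public ConcurrentBag<Task> ReadWithVolatileClass() => Volatile.Read(ref PlainBag);

    // 3. Full-fence read: if the field is null it is "replaced" with null,
    //    otherwise nothing is written; the current value is returned either way.
    public ConcurrentBag<Task> ReadWithInterlocked() =>
        Interlocked.CompareExchange(ref PlainBag, null, null);
}
```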

I've seen many different opinions about the difference between the non-caching guarantees of Interlocked and volatile. Some people say that volatile only prevents reordering; others say that it also ensures reading a reasonably fresh value, which can nevertheless be stale by a few milli- or nanoseconds. For the code above, reading any stale value could cause the task to be added to an already discarded _bag instance, in which case the task would never be awaited. I'm not sure whom to believe, so I decided to ask here.

So are there any thread-safety issues here? Is there any scenario when added task will not be awaited inside WaitAllAvailableAsync call at least once (assuming endless running)?

I know that this code could be rewritten without volatile by using ConcurrentQueue, but here I'm trying to understand specifically how the volatile read behaves in this example.
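For comparison, one way such a `ConcurrentQueue` rewrite might look. This is a sketch under my own assumptions, not the asker's code, and `QueueTaskTracker` is a hypothetical name: a single queue is never replaced, and each `WaitAllAvailableAsync` call drains what was present when it started. Because `TryDequeue` hands each item to exactly one caller, a task enqueued during or after a drain is simply picked up by the next call, so no reference field has to be swapped or re-read.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical ConcurrentQueue-based variant of the question's TaskTracker.
class QueueTaskTracker
{
    // The queue is never replaced, so the field needs neither volatile nor Interlocked.
    private readonly ConcurrentQueue<Task> _queue = new();

    public Task WaitAllAvailableAsync()
    {
        // Dequeue at most as many tasks as were present when the call started, so a
        // steady stream of new tasks cannot keep this loop running forever.
        int available = _queue.Count;
        var batch = new List<Task>(available);
        while (batch.Count < available && _queue.TryDequeue(out var task))
            batch.Add(task);

        // Each task is dequeued by exactly one caller; anything still in the queue
        // (or added later) is picked up by the next call.
        return Task.WhenAll(batch);
    }

    public void AddTask(Task toWait) => _queue.Enqueue(toWait);
}
```

A side effect of this design is that each task lands in exactly one batch, so it is never awaited by two different calls.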

Answer by Theodor Zoulias

Is there any scenario when added task will not be awaited inside WaitAllAvailableAsync call at least once (assuming endless running)?

In my opinion, no, there is no scenario where an added task will not be awaited by WaitAllAvailableAsync at least once. The code as written should correctly perform its intended purpose on all systems currently supported by the .NET runtime (as of .NET 8).

My understanding is that in practice (on all currently supported systems), reading a volatile field returns a value as fresh as reading the same field with Interlocked.CompareExchange(ref _field, null, null), which should be fresh enough to satisfy the correctness requirements of the code in the question. I would be very surprised if it were shown experimentally that a system exists where the code in the question fails to work correctly.
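A stress test cannot prove the absence of a race, but it is the kind of experiment that would have to fail in order to contradict this answer. The sketch below assumes a hypothetical `TakeAll` method in place of `WaitAllAvailableAsync`, returning the detached bag so that a single consumer thread can record every task it sees; the task objects are never started and serve only as identity tokens.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// The question's TaskTracker, with WaitAllAvailableAsync replaced by a hypothetical
// TakeAll that returns the detached bag, so the harness can see which tasks it held.
class TaskTracker
{
    private volatile ConcurrentBag<Task> _bag = new();

    public ConcurrentBag<Task> TakeAll() => Interlocked.Exchange(ref _bag, new ConcurrentBag<Task>());

    public void AddTask(Task toWait)
    {
        ConcurrentBag<Task> oldBag, bag = _bag;
        do
        {
            bag.Add(toWait);
            oldBag = bag;
        } while ((bag = _bag) != oldBag);
    }
}

static class StressTest
{
    // Returns how many added tasks were never found in any detached bag.
    public static int CountMissedTasks(int producers, int tasksPerProducer)
    {
        var tracker = new TaskTracker();
        var added = new ConcurrentBag<Task>();
        var seen = new HashSet<Task>();
        int finishedProducers = 0;

        var threads = Enumerable.Range(0, producers).Select(_ => new Thread(() =>
        {
            for (int i = 0; i < tasksPerProducer; i++)
            {
                var token = new Task(() => { }); // never started; only its identity matters
                added.Add(token);
                tracker.AddTask(token);
            }
            Interlocked.Increment(ref finishedProducers);
        })).ToList();
        threads.ForEach(t => t.Start());

        // Single consumer: keep swapping bags while the producers are running...
        while (Volatile.Read(ref finishedProducers) < producers)
            seen.UnionWith(tracker.TakeAll());
        // ...then one last swap after every AddTask call has returned.
        seen.UnionWith(tracker.TakeAll());

        threads.ForEach(t => t.Join());
        return added.Count(t => !seen.Contains(t));
    }
}
```

If this answer is right, `CountMissedTasks` always returns 0; a nonzero result on some machine would be the counterexample.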

Answer by Joshua

This code cannot and does not work. It has the appearance of working if only one thread can call WaitAllAvailableAsync(), but ...

Thread 1                                       Thread 2
                                               bag = _bag;
rax = Interlocked.Exchange(ref _bag, new())
                                               bag.Add(toWait);                                            

Task.WhenAll(rax)                              while ((bag = _bag) != oldBag)
                                               bag.Add(toWait);

//... much later

Task.WhenAll(Interlocked.Exchange(ref _bag, new()))

It seems harmless to await a task twice, but it isn't if the task threw an exception.
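What awaiting a task twice looks like when the task is faulted: a faulted task rethrows its exception on every await, so if the same task ended up in two detached batches, two separate `WaitAllAvailableAsync` calls would each observe (and presumably report) the same failure. A minimal sketch; `DoubleAwaitDemo` and the exception are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

static class DoubleAwaitDemo
{
    // Awaits the same faulted task in two WhenAll "batches" and counts
    // how many times its exception surfaces.
    public static async Task<int> CountObservedFailuresAsync()
    {
        Task faulted = Task.FromException(new InvalidOperationException("boom"));
        int failures = 0;

        for (int batch = 0; batch < 2; batch++)
        {
            try
            {
                await Task.WhenAll(faulted);
            }
            catch (InvalidOperationException)
            {
                // The original exception is rethrown on every await.
                failures++;
            }
        }
        return failures;
    }
}
```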

To do this kind of thing, we need to use Interlocked operations correctly on both sides. I'm going to run my mouth off here.

using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class TaskTracker {
    // Singly linked list of tasks; new nodes are pushed onto the head.
    private class TaskListNode : IEnumerable<Task> {
        public static readonly TaskListNode EndOfList = new TaskListNode(null);

        private TaskListNode next;
        private readonly Task task;
        public TaskListNode(Task t) { task = t; }
        public void SetListHead(ref TaskListNode head) {
            // Interlocked here: retry until the head did not change between the read and the swap
            do {
                next = head;
            } while (Interlocked.CompareExchange(ref head, this, next) != next);
        }
        public IEnumerator<Task> GetEnumerator() => new Enumerator(this);
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
        private class Enumerator : IEnumerator<Task> {
            private readonly TaskListNode first;
            private TaskListNode next;
            public Task Current { get; private set; }
            object IEnumerator.Current => Current;
            public Enumerator(TaskListNode start) { next = first = start; }
            public void Reset() { next = first; Current = null; }
            public bool MoveNext() {
                if (next == EndOfList || next is null) return false;
                Current = next.task;
                next = next.next;
                return true;
            }
            public void Dispose() {}
        }
    }

    private TaskListNode head = TaskListNode.EndOfList;

    public Task WaitAllAvailableAsync()
    {
        // Detach the whole list in one atomic swap, leaving an empty list behind
        return Task.WhenAll(Interlocked.Exchange(ref head, TaskListNode.EndOfList));
    }

    public void AddTask(Task toWait)
    {
        new TaskListNode(toWait).SetListHead(ref head);
    }
}

The Interlocked.Exchange() call was right. Not doing another Interlocked call in AddTask() was not. I ended up having to swap out the data structure for one that an Interlocked method could actually update in a single step. The singly linked list works well here.

Answer by Stephen Cleary

I've been playing with some "interlocked immutable state" approaches recently. Here's the current version of my helpers:

using System;
using System.Threading;

internal static class InterlockedState
{
    /// <summary>
    /// Executes a state transition from one state to another.
    /// </summary>
    /// <typeparam name="T">The type of the state; this is generally an immutable type. Strongly consider using a record class.</typeparam>
    /// <param name="state">The location of the state.</param>
    /// <param name="transformation">The transformation to apply to the state. This may be invoked any number of times and should be a pure function.</param>
    /// <param name="oldState">The old state.</param>
    /// <returns>The new state.</returns>
    public static T Transform<T>(ref T state, Func<T, T> transformation, out T oldState)
        where T : class?
    {
        _ = transformation ?? throw new ArgumentNullException(nameof(transformation));

        while (true)
        {
            oldState = Interlocked.CompareExchange(ref state, default!, default!);
            var newState = transformation(oldState);
            if (ReferenceEquals(Interlocked.CompareExchange(ref state!, newState, oldState), oldState))
                return newState;
        }
    }

    /// <summary>
    /// Reads the current state. Note that the state may have changed by the time this method returns.
    /// </summary>
    /// <typeparam name="T">The type of the state; this is generally an immutable type. Strongly consider using a record class.</typeparam>
    /// <param name="state">The location of the state.</param>
    /// <returns>The current state.</returns>
    public static T Read<T>(ref T state)
        where T : class? =>
        Interlocked.CompareExchange(ref state, default!, default!);
}
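The retry behavior of `Transform` can be illustrated with a hypothetical `Counter` record incremented from several threads. The helper is repeated here (without its XML docs and nullable annotations) so the block compiles on its own. Under contention the transformation may run more than once per call, which is why it must be a pure function, but only a compare-exchange that succeeds publishes a new state, so every call contributes exactly one increment.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical immutable state type.
record Counter(int Value);

static class InterlockedState
{
    // Same compare-exchange loop as the helper above, nullable annotations dropped.
    public static T Transform<T>(ref T state, Func<T, T> transformation, out T oldState)
        where T : class
    {
        while (true)
        {
            oldState = Interlocked.CompareExchange(ref state, null, null); // full-fence read
            var newState = transformation(oldState);
            // Publish only if nobody replaced the state since we read it; otherwise retry.
            if (ReferenceEquals(Interlocked.CompareExchange(ref state, newState, oldState), oldState))
                return newState;
        }
    }
}

static class CounterDemo
{
    private static Counter _counter = new(0);

    public static int IncrementConcurrently(int workers, int incrementsPerWorker)
    {
        _counter = new Counter(0);
        Parallel.For(0, workers, _ =>
        {
            for (int i = 0; i < incrementsPerWorker; i++)
                InterlockedState.Transform(ref _counter, c => c with { Value = c.Value + 1 }, out _);
        });
        return _counter.Value; // no increments are lost despite the retries
    }
}
```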

They can be used like this:

using System.Collections.Immutable;
using System.Threading.Tasks;

class TaskTracker
{
    private ImmutableHashSet<Task> _bag = ImmutableHashSet<Task>.Empty;

    public Task WaitAllAvailableAsync()
    {
        InterlockedState.Transform(ref _bag, _ => ImmutableHashSet<Task>.Empty, out var oldBag);
        return Task.WhenAll(oldBag);
    }

    public void AddTask(Task toWait)
    {
        InterlockedState.Transform(ref _bag, bag => bag.Add(toWait), out _);
    }
}
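A short single-threaded usage sketch of the `TaskTracker` above, with the helper repeated in trimmed form so the block is self-contained. `Demo.Run` is a hypothetical name; it is meant to show the semantics, not to test for races.

```csharp
using System;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;

static class InterlockedState
{
    // Compare-exchange loop from the helper above (nullable annotations dropped).
    public static T Transform<T>(ref T state, Func<T, T> transformation, out T oldState)
        where T : class
    {
        while (true)
        {
            oldState = Interlocked.CompareExchange(ref state, null, null);
            var newState = transformation(oldState);
            if (ReferenceEquals(Interlocked.CompareExchange(ref state, newState, oldState), oldState))
                return newState;
        }
    }
}

class TaskTracker
{
    private ImmutableHashSet<Task> _bag = ImmutableHashSet<Task>.Empty;

    public Task WaitAllAvailableAsync()
    {
        InterlockedState.Transform(ref _bag, _ => ImmutableHashSet<Task>.Empty, out var oldBag);
        return Task.WhenAll(oldBag);
    }

    public void AddTask(Task toWait) =>
        InterlockedState.Transform(ref _bag, bag => bag.Add(toWait), out _);
}

static class Demo
{
    public static bool Run()
    {
        var tracker = new TaskTracker();
        var tcs = new TaskCompletionSource();
        tracker.AddTask(tcs.Task);
        tracker.AddTask(tcs.Task);                      // same task again: the set keeps one entry
        Task first = tracker.WaitAllAvailableAsync();   // detaches the set holding tcs.Task
        Task second = tracker.WaitAllAvailableAsync();  // set is already empty: completes at once
        bool pendingUntilSet = !first.IsCompleted && second.IsCompletedSuccessfully;
        tcs.SetResult();
        first.GetAwaiter().GetResult();
        return pendingUntilSet && first.IsCompletedSuccessfully;
    }
}
```

Because the state is an `ImmutableHashSet<Task>`, adding the same task twice keeps a single entry, unlike the `ConcurrentBag` in the question.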