How to best use TaskCompletionSource with SemaphoreSlim

This is an evolution of my previous question. To recap, I have a 3rd-party WebSocket that I need to consume, where the request is sent in one method and the response is delivered in another. I'm attempting to wrap this in a Task.

I read this MSDN Magazine article, which shows an example of using a SemaphoreSlim to control entry, but my case isn't quite the same and I'm not sure what the best approach is.

I could put the SemaphoreSlim.Release() in the finally block of GetPositionsAsync(), as shown in the article. But if an error occurred, the finally block could run before positionEnd() is called, which would break GetPositionsAsync() for any other thread waiting on the semaphore. On the other hand, if I put the SemaphoreSlim.Release() in positionEnd(), can I reliably assume that GetPositionsAsync() will return the proper result? I'm worried that once the release happens, a thread waiting for the semaphore might call Positions.Clear() before the first thread has saved the value of Positions to result. So it is really the same concern either way. Which is better, or is there another way to prevent this problem altogether?

This is what I have so far...

private TaskCompletionSource<List<Position>> PositionsTaskSource { get; set; }
private readonly SemaphoreSlim PositionsMutex = new(1, 1);

public async Task<List<Position>> GetPositionsAsync()
{
    try
    {
        await PositionsMutex.WaitAsync().ConfigureAwait(false);
        Positions.Clear();
        PositionsTaskSource = new();
        IbWebSocket.reqPositions();
        var result = await PositionsTaskSource.Task;
        //Write to Trace Log here
        return result;
    }
    finally
    {
        //I could put the Release here as shown in the article
        //PositionsMutex.Release(); 
    }
}

/// <summary>
///     Provides a position to the reqPositions() method. When the last position has been received, positionEnd() is called.
/// </summary>
/// <param name="account"></param>
/// <param name="contract"></param>
/// <param name="value"></param>
public void position(string account, Contract contract, double value)
{
    Positions.Add(new Position(account, contract, value));
}

/// <summary>
///     Indicates all the positions have been transmitted.
/// </summary>
public void positionEnd()
{
    PositionsTaskSource.TrySetResult(Positions);
    PositionsMutex.Release();
}

There are 2 best solutions below

2
Nick

There is no need to call PositionsMutex.Release() in positionEnd(). Once positionEnd() sets the task result, GetPositionsAsync() will continue and ultimately release the semaphore in its finally block (once the commented-out Release there is restored), allowing the next caller to enter.
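
A minimal sketch of why the Release should live in exactly one place, assuming a semaphore created with new(1, 1) as in the question. The variable name mutex is illustrative. Releasing twice for a single acquisition would push the count past its maximum, and SemaphoreSlim reports that with a SemaphoreFullException:

```csharp
using System;
using System.Threading;

// Same shape as the question's PositionsMutex: initial count 1, maximum count 1.
var mutex = new SemaphoreSlim(1, 1);

mutex.Wait();      // acquire: count goes 1 -> 0
mutex.Release();   // stands in for a Release in positionEnd(): count goes 0 -> 1

try
{
    // Stands in for a second Release in the finally block:
    // the count would become 2, which exceeds the maximum of 1.
    mutex.Release();
}
catch (SemaphoreFullException)
{
    Console.WriteLine("Second Release threw SemaphoreFullException");
}
```

Keeping the single Release in the finally block of GetPositionsAsync() also means the semaphore is released on the error path, which a Release in positionEnd() alone would not guarantee.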

0
Peter Csala

As both the MSDN article and Nick state, it is enough to call Release inside the finally block of GetPositionsAsync().

To better articulate your intent, I would suggest a slight change to your code: instead of TaskCompletionSource<List<Position>>, use TaskCompletionSource<object>:

private TaskCompletionSource<object> PositionsHasBeenPopulated;
private readonly SemaphoreSlim GetPositionAsyncMutex = new(1, 1);

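public async Task<List<Position>> GetPositionsAsync()
{
    // Acquire outside the try block so that the finally block
    // only releases a semaphore that was actually acquired.
    await GetPositionAsyncMutex.WaitAsync().ConfigureAwait(false);
    try
    {
        Positions.Clear();
        PositionsHasBeenPopulated = new();
        IbWebSocket.reqPositions();

        // Completed by positionEnd() once the last position has arrived.
        await PositionsHasBeenPopulated.Task;

        // Return a copy: the next caller clears the shared Positions list.
        return new List<Position>(Positions);
    }
    finally
    {
        GetPositionAsyncMutex.Release();
    }
}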


public void position(string account, Contract contract, double value)
    => Positions.Add(new Position(account, contract, value));

public void positionEnd()
    => PositionsHasBeenPopulated.TrySetResult(null);
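  • The GetPositionAsyncMutex limits the number of concurrent GetPositionsAsync() calls to one
  • The PositionsHasBeenPopulated is used only for signalling
    • You can think of TaskCompletionSource<object> as a one-shot async counterpart of ManualResetEvent: it can be completed once and cannot be reset, which is why a new instance is created for each request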
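
The points above can be tried end to end with a small, self-contained sketch. Everything in it is a hypothetical stand-in rather than the real API: PositionClient replaces the question's class, RequestPositions() simulates the socket by invoking the callbacks from a thread-pool thread, and a position is reduced to a string. It also makes two choices that are not in the original code: TaskCompletionSource<bool> is used instead of TaskCompletionSource<object> to avoid passing null, and the source is created with TaskCreationOptions.RunContinuationsAsynchronously so that the code after the await does not run inline on the thread that calls PositionEnd().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var client = new PositionClient();
var first = await client.GetPositionsAsync();
var second = await client.GetPositionsAsync();

// The first result is a copy, so the second call's Clear() did not empty it.
Console.WriteLine($"{first.Count} {second.Count}"); // prints "2 2"

// Hypothetical stand-in for the class in the question.
class PositionClient
{
    private readonly List<string> _positions = new();
    private readonly SemaphoreSlim _mutex = new(1, 1);
    private TaskCompletionSource<bool> _populated = new();

    public async Task<List<string>> GetPositionsAsync()
    {
        // Acquire before entering try so finally never releases an unacquired semaphore.
        await _mutex.WaitAsync().ConfigureAwait(false);
        try
        {
            _positions.Clear();
            // A fresh source per request: a completed source cannot be reset.
            _populated = new(TaskCreationOptions.RunContinuationsAsynchronously);
            RequestPositions();

            await _populated.Task.ConfigureAwait(false);

            // Copy while still holding the semaphore, before the next caller can Clear().
            return new List<string>(_positions);
        }
        finally
        {
            _mutex.Release();
        }
    }

    // Simulated socket: delivers two positions, then the end marker, on another thread.
    private void RequestPositions() => Task.Run(() =>
    {
        Position("AAPL");
        Position("MSFT");
        PositionEnd();
    });

    public void Position(string symbol) => _positions.Add(symbol);

    public void PositionEnd() => _populated.TrySetResult(true);
}
```

Because the copy is made before the semaphore is released, a waiting caller cannot clear the list out from under the result, which is the race described in the question.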