How to convert this WebSocket pattern to a Task I can await/cancel/continuewith/

148 Views Asked by At

I have 3rd party WebSocket that I need to consume where the request is sent in one method and the response is given in another. How can I convert this type of pattern to a more typical async/await TPL Task that will support cancellation (via token), continue with and all the other goodies.

This is what I came up with so far, although I'm not sure if it works. I can't test it until Monday.

So here are my questions:

  1. Will this work?

  2. I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

  3. I really don't like the lock because I know it has the potential to block for a long time but but I'm not sure how to do it any better because if I don't lock a second call to AsyncGetPositions could clear any positions already returned.

  4. Even with the lock I know that if there is a timeout or cancellation that creates a problem so maybe I just remove the cancellation token. The only other thing I can think of would be to create multiple clients that are all authenticated and ready to process a request and then manage them like a thread pool for these types of request but I'm not going to be doing that anytime soon so other than that... idk.

    private object GetPositionsLock = new object();
    private IEnumerable<Position> Positions { get; } = new List<Position>();
    private Task PositionsReturned { get; set; }
    public async Task<List<Position>> AsyncGetPositions(CancellationToken token)
    {
        try
        {
            lock (GetPositionsLock)
            {
                Positions.Clear();
                IbWebSocket.reqPositions();
                PositionsReturned = new Task(null, token, TaskCreationOptions.None);
                PositionsReturned.GetAwaiter().GetResult();
                return token.IsCancellationRequested ? null : Positions.ToList().Where(x => x.Shares != 0).ToList();
            }
        }
        catch (TimeoutException ex)
        {
            //LogTimeout
            throw;
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    ///  <summary>
    ///         Provides a position to the reqPositions() method.  When the last position has been received positionEnd() is called.
    ///  </summary>
    ///  <param name="contract"></param>
    ///  <param name="value"></param>
    ///  <param name="account"></param>
    public void position(string account, Contract contract, double value)
    {
        try
        {
            Positions.Concat(new Position(account, contract, value));
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    /// <summary>
    ///     Indicates all the positions have been transmitted.
    /// </summary>
    public void positionEnd()
    {
        PositionsReturned = Task.CompletedTask;
    }
    
1

There are 1 best solutions below

0
Stephen Cleary On BEST ANSWER

Will this work?

No. You shouldn't use the Task constructor, use lock with async code, or mix blocking with asynchronous code.

I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

Yes, that's the type to use for this scenario.

I really don't like the lock because I know it has the potential to block for a long time but but I'm not sure how to do it any better because if I don't lock a second call to AsyncGetPositions could clear any positions already returned.

I recommend getting this working first, and then handling the additional requirement of reentrancy. Each of those are hard enough on their own.


What you want to do is have a TaskCompletionSource<T> and complete it when positionEnd is invoked. For simplicity, start without reentrancy concerns and without the CancellationToken. Once you fully understand TaskCompletionSource<T>, then you can add complexity:

private List<Position> Positions { get; } = new();
private TaskCompletionSource<List<Position>> PositionsReturned { get; set; }
public Task<List<Position>> AsyncGetPositions()
{
  Positions.Clear();
  PositionsReturned = new();
  IbWebSocket.reqPositions();
  return PositionsReturned.Task;
}

public void position(string account, Contract contract, double value)
{
  Positions.Add(new Position(account, contract, value));
}

public void positionEnd()
{
  PositionsReturned.TrySetResult(Positions);
}