I have a custom WebSocket client (using the System.Net.WebSockets library) that works pretty well. Internally, the library has two separate tasks: the first sends commands to the WebSocket server, and the second receives responses from the server. Inside each task I use an "infinite loop" approach (because they work in the background), like this:
SendTask:
// Task that sends commands through the web socket and writes them to the response queue.
// The implementations of responseQueue and commandQueue are shown below.
// commandQueue already contains some items at the beginning.
while (webSocket.State != WebSocketState.Closed)
{
    var command = await commandQueue.ReadAsync(cts);
    // Encode the command as UTF-8 text (assumed encoding for the text frame).
    var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(command));
    await webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, CancellationToken.None);
    // In order to log the command that was sent, push it into the response queue.
    responseQueue.Push(command);
}
ReceiveTask:
// Task that receives responses from the same web socket and pushes them into the response queue as well.
// These variables are needed for multi-packet responses.
int dataPerPacket = 8 * 1024; // 8192
StringBuilder strBuilder = new StringBuilder();
var buffer = new ArraySegment<byte>(new byte[dataPerPacket]);
while (webSocket.State != WebSocketState.Closed)
{
    var receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
    // Append the first fragment of the message.
    strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
    // Keep receiving until the end of the message.
    while (!receiveResult.EndOfMessage)
    {
        receiveResult = await webSocket.ReceiveAsync(buffer, recCts);
        strBuilder.Append(Encoding.UTF8.GetString(buffer.Array, buffer.Offset, receiveResult.Count));
    }
    responseQueue.Push(strBuilder.ToString());
    strBuilder.Clear();
}
I have simplified my code, but nothing important was lost. The WebSocket server is just a public echo server on the internet.
For the command and response queues I am using a Channel approach and have implemented my own custom class (because I am on .NET Framework 4.5.2). The intention of this class is to be able to produce (write) data to the responseQueue in the SendTask and consume (read) data from the responseQueue in the ReceiveTask efficiently, that is, without blocking (and without burning CPU in the infinite loops).
Implementation of Channel:
public class Channel
{
    private ConcurrentQueue<string> _queue;
    private SemaphoreSlim _flag;

    public Channel()
    {
        _queue = new ConcurrentQueue<string>();
        // Starts at 0, so ReadAsync waits until something is pushed.
        _flag = new SemaphoreSlim(0);
    }

    public async Task<string> ReadAsync(CancellationToken readCancelToken)
    {
        // Asynchronously wait until Push signals that an item is available.
        await _flag.WaitAsync(readCancelToken);
        if (readCancelToken.IsCancellationRequested)
        {
            return default;
        }
        // item must be declared; "out var" declares it inline.
        if (_queue.TryDequeue(out var item))
        {
            return item;
        }
        return default;
    }

    public void Push(string item)
    {
        _queue.Enqueue(item);
        // Signal one waiting reader.
        _flag.Release();
    }
}
But for some reason I get unexpected behavior when it runs. I expect the order of items pushed into responseQueue to be as follows:
command 1, command 2, response 1, command 3, response 2,...
that is, async behaviour: while the next SendAsync is executing, we receive a response and push it into the queue as well. But in fact I get the following order:
command 1, command 2,.....command 100, response 1, response 2, ....
So my question is: why does this happen, and how can I achieve the expected order?
I'm sure that the Channel implementation is not the cause.
What I have done to figure this out:
- If I add some time delay to SendTask (at least await Task.Delay(1)), I get the expected order, that is, async behaviour.
- I used the Stopwatch class to measure the elapsed time of the SendAsync call, and it showed 0 milliseconds. That is a strange result, and I don't know how to explain it.
- I took a look at the WebSocketClient source code to make sure that SendAsync should take some time (in my understanding).
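A minimal sketch of how such a Stopwatch measurement can be read with more precision. Stopwatch.ElapsedMilliseconds reports whole milliseconds rounded down, so a call that completes in under 1 ms shows as 0, while Stopwatch.Elapsed keeps the sub-millisecond part. FakeSendAsync is a hypothetical stand-in that completes immediately; it is not the real SendAsync and says nothing about how long a real send takes.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class SendTiming
{
    // Hypothetical stand-in for a send call that completes without waiting.
    static Task FakeSendAsync()
    {
        return Task.FromResult(0);
    }

    // Measures one awaited call and returns the full-resolution elapsed time.
    public static async Task<TimeSpan> MeasureAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        await FakeSendAsync();
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    public static void Main()
    {
        TimeSpan elapsed = MeasureAsync().GetAwaiter().GetResult();
        // Whole milliseconds, rounded down (what ElapsedMilliseconds would show).
        Console.WriteLine("Whole ms:      " + (long)elapsed.TotalMilliseconds);
        // Fractional milliseconds and raw ticks keep the sub-millisecond part.
        Console.WriteLine("Fractional ms: " + elapsed.TotalMilliseconds);
        Console.WriteLine("Ticks:         " + elapsed.Ticks);
    }
}
```

Swapping FakeSendAsync for the real SendAsync call would show whether the send really takes no measurable time or just less than one millisecond.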
Based on the information you provided, the problem is this:
In your ReceiveTask, there is no call to Enter the semaphore, only to Release. In order to use a semaphore, all tasks have to request to enter and signal the release.
It works when you add await Task.Delay(1) because that gives ReceiveTask time to Release.
Here is an example simulating the network SendAsync and ReceiveAsync operations:
The Channel is the same, but initializes SemaphoreSlim with 1 to start in a green state and calls WaitAsync without the Dequeue operation:
Output:
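The example code and its output are not included above. What follows is a minimal sketch of the same idea, not the original example: the sender enters a SemaphoreSlim that starts at 1 before each send, and the receiver releases it after logging each response. The network is simulated in memory; LockStepDemo, Simulate, wire, wireSignal and turn are hypothetical names, and no real WebSocket is involved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class LockStepDemo
{
    // Runs a simulated sender and receiver and returns the logged order.
    public static string[] Simulate(int count)
    {
        var log = new ConcurrentQueue<string>();    // plays the role of responseQueue
        var wire = new ConcurrentQueue<string>();   // simulated echo server
        var wireSignal = new SemaphoreSlim(0);      // signals "a response is available"
        var turn = new SemaphoreSlim(1);            // starts green: the first send may proceed

        var sender = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await turn.WaitAsync();             // enter: wait for the previous response
                log.Enqueue("command " + i);
                wire.Enqueue("response " + i);      // simulated SendAsync to an echo server
                wireSignal.Release();
            }
        });

        var receiver = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await wireSignal.WaitAsync();       // simulated ReceiveAsync
                string response;
                wire.TryDequeue(out response);
                log.Enqueue(response);
                turn.Release();                     // release: let the sender continue
            }
        });

        Task.WhenAll(sender, receiver).GetAwaiter().GetResult();
        return log.ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Simulate(3)));
        // prints: command 1, response 1, command 2, response 2, command 3, response 3
    }
}
```

With an initial count of 1 the two tasks run in strict lock-step. Starting the semaphore at a larger count would let that many commands be in flight before the sender has to wait for a response.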