How to sort a ConcurrentBag?

1.8k Views Asked by At

I am working on a client/server application. The server sends messages to the client, but the order cannot be guaranteed. I am using TCP... I don't want to get into why the order cannot be guaranteed (it is to do with threads on the server).

Anyway, on the client, I am processing messages like this:

private Queue<byte[]> rawMessagesIn = new Queue<byte[]>();
public ConcurrentBag<ServerToClient> messages = new ConcurrentBag<ServerToClient>();

public void Start()
{
    var processTask = Task.Factory.StartNew(() =>
    {
        while (run)
        {
            process();
        }
    });
}

void process(){
    if(rawMessagesIn.Count > 0){
        var raw_message = rawMessagesIn.Dequeue();
        var message = (ServerToClient)Utils.Deserialize(raw_message);
            messages.Add(message);
    }
}

private void OnDataReceived(object sender, byte[] data)
{
    rawMessagesIn.Enqueue(data);
}

Now, it is important that when I call messages.TryTake() or messages.TryPeek() that the message out is the next in the sequence. Every message has a number/integer representing its order. For example, message.number = 1

I need to use TryPeek because the message at index 0 might be the correct message or it might be the wrong message, in which case we remove the message from the bag. However, there is a possibility that the message is a future required message, and so it should not be removed.

I have tried using message.OrderBy(x=>x.number).ToList(); but I cannot see how it will work. If I use the OrderBy and get a sorted list SL and the item at index 0 is the correct one, I cannot simply remove or modify messages because I do not know its position in the ConcurrentBag!

Does anyone have a suggestion for me?

1

There are 1 best solutions below

0
Theodor Zoulias On

My suggestion is to switch from manually managing queues, to a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component is a combination of an input queue, and output queue, and a processor that transforms the TInput to TOutput. The EnsureOrdered functionality is built-in, and it is the default. Example:

private readonly TransformBlock<byte[], ServerToClient> _transformer;

public Client() // Constructor
{
    _transformer = new((byte[] raw_message) =>
    {
        ServerToClient message = (ServerToClient)Utils.Deserialize(raw_message);
        return message;
    }, new ExecutionDataflowBlockOptions()
    {
        EnsureOrdered = true, // Just for clarity. true is the default.
        MaxDegreeOfParallelism = 1, // the default is 1
    });
}

private void OnDataReceived(object sender, byte[] data)
{
    bool accepted = _transformer.Post(data);
    // The accepted will be false in case the _transformer has failed.
}

public bool TryReceiveAll(out IList<ServerToClient> messages)
{
    return _transformer.TryReceiveAll(out messages);
}

There are many ways to consume the ServerToClient messages that are stored in the output queue of the block. The example above demonstrates the TryReceiveAll method. There are also the TryReceive, Receive, ReceiveAsync and ReceiveAllAsync (some of them are extension methods). You can also use the lower level method OutputAvailableAsync as shown here. Linking it to another dataflow block is also an option.