Message ordering when receiving a batch of messages from Service Bus

I'm receiving messages in batch from Azure Service Bus:

IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await serviceBusReceiver.ReceiveMessagesAsync(maxMessages: maxMessages, maxWaitTime: TimeSpan.FromSeconds(1), cancellationToken: cancellationToken); 

It works fine, but I am unsure whether it also preserves ordering (FIFO). I know that we can achieve this behaviour using sessions, but do I need to use sessions when receiving a batch of messages?

I have a single receiver that receives and processes messages in batches. I want to ensure ordering because I am receiving device info (e.g. last seen), and I need to make sure that I update the correct info.

Also, if sessions are the only way, should I just set the SessionId to the DeviceId? Will that make sure that all the messages linked to a single device get processed in order?

Update

I've done a quick test and I can see that the messages are received in sequence. I added a batch of 150 messages to the topic:

for (int i = 1; i <= numOfMessages; i++)
{
    // try adding a message to the batch
    var msg = new ServiceBusMessage($"Message {i}");
    msg.ApplicationProperties.Add("Type", 1);
    if (!messageBatch.TryAddMessage(msg))
    {
        // if it is too large for the batch
        throw new Exception($"The message {i} is too large to fit in the batch.");
    }
}

try
{
    
    await sender.SendMessagesAsync(messageBatch);
    Console.WriteLine($"A batch of {numOfMessages} messages has been published.");
    
}
finally
{
    // Calling DisposeAsync on client types is required to ensure that network
    // resources and other unmanaged objects are properly cleaned up.
    await sender.DisposeAsync();
    await client.DisposeAsync();
}

After this I added a receiver method that receives and logs the message to the console:

private async Task ProcessMessages(int maxMessagesToProcess, CancellationToken cancellationToken)
{
    
     while (!cancellationToken.IsCancellationRequested)
     {
         var receivedMessageList = new List<ServiceBusReceivedMessage>();
         IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await _serviceBusReceiver.ReceiveMessagesAsync(maxMessages: maxMessagesToProcess, maxWaitTime: TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
         if (receivedMessages.Count > 0)
         {
             receivedMessageList.AddRange(receivedMessages);
             foreach (var msg in receivedMessageList)
             {
                 Debug.WriteLine($"Message: {msg.Body}");
             }
             if (receivedMessageList.Count >= ConcurrentMessagesToProcess)
             {
                 //await ProcessReceivedMessages(receivedMessageList);

                 var processAgain = await CheckForMessageProcessing();
                 if (!processAgain)
                 {
                     break;
                 }
             }
         }
         else
         {
             break;
         }
     }
 }

After multiple tests, I get the messages in sequence every time. I'm unsure whether I've missed a scenario or whether this is expected.

Answer by Sampath

According to the Azure Service Bus documentation on message sessions, a guaranteed first-in, first-out (FIFO) pattern requires sessions, which ensure that messages are processed in the order they were received.

  • When submitting messages into a queue or topic, ensure that each message is assigned a session ID. This session ID acts as an application-defined identifier unique to the session.

  • On session-aware queues or subscriptions, sessions are initiated when there's at least one message with a session ID. Messages with the same session ID are grouped together, forming a session.

  • When multiple messages are sent with session IDs, Service Bus ensures that messages within each session are processed in the order they were received. This maintains the FIFO guarantee, as messages within the same session are processed sequentially.

  • To receive messages in bulk, use batching. Batching delivers up to a set number of messages as a single batch, so several messages can be processed in one invocation.

  • The sample below receives and processes messages from a session-enabled Azure Service Bus queue using the Azure.Messaging.ServiceBus library. The session state is retrieved with GetSessionStateAsync().

// Requires: using System; using System.Threading.Tasks; using Azure.Messaging.ServiceBus;
string connectionString = "<connection_string>";
string queueName = "<queue_name>";
int batchSize = 10; // Maximum number of messages to receive in each batch
int prefetchCount = 100; // Number of messages to prefetch

await using var client = new ServiceBusClient(connectionString);

// Accept the next available session. The queue must have sessions enabled.
// The prefetch count is set on the session receiver that actually receives.
ServiceBusSessionReceiver sessionReceiver = await client.AcceptNextSessionAsync(
    queueName,
    new ServiceBusSessionReceiverOptions { PrefetchCount = prefetchCount });

while (true)
{
    var messages = await sessionReceiver.ReceiveMessagesAsync(batchSize);

    // Stop once the session has no more messages to deliver
    if (messages == null || messages.Count == 0)
        break;

    foreach (var message in messages)
    {
        // Process the received message (Body is BinaryData; ToString() decodes it as UTF-8)
        string messageBody = message.Body.ToString();
        Console.WriteLine($"Message received: {messageBody}, SequenceNumber={message.SequenceNumber}, SessionId={message.SessionId}");

        // Simulate processing of the message
        await Task.Delay(TimeSpan.FromSeconds(10));

        // Complete the message so it is removed from the queue
        await sessionReceiver.CompleteMessageAsync(message);
    }
}

// After processing all messages, retrieve the session state
BinaryData state = await sessionReceiver.GetSessionStateAsync();

await sessionReceiver.CloseAsync();
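
On the sending side, the question asks whether the SessionId can simply be set to the DeviceId. A minimal sketch of that idea follows, assuming the target queue or subscription was created with sessions enabled. The class and method names (DeviceMessageSender, BuildDeviceMessage, SendDeviceUpdateAsync) and the payload format are hypothetical and only for illustration; ServiceBusMessage.SessionId and ServiceBusSender.SendMessageAsync are the library calls used.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class DeviceMessageSender
{
    // Hypothetical helper: builds a message whose SessionId is the device ID,
    // so every update for one device belongs to the same session.
    public static ServiceBusMessage BuildDeviceMessage(string deviceId, string payload)
    {
        return new ServiceBusMessage(payload)
        {
            SessionId = deviceId
        };
    }

    // Hypothetical helper: sends one device update through an existing sender.
    // Assumes the sender targets a session-enabled queue or topic subscription.
    public static async Task SendDeviceUpdateAsync(
        ServiceBusSender sender, string deviceId, string payload)
    {
        await sender.SendMessageAsync(BuildDeviceMessage(deviceId, payload));
    }
}
```

With this arrangement, a session receiver that accepts the session for a given device ID sees only that device's messages, in the order Service Bus accepted them, while messages for other devices sit in their own sessions.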
