I'm receiving messages in batch from Azure Service Bus:
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await serviceBusReceiver.ReceiveMessagesAsync(maxMessages: maxMessages, maxWaitTime: TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
It works fine but I am unsure if it also consider the ordering (FIFO). I know what we can achieve this behaviour using Sessions. But I want to ask if I need to use sessions when receiving batch of messages?
I've a single receiver that receives and process messages in batches. But I want to ensure ordering as I am receiving devices info e.g last seen etc and I need to make sure that I've updated the correct info.
Also if Sessions is the only way then should I just set the SessionId to DeviceId and that will make sure that the all the messages linked to a single device get processed in order?
Update
I've done a quick test and I can see that message are received in sequence. I've added batch of 150 messages to the topic:
for (int i = 1; i <= numOfMessages; i++)
{
// try adding a message to the batch
var msg = new ServiceBusMessage($"Message {i}");
msg.ApplicationProperties.Add("Type", 1);
if (!messageBatch.TryAddMessage(msg))
{
// if it is too large for the batch
throw new Exception($"The message {i} is too large to fit in the batch.");
}
}
try
{
await sender.SendMessagesAsync(messageBatch);
Console.WriteLine($"A batch of {numOfMessages} messages has been published.");
}
finally
{
// Calling DisposeAsync on client types is required to ensure that network
// resources and other unmanaged objects are properly cleaned up.
await sender.DisposeAsync();
await client.DisposeAsync();
}
After this I added a receiver method that receives and logs the message to the console:
private async Task ProcessMessages(int maxMessagesToProcess, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var receivedMessageList = new List<ServiceBusReceivedMessage>();
IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await _serviceBusReceiver.ReceiveMessagesAsync(maxMessages: maxMessagesToProcess, maxWaitTime: TimeSpan.FromSeconds(1), cancellationToken: cancellationToken);
if (receivedMessages.Count > 0)
{
receivedMessageList.AddRange(receivedMessages);
foreach (var msg in receivedMessageList)
{
Debug.WriteLine($"Message: {msg.Body}");
}
if (receivedMessageList.Count >= ConcurrentMessagesToProcess)
{
//await ProcessReceivedMessages(receivedMessageList);
var processAgain = await CheckForMessageProcessing();
if (!processAgain)
{
break;
}
}
}
else
{
break;
}
}
}
After multiple tests, I get the messages in sequence. I'm unsure if I've missed any scenario or is this expected.
According to documentation on Azure Service Bus message sessions, achieving a first-in, first-out (FIFO) pattern involves using sessions to ensure that messages are processed in the order they were received.
When submitting messages into a queue or topic, ensure that each message is assigned a session ID. This session ID acts as an application-defined identifier unique to the session.
On session-aware queues or subscriptions, sessions are initiated when there's at least one message with a session ID. Messages with the same session ID are grouped together, forming a session.
When multiple messages are sent with session IDs, Service Bus ensures that messages within each session are processed in the order they were received. This maintains the FIFO guarantee, as messages within the same session are processed sequentially.
To receive bulk messages, use Batching. Batching delivers a set number of messages to the function as a single batch, allowing for processing multiple messages in one invocation.
The below sample is to receive and process messages from an Azure Service Bus queue using the
Azure.Messaging.ServiceBuslibrary, the session state is retrieved usingGetSessionStateAsync().References Used:
Output: