I have following process, that I thought would be easy to implement with the AzureServiceBus and sessions: I have a subscription that creates +5 messages out of one message and sends all those messages via batch to another subscription that enables sessions. Therefore I set the same session id for those +5 messages. Sending the messages is not a problem. But the next step is: I thought I could create a processor, that would get me all messages with the same session id. Each message calls a service that could possibly fail. If one call fails, all the other messages that were completed in that session before would have to be rollbacked. But I guess that's not how sessions work in the AzureServiceBus.
I started with following code (that's inside the method StartAsync in a processor class that implements the IHostedService):
[...]
sessionProcessor = serviceBusClient.CreateSessionProcessor(
processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic,
processorOptions.Subscription,
options);
// Configure the message and error handler to use
sessionProcessor.ProcessMessageAsync += MessageHandler;
sessionProcessor.ProcessErrorAsync += ErrorHandler;
sessionProcessor.SessionInitializingAsync += SessionInitializingHandler;
sessionProcessor.SessionClosingAsync += SessionClosingHandler;
await sessionProcessor.StartProcessingAsync();
[...]
Problem is that the MessageHandler only processes one message and completes it right away. That's not what I need.
So I tried that here:
var receiver = await serviceBusClient.AcceptNextSessionAsync(processingOptions.CurrentValue.ServiceBusOptions.ServiceBusTopic, processorOptions.Subscription);
var messages = await receiver.ReceiveMessagesAsync(100);
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
foreach (var message in messages)
{
await ProcessSessionRequest(message);
await receiver.CompleteMessageAsync(message);
await receiver.SetSessionStateAsync(new BinaryData(SessionState.SessionInProcess));
}
ts.Complete();
}
But here I have no error handling like the session processor would have. Which means: If one message fails and throws an exception, my processor completely shuts down instead of just trying again or moving the messages into the dead-letter queue and moving on to another session.
Does anyone have any idea how to implement that correctly? The only other idea I have is writing my own processor with the necessary events. But maybe I'm just missing something.
Thanks to @Sean Feldman. Yes, the approach which you outlined will not handle the scenario whereas the maximum number of messages per session is not received in a single operation. Additionally, ensuring atomic processing of messages within a session, especially in the case of failures, requires more consideration.
You’ll need to loop until you’ve received all messages within the session. Keep fetching messages until you’ve accumulated the expected count or until no more messages are available.
Send messages to the Azure Service Bus topic, ensuring that messages with related content are grouped under the same session ID.
When a message fails (e.g., due to an exception), you’ll need to roll back all previously completed messages within the same session. This is where it gets tricky. Move the failed message to a dead-letter queue (DLQ) for further analysis. However, this doesn’t automatically affect other completed messages.
Received messages: