I'm new to processing messages with Azure Service Bus. Any help would be greatly appreciated.
An API queues our messages on Azure Service Bus, and the received messages are processed on 4 different VMs by a "Task Engine" Windows service. Each VM opens its own connection, but the tenant ID, client ID and namespace are the same for all 4 of them. We are not using partitioned threads. In all lower environments the code runs fine, but in PROD (about 7 million messages queued and processed per day) a VM suddenly stops processing messages and just hangs. The Site Reliability team does not see any CPU or memory issues on the VM. Every time this happens, the "Task Engine" Windows service on the VM has to be restarted before Service Bus message processing resumes.
Here is what the metrics look like in Azure Service Bus.

Configuration
AutoCompleteMessages = false
PrefetchCount = 50
ReceiveMode = PeekLock
MaxConcurrentCalls = 5
MaxAutoLockRenewalDuration = 15 min (900 seconds)
MessageLockDuration = 5 min
Code:
The following code initializes the Azure Service Bus connection when the Task Engine starts for the first time; the connection stays open until the engine is killed or restarted.
/// <summary>
/// Creates the shared <c>ServiceBusClient</c> member from the "tenant-id", "client-id",
/// "client-secret" and "servicebus-namespace" app settings. Does nothing when Azure is disabled.
/// </summary>
/// <remarks>
/// Authenticates with a <c>ClientSecretCredential</c> and uses AMQP over WebSockets.
/// NOTE(review): presumably called once at startup; a second call would replace the existing
/// client without disposing it — confirm against callers.
/// </remarks>
private static void initializeAzureServiceBusClientConnection()
{
    if (!_isAzureEnabled)
    {
        return;
    }

    var tenantId = GetFromConfig.AppSettings["tenant-id"];
    var clientId = GetFromConfig.AppSettings["client-id"];
    var clientSecret = GetFromConfig.AppSettings["client-secret"];
    var fullyQualifiedNamespace = GetFromConfig.AppSettings["servicebus-namespace"];

    var credential = new ClientSecretCredential(tenantId, clientId, clientSecret);
    var options = new ServiceBusClientOptions { TransportType = ServiceBusTransportType.AmqpWebSockets };

    ServiceBusClient = new ServiceBusClient(fullyQualifiedNamespace, credential, options);
}
The following code processes messages. (I haven't included the code that queues messages; there are no issues there.)
using Azure.Messaging.ServiceBus;
namespace TaskEngine
{
    /// <summary>
    /// Receives Task Engine business events from an Azure Service Bus queue through a
    /// <see cref="ServiceBusProcessor"/> and forwards messages that fail processing to a
    /// separate "invalid message" queue.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the state used here (_isRunning, _isWorking, _messageQueueName,
    /// _invalidMessageQueueName, _serviceBusReceiverProcessor, _serviceBusSender, LastQueueReset)
    /// is declared outside this excerpt.
    /// </remarks>
    internal sealed class AzureManager
    {
        // A reset that follows the previous reset within this many seconds is reported to
        // operators as a persistent Service Bus failure.
        private const double QueueResetNotificationWindowSeconds = 1800;

        private Thread _threadQueue;

        /// <summary>
        /// Called once when the Task Engine starts. When Azure business events are enabled, reads
        /// the queue names from configuration, creates the sender and processor, and starts the
        /// background thread that starts the processor.
        /// </summary>
        internal void Startup()
        {
            if (TaskEngineManager.IsAzureBusinessEventsEnabled)
            {
                Logger.Log(_isRunning == false, "The AzureManager startup method was called multiple times without a stop.");
                _isRunning = true;
                _messageQueueName = System.Configuration.ConfigurationManager.AppSettings["servicebus-queue"];
                _invalidMessageQueueName = System.Configuration.ConfigurationManager.AppSettings["servicebus-invalid-queue"];
                getAzureServiceBusAccess();
                // Dedicated thread because another process runs in parallel with this one.
                _threadQueue = new Thread(queueLoop);
                _threadQueue.Name = "Azure Business Event Queue Thread";
                _threadQueue.Start();
            }
            else
            {
                Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Azure Service Bus is disabled.");
            }
        }

        /// <summary>
        /// Thread entry point. Keeps trying to start the processor while the manager is working
        /// and the previous attempt ended in a connection reset.
        /// </summary>
        /// <remarks>
        /// Once StartProcessingAsync succeeds, this loop (and the thread) exits. From then on the
        /// processor receives on its own and reports failures only to ErrorHandlerAsync, which
        /// just logs. Nothing here detects a processor that stops delivering messages without
        /// raising an error.
        /// </remarks>
        private void queueLoop()
        {
            bool isConnectionReset;
            do
            {
                isConnectionReset = AsyncGatekeeper.Run(() => processUsingAzureServiceBusMQAsync());
            } while (_isWorking && isConnectionReset);
        }

        /// <summary>
        /// Wires the message and error handlers onto the current processor and starts it.
        /// </summary>
        /// <returns>
        /// <c>true</c> when starting failed and a connection reset was attempted, so the caller
        /// should retry; <c>false</c> when the processor started or the manager is not working.
        /// </returns>
        private async System.Threading.Tasks.Task<bool> processUsingAzureServiceBusMQAsync()
        {
            if (!_isWorking)
            {
                return false;
            }

            try
            {
                _serviceBusReceiverProcessor.ProcessMessageAsync += MessageHandlerAsync;
                _serviceBusReceiverProcessor.ProcessErrorAsync += ErrorHandlerAsync;
                // Returns once the receive loop is running; it does not wait for messages.
                await _serviceBusReceiverProcessor.StartProcessingAsync();
                return false;
            }
            catch (Exception ex)
            {
                Tracer.RaiseError(TraceSourceNames.AzureManager, "Azure Service Bus unhandled exception occurred. Attempting to reset connection.", ex);
                try
                {
                    try
                    {
                        // The result is always true; disposal errors are logged and swallowed inside.
                        await closeAndDisposeConnectionAsync();
                        tryResetConnections(ex);
                    }
                    finally
                    {
                        // Back off even when the reset itself failed; otherwise a persistent
                        // failure makes queueLoop retry immediately in a tight loop.
                        cooldownReceiveRequests();
                    }
                }
                catch (Exception resetEx)
                {
                    // Log the exception raised by the reset, not the one that triggered it.
                    Tracer.RaiseError(TraceSourceNames.AzureManager, "Azure Service Bus unhandled exception occurred. Reset connection failed.", resetEx);
                }
                return true; // ask queueLoop to retry with the rebuilt processor
            }
        }

        /// <summary>
        /// Processor message handler. Processes one message, forwards it to the invalid-message
        /// queue when processing reports failure, then completes it. On an unexpected error the
        /// message is abandoned so it can be redelivered.
        /// </summary>
        /// <remarks>
        /// NOTE(review): processMessage runs synchronously inside this handler. If it blocks
        /// indefinitely (for example, a database call without a timeout), it holds one of the
        /// MaxConcurrentCalls slots forever. Once all slots are held, the processor stops
        /// delivering messages without raising any error. Confirm processMessage has timeouts.
        /// </remarks>
        async System.Threading.Tasks.Task MessageHandlerAsync(ProcessMessageEventArgs args)
        {
            var message = args.Message;
            try
            {
                // Subscribe before doing any work so lock losses during processing are reported
                // (previously this was only subscribed after processing had finished).
                args.MessageLockLostAsync += OnMessageLockLostAsync;

                Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Azure Service Bus Event Queue thread received message {message.MessageId}.");
                var messageBody = message.Body;
                bool isSuccess = processMessage(message.MessageId, messageBody.ToStream());
                if (!isSuccess)
                {
                    await WriteToInvalidQueueAsync(messageBody);
                }

                // Complete the message so it is deleted from the queue.
                try
                {
                    await args.CompleteMessageAsync(message);
                }
                catch (Exception e)
                {
                    Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Azure service bus message lock already released when completing message. Continuing processing messages.", e);
                }
                Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Azure Service Bus Event Queue thread completed processing message {message.MessageId}.");
            }
            catch (Exception ex)
            {
                Tracer.RaiseError(TraceSourceNames.AzureManager, "Unexpected error occured moving task from Azure Service Bus to database; attempting to re-queue message.", ex);
                try
                {
                    await args.AbandonMessageAsync(args.Message);
                }
                catch (Exception e)
                {
                    Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Azure service bus message lock already released when abandoning message.", e);
                }
            }
        }

        /// <summary>
        /// Processor error handler. Only logs the error source, entity and namespace; it does not
        /// stop, reset or restart the processor.
        /// </summary>
        System.Threading.Tasks.Task ErrorHandlerAsync(ProcessErrorEventArgs args)
        {
            var errMessage = $"Azure Service Bus error handler caught exception. Message processing will retry. Error Source: {args.ErrorSource}; EntityPath: {args.EntityPath}; FullyQualifiedNamespace: {args.FullyQualifiedNamespace}";
            Tracer.RaiseError(TraceSourceNames.AzureManager, errMessage, args.Exception);
            return System.Threading.Tasks.Task.CompletedTask;
        }

        /// <summary>
        /// Logs a lost message lock. When the body can be converted, the log includes the
        /// converted message info; otherwise it uses the message itself.
        /// </summary>
        System.Threading.Tasks.Task OnMessageLockLostAsync(MessageLockLostEventArgs args)
        {
            string errMessage = $"Azure Service Bus Message lock Lost handler caught exception. Message: {args.Message}. Message Locked Until: {args.Message.LockedUntil.ToLocalTime()}";
            var messageInfo = convertMessage(args.Message.Body.ToStream());
            if (messageInfo != null)
            {
                errMessage = $"Azure Service Bus Message Lock Lost handler caught exception. Message processing will retry. Message Info: {messageInfo}. Message Locked Until: {args.Message.LockedUntil.ToLocalTime()}";
            }
            Tracer.RaiseError(TraceSourceNames.AzureManager, errMessage, args.Exception);
            return System.Threading.Tasks.Task.CompletedTask;
        }

        /// <summary>
        /// Called when the Task Engine starts for the first time, and whenever the connection is
        /// lost and must be reset. Creates a new sender for the invalid-message queue and a new
        /// processor for the main queue on the shared client. It does not dispose the previous
        /// instances; the reset path disposes them beforehand.
        /// </summary>
        private static void getAzureServiceBusAccess()
        {
            Logger.Log(!(string.IsNullOrEmpty(_messageQueueName)
                || string.IsNullOrEmpty(_invalidMessageQueueName))
                , "Incomplete Azure Service Bus configuration settings. Unable to proceed.");
            Tracer.RaiseInfo(TraceSourceNames.AzureManager, $"Attempting to connect to Azure Service Bus.");
            _serviceBusSender = TaskEngineManager.ServiceBusClient.CreateSender(_invalidMessageQueueName, new ServiceBusSenderOptions());
            var processorOptions = new ServiceBusProcessorOptions()
            {
                MaxConcurrentCalls = 5,
                // NOTE(review): prefetched messages wait in a local buffer and their locks are not
                // auto-renewed until a handler picks them up. With 50 buffered messages, 5 handlers
                // and a 5-minute queue LockDuration, slow handlers can let buffered locks expire,
                // which would surface as MessageLockLost on completion.
                PrefetchCount = 50,
                // The SDK default is 5 minutes. 15 minutes lets a handler keep its message lock
                // (renewed every queue LockDuration, 5 min) for up to 15 minutes of processing.
                MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(900),
                AutoCompleteMessages = false,
            };
            _serviceBusReceiverProcessor = TaskEngineManager.ServiceBusClient.CreateProcessor(_messageQueueName, processorOptions);
        }

        /// <summary>
        /// Rebuilds the sender and processor after the caller has disposed them. Notifies
        /// operators when this reset follows the previous one within
        /// <see cref="QueueResetNotificationWindowSeconds"/>.
        /// </summary>
        /// <param name="exception">The failure that triggered the reset; included in the notification.</param>
        /// <exception cref="Exception">Rethrows (after logging) any failure while rebuilding.</exception>
        private void tryResetConnections(Exception exception)
        {
            Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Resetting Azure Service Bus connection and retrying.");
            try
            {
                if (DateTime.Now.Subtract(LastQueueReset).TotalSeconds <= QueueResetNotificationWindowSeconds)
                {
                    NotificationRegulator.Notify(NotificationReason.AzureServiceBusFailure, exception, Resources.AzureServiceBus_ConnectError);
                }
                LastQueueReset = DateTime.Now;
                // Always rebuild. The previous processor and sender are already disposed, so
                // skipping this (as the old throttled branch did) left queueLoop retrying against
                // disposed objects until the 30-minute window expired.
                getAzureServiceBusAccess();
                Tracer.RaiseInfo(TraceSourceNames.AzureManager, "Azure Service Bus Connection succesfully reset.");
            }
            catch (Exception ex)
            {
                Tracer.RaiseError(TraceSourceNames.AzureManager, "Resetting Azure Service Bus connection failed.", ex);
                throw;
            }
        }

        /// <summary>
        /// Stops and disposes the current processor, then disposes the current sender. Each step
        /// is attempted independently; failures are logged as warnings and swallowed.
        /// </summary>
        /// <returns>Always <c>true</c>.</returns>
        private static async System.Threading.Tasks.Task<bool> closeAndDisposeConnectionAsync()
        {
            try
            {
                await _serviceBusReceiverProcessor.StopProcessingAsync();
            }
            catch (Exception ex)
            {
                // Do not throw; the processor may never have started or may already be disposed.
                Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Error disposing Azure Service Bus Receiver Connection.", ex);
            }
            try
            {
                // Attempted even if stopping failed, so the processor's resources are released.
                await _serviceBusReceiverProcessor.DisposeAsync();
            }
            catch (Exception ex)
            {
                Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Error disposing Azure Service Bus Receiver Connection.", ex);
            }
            try
            {
                await _serviceBusSender.DisposeAsync();
            }
            catch (Exception ex)
            {
                // Do not throw; the sender may already have been disposed.
                Tracer.RaiseWarning(TraceSourceNames.AzureManager, "Error disposing Azure Service Bus Sender Connection.", ex);
            }
            return true;
        }
    }
}
We have added a lot of logging to investigate the issue, but the only error we see is the occasional MessageLockLost exception raised here. The connection is not lost, and it is never closed or disposed automatically; the only way to recover is to restart the service manually.
Please help!
We have experimented with the configuration settings to see if that helps, but nothing changed. Every day we have to restart the Windows service that processes messages at least three times.