Read x messages from a service bus queue with .NET Core 3.1


I have the code below for reading x messages from a Service Bus queue, but it feels a bit clunky, especially when it comes to detecting that there is no more data on the queue.

Has anyone been able to get this to work in a nicer way?

static async Task<List<string>> ReceiveMessagesAsync(int messageCount)    
{
    var messages = new List<string>(); // declared outside the try block so it can be returned at the end
    queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    try
    {
        var options = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1
        };

        var startingAt = DateTime.Now;
        DateTime? latestMessageReadAt = null;
    
        queueClient.RegisterMessageHandler((message, cancellationToken) =>
        {
            var json = Encoding.UTF8.GetString(message.Body);
            messages.Add(json);

            latestMessageReadAt = DateTime.Now;
            return Task.CompletedTask;
        }, options);

        var allMessagesRead = false;
    
        // Wait for the desired number of messages to be received
        while (messages.Count < messageCount && allMessagesRead == false)
        {
            await Task.Delay(10); // Adjust the delay based on your requirements
        
            if (latestMessageReadAt != null)
            {
                // TotalSeconds is the whole elapsed time; Seconds is only the 0-59 component
                allMessagesRead = DateTime.Now.Subtract(latestMessageReadAt.Value).Duration().TotalSeconds > 10;
            }
            else
            {
                allMessagesRead = DateTime.Now.Subtract(startingAt).Duration().TotalSeconds > 10;
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    } 
    finally
    {
        await queueClient.CloseAsync();
    }

    return messages;
}

static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
    Console.WriteLine($"Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");
    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");
    return Task.CompletedTask;
}

Paul

There is 1 best solution below

SiddheshDesai

I agree with Panagiotis Kanavas, and thank you: .NET Core 3.1 is out of support, whereas .NET 6.0 is a Long Term Support release that works with the Service Bus SDK.

To detect the number of messages received and then close the receiver, refer to the code below. It works in both .NET Core 3.1 and .NET 6.0 console apps:

using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

class Program
{
    const string ServiceBusConnectionString = "Endpoint=sb://xxxsb65.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxbOm80x4=";
    const string QueueName = "myqueue";

    static async Task Main(string[] args)
    {
        await ReceiveAndProcessMessagesAsync(2); // Replace '2' with the number of messages you want to receive
    }

    static async Task ReceiveAndProcessMessagesAsync(int messageCount)
    {
        var messages = new List<string>();
        var queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

        try
        {
            queueClient.RegisterMessageHandler(async (message, cancellationToken) =>
            {
                var json = Encoding.UTF8.GetString(message.Body);
                messages.Add(json);

                Console.WriteLine($"Received message: {json}");

                if (messages.Count >= messageCount)
                {
                    await queueClient.CloseAsync();
                }
            },
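            new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                // With AutoComplete = false the handler must complete each message itself;
                // this sample does not, so the messages reappear once their lock expires
                AutoComplete = false
            });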

            while (true)
            {
                // Add a short delay before re-checking the message count
                await Task.Delay(1000); // Adjust the delay based on your requirements

                // Check if the desired number of messages have been received
                if (messages.Count >= messageCount)
                {
                    Console.WriteLine("Received the required number of messages.");
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            await queueClient.CloseAsync();
        }
    }

    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception: {exceptionReceivedEventArgs.Exception}");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine($"Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }
}

Output:

I call await ReceiveAndProcessMessagesAsync(2); so that only 2 messages are received from Service Bus.
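
If polling the list every second feels clunky, one alternative is to let the handler signal completion itself. The following is only a sketch of that idea: MessageCollector is a hypothetical helper, not part of any Service Bus SDK, and its timeout is an overall limit rather than the "idle for 10 seconds" check from the question. The handler would call Add(json), and the method would await WaitAsync(...) instead of looping over Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (an assumption for illustration, not an SDK type).
sealed class MessageCollector
{
    private readonly object _gate = new object();
    private readonly List<string> _messages = new List<string>();
    private readonly int _target;
    private readonly TaskCompletionSource<bool> _done =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public MessageCollector(int target)
    {
        _target = target;
        if (target <= 0)
        {
            _done.TrySetResult(true); // nothing to wait for
        }
    }

    // Call this from the message handler for each received body.
    public void Add(string body)
    {
        lock (_gate)
        {
            if (_messages.Count >= _target)
            {
                return; // ignore anything beyond the requested count
            }

            _messages.Add(body);
            if (_messages.Count == _target)
            {
                _done.TrySetResult(true); // wake up the waiter immediately
            }
        }
    }

    // Returns when the target count is reached or the timeout elapses, whichever comes first.
    public async Task<List<string>> WaitAsync(TimeSpan timeout)
    {
        await Task.WhenAny(_done.Task, Task.Delay(timeout));
        lock (_gate)
        {
            return new List<string>(_messages); // copy, so later Add calls cannot change the result
        }
    }
}
```

The lock is there because the handler runs on a different thread from the code that awaits the result; with MaxConcurrentCalls = 1 it is cheap insurance rather than a strict requirement.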

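As per the comment by Panagiotis Kanavas, you can implement batching with the code below. It uses ServiceBusClient from the newer Azure.Messaging.ServiceBus package (add using Azure.Messaging.ServiceBus;):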

    static async Task ReceiveAndProcessMessagesAsync(int messageCount)
    {
        var messages = new List<string>();
        await using var client = new ServiceBusClient(ServiceBusConnectionString);
        var receiver = client.CreateReceiver(QueueName);

        try
        {
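            // Keep receiving until the desired count is reached or the queue is empty
            while (messages.Count < messageCount)
            {
                // Receive a batch of at most the number of messages still needed
                IReadOnlyList<ServiceBusReceivedMessage> receivedMessages = await receiver.ReceiveMessagesAsync(messageCount - messages.Count);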

                if (receivedMessages.Count == 0)
                {
                    Console.WriteLine("No more messages on the queue.");
                    break;
                }

                foreach (ServiceBusReceivedMessage receivedMessage in receivedMessages)
                {
                    string body = Encoding.UTF8.GetString(receivedMessage.Body);
                    messages.Add(body);
                    Console.WriteLine($"Received message: {body}");

                    // Complete the message if needed
                    // await receiver.CompleteMessageAsync(receivedMessage);

                    if (messages.Count >= messageCount)
                    {
                        break; // Exit the loop if the desired message count is reached
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            await receiver.CloseAsync();
        }
    }
}
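
The shape of that receive loop can be tried out without a Service Bus namespace. Below is a minimal sketch in which the SDK call is hidden behind a delegate; ReceiveUpToAsync, FakeQueue and the receiveBatch parameter are hypothetical names introduced for illustration. Against a real queue, the delegate would wrap receiver.ReceiveMessagesAsync(maxMessages, maxWaitTime) and map each message body to a string; that call returns an empty list when nothing arrives within the wait time, which is what the loop treats as "no more data".

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BoundedReceive
{
    // receiveBatch(maxMessages, maxWait) is an assumed stand-in for the SDK receive call.
    // It must return an empty list when nothing arrived within maxWait.
    public static async Task<List<string>> ReceiveUpToAsync(
        Func<int, TimeSpan, Task<IReadOnlyList<string>>> receiveBatch,
        int messageCount,
        TimeSpan maxWait)
    {
        var messages = new List<string>();

        while (messages.Count < messageCount)
        {
            // Ask only for what is still missing, so the loop never over-fetches.
            var batch = await receiveBatch(messageCount - messages.Count, maxWait);

            if (batch.Count == 0)
            {
                break; // nothing arrived within maxWait: treat the queue as drained
            }

            messages.AddRange(batch);
        }

        return messages;
    }

    // In-memory stand-in for a queue, used only to exercise the loop.
    // maxBatchSize mimics a service that returns fewer messages than requested.
    public static Func<int, TimeSpan, Task<IReadOnlyList<string>>> FakeQueue(
        IEnumerable<string> items, int maxBatchSize)
    {
        var queue = new Queue<string>(items);
        return (max, _) =>
        {
            var batch = new List<string>();
            while (batch.Count < Math.Min(max, maxBatchSize) && queue.Count > 0)
            {
                batch.Add(queue.Dequeue());
            }
            return Task.FromResult<IReadOnlyList<string>>(batch);
        };
    }
}
```

For example, await BoundedReceive.ReceiveUpToAsync(BoundedReceive.FakeQueue(new[] { "a", "b", "c", "d", "e" }, 2), 3, TimeSpan.FromSeconds(10)) takes two batches and stops after three messages, leaving the rest on the fake queue. Passing an explicit wait time also avoids hand-rolling the 10-second idle check from the question.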