How to enable multiple consumers for Apache.NMS.AMQP clients?

210 Views Asked by At

I would like to know how to enable multiple consumers for an Apache.NMS.AMQP client. I have tried using multiple sessions for the same queue, each with a different consumer, but the listener is only called for one consumer per queue. Below is the sample code. Ignore the connection-per-client setup: I tried it because I thought a shared connection might be the cause, but it doesn't help. Each consumer is given a name (`ConsumerName`) to identify which consumer is being called.

// One logger instance is shared by all three clients.
ILogger<QueueClient> queueClientLogger = loggerFactory.CreateLogger<QueueClient>();

// First client on queue "Q/test1".
QueueClient queueClient1 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient1.InitializeAsync();

// Second client on the same queue "Q/test1".
QueueClient queueClient2 = new QueueClient(queueClientLogger, "Q/test1");
await queueClient2.InitializeAsync();

// Third client on a different queue, "Q/test2".
QueueClient queueClient3 = new QueueClient(queueClientLogger, "Q/test2");
await queueClient3.InitializeAsync();

-----------------------------------------------
/// <summary>
/// Owns one NMS connection, one session and one message consumer for a single queue,
/// and logs the content of every message handed to its listener.
/// </summary>
internal class QueueClient : IDisposable
{
  private readonly ILogger<QueueClient> logger;
  private IMessageConsumer consumer;
  private bool disposedValue;

  #region constructor

  /// <summary>
  /// Creates a client for <paramref name="queueName"/>. No broker connection is opened
  /// until <see cref="InitializeAsync"/> is called.
  /// </summary>
  /// <param name="logger">Logger used for all output of this client.</param>
  /// <param name="queueName">Name of the queue to consume from.</param>
  /// <exception cref="ArgumentNullException"><paramref name="queueName"/> is null.</exception>
  public QueueClient(ILogger<QueueClient> logger, string queueName)
  {
      this.logger = logger;
      QueueName = queueName ?? throw new ArgumentNullException(nameof(queueName));
      // The GUID suffix makes the name unique per instance, so log lines show
      // which client handled a message even when several share a queue.
      ConsumerName = $"{QueueName}-{Guid.NewGuid()}";
  }

  #endregion

  #region Properties

  /// <summary>Name of the queue this client consumes from.</summary>
  internal string QueueName { get; }

  /// <summary>Per-instance identifier: the queue name followed by a fresh GUID.</summary>
  internal string ConsumerName { get; }

  /// <summary>Session created by <see cref="InitializeAsync"/>.</summary>
  internal Apache.NMS.ISession Session { get; private set; }

  /// <summary>Connection created by <see cref="InitializeAsync"/>.</summary>
  internal Apache.NMS.IConnection Connection { get; private set; }


  #endregion

  #region Methods

  /// <summary>
  /// Opens and starts a connection to the broker, creates an auto-acknowledge session
  /// and a consumer on <see cref="QueueName"/>, and attaches the message listener.
  /// </summary>
  internal async Task InitializeAsync()
  {
      const string brokerUri = "amqp://localhost:5672";  // Default port
      NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
      Connection = await factory.CreateConnectionAsync();
      await Connection.StartAsync();
      Session = await Connection.CreateSessionAsync(AcknowledgementMode.AutoAcknowledge);
      Apache.NMS.IDestination dest = await Session.GetQueueAsync(QueueName);
      consumer = await Session.CreateConsumerAsync(dest);
      consumer.Listener += Consumer_Listener;
  }

  /// <summary>
  /// Listener callback: logs which consumer received the message, then logs the message
  /// text (text messages), the UTF-8 decoded body (bytes messages), or a note naming
  /// the unexpected message type.
  /// </summary>
  /// <param name="message">The message passed to the listener.</param>
  private void Consumer_Listener(Apache.NMS.IMessage message)
  {
      // Values are passed as template arguments rather than interpolated into the
      // template, so braces in a queue name are never parsed as placeholders.
      logger.LogInformation("{ConsumerName}: Message from queue - {QueueName}", ConsumerName, QueueName);

      // Blocks the calling thread for one second per message
      // (presumably to simulate slow processing - confirm).
      Thread.Sleep(1000);

      string content = message switch
      {
          ITextMessage textMessage => textMessage.Text ?? "",
          IBytesMessage bytesMessage => Encoding.UTF8.GetString(bytesMessage.Content),
          _ => "Unexpected message type: " + message.GetType().Name,
      };

      // The payload is arbitrary text (e.g. JSON); log it as an argument so it is
      // never treated as a message template.
      logger.LogInformation("{Content}", content);
  }

  // IDisposable implementation omitted from this excerpt.
}
0

There are 0 best solutions below