Mass Transit - Competing Consumer Pattern

26 Views Asked by At

I have azure service bus topic "TOPIC-1" and has one subscription "SUBSCRIPTION-1". I want two mass transit consumers which listens to same subscription "SUBSCRIPTION-1". Basically I want achieve competing consumer pattern on same subscription.

Based on below code both consumer receive the same messages. Surprisingly I see single subscription on azure portal.

I have two consumers

    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        return Host
               .CreateDefaultBuilder(args)
               .ConfigureServices((hostContext, services) =>
               {
                   services.AddMassTransit(config =>
                   {
                       config.AddConsumer<GettingStartedEventConsumer>();
                       config.AddConsumer<GettingStartedEvent2Consumer>();

                       config.UsingAzureServiceBus((context, cfg) =>
                       {
                           cfg.Host("Endpoint=sb://masstansit-ak.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=BZSoHtHoOZJ/4mueVeNdX4l6k2qMdElhq+ASbLrnEWE=");

                           cfg.SubscriptionEndpoint<GettingStartedEvent>("GettingStartedEvent", c =>
                           {
                               c.ConfigureConsumer<GettingStartedEventConsumer>(context);
                               c.ConfigureConsumer<GettingStartedEvent2Consumer>(context);
                           });

                       });
                   });
               });
    }

Consumers

    public class GettingStartedEventConsumer : IConsumer<GettingStartedEvent>
    {
        private readonly ILogger<GettingStartedEventConsumer> _logger;
        private int count = 1;

        public GettingStartedEventConsumer(ILogger<GettingStartedEventConsumer> logger)
        {
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<GettingStartedEvent> context)
        {
            //if (count == 6)
            //{
            //    throw new Exception("Count exceeded");
            //}

            _logger.LogInformation("CONSUMER 1 --------------  Received Text: {Text}", context.Message.Value);

            count++;

            //await context.ConsumeCompleted;

            await Task.CompletedTask;
        }
    }
 public class GettingStartedEvent2Consumer : IConsumer<GettingStartedEvent>
 {
     private readonly ILogger<GettingStartedEventConsumer> _logger;
     private int count = 1;

     public GettingStartedEvent2Consumer(ILogger<GettingStartedEventConsumer> logger)
     {
         _logger = logger;
     }

     public async Task Consume(ConsumeContext<GettingStartedEvent> context)
     {
         //if (count == 6)
         //{
         //    throw new Exception("Count exceeded");
         //}

         _logger.LogInformation("CONSUMER 2 --------------  Received Text: {Text}", context.Message.Value);

         count++;

         //await context.ConsumeCompleted;

         await Task.CompletedTask;
     }
0

There are 0 best solutions below