Consume Message from Multiple kafka hosts using multi bus kafka mass transit rider

35 Views Asked by At

I'm attempting to receive messages from multiple Kafka hosts. We're utilizing Kafka riders for message consumption. While I can successfully consume messages from the first bus, but I couldnt able to consume the message from second bus and also no errors reported. Assistance with this issue would be greatly appreciated

Running Kafka locally using docker image . Implemented two buses to consume message from two hosts localhost:9094 and localhost:9095.

Expectation: should be able to consume message from both hosts

Actual: am able to consume from only localhost:9094

namespace KafkaConsumer;
using MassTransit;
using MassTransit.Kafka.Consumer.Consumers;
using MassTransit.Kafka.Contracts;
using Microsoft.Extensions.DependencyInjection;
// using MassTransit.Kafka.Consumer.Middlewares;
public class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddMassTransit(x =>
        {
            const string topicName = "test_topic";
            const string consumerGroup = "consumer-group-medium";
            const string kafkaBrokerServers = "localhost:9094";
            x.UsingInMemory((context, cfg) =>
            {
                // cfg.UseExceptionLogger();

                cfg.ConfigureEndpoints(context);
            });

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer>();
                rider.UsingKafka((context, k) =>
                {
                    k.Host(kafkaBrokerServers);

                    k.TopicEndpoint<IMessage>(topicName, consumerGroup, e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer>(context);
                        e.CreateIfMissing();
                    });
                });
            });
        });
        services.AddMassTransit<ISecondBus>(x =>
        {
            const string topicName1 = "test_topic";
            const string consumerGroup1 = "consumer-group-medium";
            const string kafkaBrokerServers1 = "localhost:9095";
            x.UsingInMemory((context, cfg) =>
            {
                // cfg.UseExceptionLogger();
                cfg.ConfigureEndpoints(context);
            });
            x.AddConsumer<KafkaMessageConsumer1>();

            x.AddRider(rider =>
            {
                rider.AddConsumer<KafkaMessageConsumer1>();
                rider.UsingKafka((context, k) =>
                {
                    k.Host(kafkaBrokerServers1);
                    k.TopicEndpoint<IMessage>(topicName1, consumerGroup1, e =>
                    {
                        e.ConfigureConsumer<KafkaMessageConsumer1>(context);
                        e.CreateIfMissing();
                    });
                });
            });
        });
        services.AddScoped<KafkaMessageConsumer>();
        services.AddScoped<KafkaMessageConsumer1>();
        var provider = services.BuildServiceProvider();
        var busControl = provider.GetRequiredService<IBusControl>();
        await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token);
        Console.WriteLine("Started...");
        Console.ReadKey();
    }

} 
0

There are 0 best solutions below