I'm attempting to receive messages from multiple Kafka hosts. We're utilizing Kafka riders for message consumption. While I can successfully consume messages from the first bus, but I couldnt able to consume the message from second bus and also no errors reported. Assistance with this issue would be greatly appreciated
Running Kafka locally using docker image . Implemented two buses to consume message from two hosts localhost:9094 and localhost:9095.
Expectation: should be able to consume message from both hosts
Actual: am able to consume from only localhost:9094
namespace KafkaConsumer;
using MassTransit;
using MassTransit.Kafka.Consumer.Consumers;
using MassTransit.Kafka.Contracts;
using Microsoft.Extensions.DependencyInjection;
// using MassTransit.Kafka.Consumer.Middlewares;
public class Program
{
public static async Task Main()
{
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
const string topicName = "test_topic";
const string consumerGroup = "consumer-group-medium";
const string kafkaBrokerServers = "localhost:9094";
x.UsingInMemory((context, cfg) =>
{
// cfg.UseExceptionLogger();
cfg.ConfigureEndpoints(context);
});
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer>();
rider.UsingKafka((context, k) =>
{
k.Host(kafkaBrokerServers);
k.TopicEndpoint<IMessage>(topicName, consumerGroup, e =>
{
e.ConfigureConsumer<KafkaMessageConsumer>(context);
e.CreateIfMissing();
});
});
});
});
services.AddMassTransit<ISecondBus>(x =>
{
const string topicName1 = "test_topic";
const string consumerGroup1 = "consumer-group-medium";
const string kafkaBrokerServers1 = "localhost:9095";
x.UsingInMemory((context, cfg) =>
{
// cfg.UseExceptionLogger();
cfg.ConfigureEndpoints(context);
});
x.AddConsumer<KafkaMessageConsumer1>();
x.AddRider(rider =>
{
rider.AddConsumer<KafkaMessageConsumer1>();
rider.UsingKafka((context, k) =>
{
k.Host(kafkaBrokerServers1);
k.TopicEndpoint<IMessage>(topicName1, consumerGroup1, e =>
{
e.ConfigureConsumer<KafkaMessageConsumer1>(context);
e.CreateIfMissing();
});
});
});
});
services.AddScoped<KafkaMessageConsumer>();
services.AddScoped<KafkaMessageConsumer1>();
var provider = services.BuildServiceProvider();
var busControl = provider.GetRequiredService<IBusControl>();
await busControl.StartAsync(new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token);
Console.WriteLine("Started...");
Console.ReadKey();
}
}