How do I implement the Request/Response pattern for MassTransit using RabbitMQ Service Bus Plus Filters?

57 Views Asked by At

I am implementing the request/response pattern for MassTransit using RabbitMQ using an IBusControl object and only 1 Queue.

We will have 1 Request originator app (currently a Unit Test) running and multiple consumer (console) apps running. Each consumer app will ONLY be responding to messages with a given MyId (command-line argument).

I am assuming that I should accomplish this by adding a filter to the ReceiveEndpoint definition. I am using the latest MassTransit (8.1.3.0).

I tried creating a ConsumeContext filter (MyFilter1) in the code below (e.UseFilter(new MyFilter1(myId));) This filter was never hit. Now I am trying to create a ConsumerConsumeContext filter (MyFilter). I believe that this is the type of filter that I want.

All of the code samples I find seem to be using an older version of MassTransit, and they are able to call UseConsumeFilter passing in the new filter object.
like so... e.UseConsumeFilter(new MyFilter1(myId));

e.UseConsumeFilter now takes in a type and a registration context, not a filter object. I can add a static variable to Program class and give it a MyId.
But what do I send in for a registration context?
Or is there a better way to filter out messages that don't apply?

Also, if I get the filter to work, will MassTransit take this request and send it to the next consumer or will it be lost?

Here is the code. I have changed the class names and removed all try/catch blocks and some functions for readability.

Consumer App

// Consumer console entry point. Reads the id this instance should serve from the
// first command-line argument, reads the RabbitMQ connection settings from
// AppSettings, and configures a bus with one receive endpoint hosting MyConsumer.
// Returns -1 when no command-line argument is supplied.
// NOTE(review): as posted, the bus is created but never started, and the success
// path has no return statement although the method is declared Task<int> —
// presumably elided for the question; confirm.
static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }
    
    // The id this consumer instance is meant to respond to.
    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword= System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"];
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
   
        // Broker address and credentials.
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });
                        
        // Every instance uses the same queue name here, regardless of myId.
        x.ReceiveEndpoint(rabbitQueue,
            e =>{
                e.Consumer<MyConsumer>(  );
            
                // Placeholder from the question: the filter registration being asked
                // about would go on the next line.
                !!!!INSERT LINE HERE!!!!
                //the following line never hits the filter before consumer
                //gets called
                //e.UseFilter(new MyFilter1(myId));
                //e.UseConsumeFilter(typeof (MyFilter),null);
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

}

/// <summary>
/// Consumer-level pipeline filter for MyConsumer. A request is passed on to the next
/// pipe stage only when its MyId equals the id given to the constructor; any other
/// request is not forwarded.
/// </summary>
public class MyFilter : IFilter<ConsumerConsumeContext<MyConsumer, IMyRequest>>
{
    private string MyId { get; set; }

    /// <param name="myId">The id this filter lets through.</param>
    public MyFilter(string myId) => MyId = myId;

    /// <summary>Forwards <paramref name="context"/> to <paramref name="next"/> when the ids match.</summary>
    public async Task Send(
        ConsumerConsumeContext<MyConsumer, IMyRequest> context,
        IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        var idMatches = context.Message.MyId == MyId;
        if (!idMatches)
        {
            // Not for this instance: complete without invoking the rest of the pipe.
            return;
        }

        await next.Send(context);
    }

    /// <summary>Reports the configured id under a filter scope named "MyId".</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("MyId").Add("MyId", MyId);
    }
}

/// <summary>
/// Message-level pipeline filter for IMyRequest. A request is passed on to the next
/// pipe stage only when its MyId equals the id given to the constructor.
/// </summary>
public class MyFilter1 : IFilter<ConsumeContext<IMyRequest>>
{
    private string MyId { get; set; }

    /// <param name="myId">The id this filter lets through.</param>
    public MyFilter1(string myId) => MyId = myId;

    // Explicit interface implementation: only reachable through IFilter<ConsumeContext<IMyRequest>>.
    async Task IFilter<ConsumeContext<IMyRequest>>.Send(
        ConsumeContext<IMyRequest> context,
        IPipe<ConsumeContext<IMyRequest>> next)
    {
        if (context.Message.MyId != MyId)
        {
            // Ids differ: complete without invoking the rest of the pipe.
            return;
        }

        await next.Send(context);
    }

    // Adds nothing to the probe output.
    void IProbeSite.Probe(ProbeContext context)
    {
    }
}


/// <summary>
/// Consumes MyRequest messages and answers each one with a MyResponse that echoes
/// the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    /// <summary>Handles one request and sends the response back to the requester.</summary>
    /// <param name="context">Consume context wrapping the incoming request.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . .

        // Await the send. The original un-awaited Respond call left this async method
        // without any await, and the "sent" line below was logged before the send finished.
        // NOTE(review): MyResponse as posted declares no ResponseStatus property and its
        // enum member is spelled 'passed' — confirm against the real types.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}

Request Originator App

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
TaskUtil.Await(() => bus.StartAsync());
var serviceAddress = new Uri(RABBIT_SERVICE);
IRequestClient<LightupRequest> client =
    busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

Request/Response Types

/// <summary>Contract for a request message that carries a MyId value.</summary>
public interface IMyRequest
{
    /// <summary>Identifier carried by the request.</summary>
    string MyId { get; set; }
}
/// <summary>Concrete request message; implements <see cref="IMyRequest"/>.</summary>
public class MyRequest : IMyRequest
{
    /// <summary>Identifier carried by the request (from <see cref="IMyRequest"/>).</summary>
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    /// <summary>Timestamp set by the sender on the request.</summary>
    public DateTime StartTime { get; set; }
}

/// <summary>Response message sent back for a MyRequest.</summary>
// NOTE(review): no property of type ResponseStatus is declared here, yet MyConsumer
// sets one in its object initializer — confirm against the real type.
public class MyResponse
{
    /// <summary>Possible outcomes reported for a request.</summary>
    public enum ResponseStatus { passed, failed, timeout, networkError }
    /// <summary>Identifier of the request being answered.</summary>
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}

I tried creating a ConsumeContext filter (MyFilter1) in the code above (e.UseFilter(new MyFilter1(myId));)

I was hoping that this filter would be hit BEFORE the consumer so that it could prevent this consumer from consuming the message if the ID isn't the ID that I am expecting.

I put a breakpoint on MyFilter1.Send and it never hit.

Furthermore, when I run 4 instances of my subscriber app, I can see that MassTransit distributes the message to only 1 of the 4 executables, and that the next message will be on a different executable.

Changes After Response

I changed the Queue Name to include myId, and I added a bind

Consumer App

// Consumer console entry point. Reads the id this instance serves from the first
// command-line argument, configures a bus whose receive endpoint is the configured
// queue name suffixed with ".<myId>", runs the bus until Enter is pressed, then stops it.
// Returns -1 when no command-line argument is supplied, 0 after a normal shutdown.
static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }

    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"];
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    // One queue per id: the originator addresses "<queueName>.<myId>" directly.
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"] + $".{myId}";
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });

        x.ReceiveEndpoint(rabbitQueue,
            e =>
            {
                e.Consumer<MyConsumer>();
                e.Bind<MyConsumer>();
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

    // Fixed: the bus was never started, so the endpoint never received anything, and
    // the success path had no return value for the declared Task<int>.
    await busControl.StartAsync();
    try
    {
        // Keep the process alive while the endpoint consumes requests.
        Console.WriteLine("Press Enter to exit ...");
        await Task.Run(() => Console.ReadLine());
    }
    finally
    {
        await busControl.StopAsync();
    }

    return 0;
}

/// <summary>
/// Consumer-level pipeline filter for MyConsumer. A request is passed on to the next
/// pipe stage only when its MyId equals the id given to the constructor; any other
/// request is not forwarded.
/// </summary>
public class MyFilter : IFilter<ConsumerConsumeContext<MyConsumer, IMyRequest>>
{
    private string MyId { get; set; }

    /// <param name="myId">The id this filter lets through.</param>
    public MyFilter(string myId) => MyId = myId;

    /// <summary>Forwards <paramref name="context"/> to <paramref name="next"/> when the ids match.</summary>
    public async Task Send(
        ConsumerConsumeContext<MyConsumer, IMyRequest> context,
        IPipe<ConsumerConsumeContext<MyConsumer, IMyRequest>> next)
    {
        var idMatches = context.Message.MyId == MyId;
        if (!idMatches)
        {
            // Not for this instance: complete without invoking the rest of the pipe.
            return;
        }

        await next.Send(context);
    }

    /// <summary>Reports the configured id under a filter scope named "MyId".</summary>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("MyId").Add("MyId", MyId);
    }
}

/// <summary>
/// Message-level pipeline filter for IMyRequest. A request is passed on to the next
/// pipe stage only when its MyId equals the id given to the constructor.
/// </summary>
public class MyFilter1 : IFilter<ConsumeContext<IMyRequest>>
{
    private string MyId { get; set; }

    /// <param name="myId">The id this filter lets through.</param>
    public MyFilter1(string myId) => MyId = myId;

    // Explicit interface implementation: only reachable through IFilter<ConsumeContext<IMyRequest>>.
    async Task IFilter<ConsumeContext<IMyRequest>>.Send(
        ConsumeContext<IMyRequest> context,
        IPipe<ConsumeContext<IMyRequest>> next)
    {
        if (context.Message.MyId != MyId)
        {
            // Ids differ: complete without invoking the rest of the pipe.
            return;
        }

        await next.Send(context);
    }

    // Adds nothing to the probe output.
    void IProbeSite.Probe(ProbeContext context)
    {
    }
}


/// <summary>
/// Consumes MyRequest messages and answers each one with a MyResponse that echoes
/// the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    /// <summary>Handles one request and sends the response back to the requester.</summary>
    /// <param name="context">Consume context wrapping the incoming request.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . .

        // Await the send. The original un-awaited Respond call left this async method
        // without any await, and the "sent" line below was logged before the send finished.
        // NOTE(review): MyResponse as posted declares no ResponseStatus property and its
        // enum member is spelled 'passed' — confirm against the real types.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}

Request Originator App

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";
var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
TaskUtil.Await(() => bus.StartAsync());

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
    IRequestClient<LightupRequest> client =
        busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));


    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

Request/Response Types (these didn't change)

/// <summary>Contract for a request message that carries a MyId value.</summary>
public interface IMyRequest
{
    /// <summary>Identifier carried by the request.</summary>
    string MyId { get; set; }
}
/// <summary>Concrete request message; implements <see cref="IMyRequest"/>.</summary>
public class MyRequest : IMyRequest
{
    /// <summary>Identifier carried by the request (from <see cref="IMyRequest"/>).</summary>
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    /// <summary>Timestamp set by the sender on the request.</summary>
    public DateTime StartTime { get; set; }
}

/// <summary>Response message sent back for a MyRequest.</summary>
// NOTE(review): no property of type ResponseStatus is declared here, yet MyConsumer
// sets one in its object initializer — confirm against the real type.
public class MyResponse
{
    /// <summary>Possible outcomes reported for a request.</summary>
    public enum ResponseStatus { passed, failed, timeout, networkError }
    /// <summary>Identifier of the request being answered.</summary>
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}

Observations

When the consumer App Runs, the following are created on RabbitMQ

  • 1 Connection
  • 2 channels
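  • Exchange <queueName>.<myId> binds from Exchange <namespace>:MyConsumer (no routing key) and Exchange <namespace>:IMyRequest (no routing key), and binds to Queue <queueName>.<myId>
  • Exchange <namespace>:MyConsumer binds to Exchange <queueName>.<myId> (no routing key)
  • Exchange <namespace>:IMyRequest binds to Exchange <queueName>.<myId> (no routing key)
  • Queue <queueName>.<myId> binds from Exchange <queueName>.<myId> (no routing key)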

When I run a 2nd instance of a consumer app with a new myId, here's what gets added

  • 1 connection

  • 2 channels

  • Exchange <queueName>.<2nd Id> binds from Exchange <namespace>:MyConsumer (no routing key) and Exchange <namespace>:IMyRequest (no routing key), and binds to Queue <queueName>.<2nd Id>

  • Queue <queueName>.<2nd Id> binds from Exchange <queueName>.<2nd Id> (no routing key)

  • Exchange <namespace>:MyConsumer gets a binding added to Exchange <queueName>.<2nd Id>

  • Exchange <namespace>:IMyRequest gets a binding added to Exchange <queueName>.<2nd Id>

Final Question

I really like how easy this pattern is to use, and it is an easy way to ensure that your requests don't go into a black hole. However, 1 queue and 1 exchange are created per id, and I am seeing 2 channels and 1 connection each time the consumer app is run. Is this the best way to ensure that our responses don't get dropped?

1

There is 1 best solution below

0
Christine On

After Chris Patterson answered, here is the final code:

Consumer App

This is a console App.

// Consumer console entry point. Reads the id this instance serves from the first
// command-line argument, configures a bus whose receive endpoint is the configured
// queue name suffixed with ".<myId>", runs the bus until Enter is pressed, then stops it.
// Returns -1 when no command-line argument is supplied, 0 after a normal shutdown.
static async Task<int> Main(string[] args)
{
    Console.WriteLine("Consumer started ...");
    if (args.Length == 0)
    {
        Console.WriteLine("You have given no command-line arguments.  Expected <myId> ...");
        return -1;
    }

    string myId = args[0];
    string rabbitHost = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Host"]; //rabbitmq://<URL>:<Port>
    string rabbitUserName = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_UserName"];
    string rabbitPassword = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_Password"];
    // One queue per id: the originator addresses "<queueName>.<myId>" directly.
    string rabbitQueue = System.Configuration.ConfigurationManager.AppSettings["RabbitMQ_QueueName"] + $".{myId}";
    Console.WriteLine($"Creating Bus for id {myId}...RabbitHost = {rabbitHost}... Queue Name = {rabbitQueue}");

    //Set up a bus connected to the RabbitMQ server
    IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        //This is how you give it the url, user name, and password
        x.Host(new Uri(rabbitHost), h =>
        {
            h.Username(rabbitUserName);
            h.Password(rabbitPassword);
        });

        //Tell MassTransit to listen for requests sent to
        //the queue named rabbitQueue.
        x.ReceiveEndpoint(rabbitQueue,
            e =>
            {
                //MyConsumer is the consumer class that handles
                //requests sent to the queue
                e.Consumer<MyConsumer>();
                //Per the author's observation, without the following line
                //MassTransit creates an extra exchange and queue per consumer.
                e.Bind<MyConsumer>();
            }
        );
    });
    Console.WriteLine($"Finished Creating Bus..MyId={myId}");

    // Fixed: Main is async, so the start is awaited instead of blocked on; the process
    // now stays alive while consuming, stops the bus on exit, and returns a value on
    // the success path (the original had none for the declared Task<int>).
    await busControl.StartAsync();
    try
    {
        Console.WriteLine("Press Enter to exit ...");
        await Task.Run(() => Console.ReadLine());
    }
    finally
    {
        await busControl.StopAsync();
    }

    return 0;
}


/// <summary>
/// Consumes MyRequest messages and answers each one with a MyResponse that echoes
/// the request's MyId.
/// </summary>
public class MyConsumer : IConsumer<MyRequest>
{
    //When a request is sent from the originator app to the
    //queue, MassTransit calls this method.
    /// <param name="context">Consume context wrapping the incoming request.</param>
    public async Task Consume(ConsumeContext<MyRequest> context)
    {
        DateTime receivedTime = DateTime.Now;
        Console.WriteLine($"Received Request for {context.Message.MyId}");
        // . . . do stuff . . .

        //While this method runs, the request is still on the queue in the
        //unacked state. MassTransit acknowledges it (removing it from the
        //queue) once Consume completes successfully, not at the moment of
        //responding. Awaiting RespondAsync makes sure the response has been
        //sent before the line below is logged; the original un-awaited
        //Respond call left this async method without any await.
        // NOTE(review): MyResponse as posted declares no ResponseStatus property and its
        // enum member is spelled 'passed' — confirm against the real types.
        await context.RespondAsync(
            new MyResponse
            {
                MyId = context.Message.MyId,
                ResponseStatus = ResponseStatus.Passed
            });
        Console.WriteLine($"My Response sent for {context.Message.MyId}... other stuff");
    }
}

Request Originator App. In my case this app is a C# unit test, but in yours it could be anything: a console app, a unit test, or a website.

const string RABBIT_USER_NAME = "username";
const string RABBIT_PASSWORD = "password";
const string RABBIT_SERVER = "rabbitmq://<server>";
const string RABBIT_SERVICE = "rabbitmq://<server>/<queueName>";

var bus = return Bus.Factory.CreateUsingRabbitMq(x => x.Host(new Uri(RABBIT_SERVER), h =>
    {
        h.Username(RABBIT_USER_NAME);
        h.Password(RABBIT_PASSWORD);
    }));
//Start the bus
TaskUtil.Await(() => bus.StartAsync());

MyRequest request = new MyRequest();
foreach (var myId in Ids)
{
    var serviceAddress = new Uri($"{RABBIT_SERVICE}.{myId}");
    //connect to RabbitMQ through MassTransit.  The requests will timeout if it doesn't get a response in 10 seconds
    IRequestClient<LightupRequest> client =
        busControl.CreateRequestClient<MyRequest>(serviceAddress, TimeSpan.FromSeconds(10));


    request.MyId = myId;
    request.sid = 123456;
    request.tid = 123457;
    request.Data = "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111";
    request.Token = "111111111111111111111111111111";
    request.Version = "10.02.003.0004";
    Task.Run(async () =>
    {
        request.StartTime = DateTime.Now;
        //The request is going to be sent to the Queue
        //Then the Consumer App will consume the request.
        //When the Consumer App is finished doing it's work, it will respond
        //At this point, the original request will be acknowledged, 
        //removed from the queue and then the response will be 
        //sent back here.
        //If there's a communication issue, a timeout exception 
        //will be thrown.  I don't have a try/catch block here because
        //I wanted to simplify the code.
        var response = await client.GetResponse<MyResponse>(request);
        //do something with response  
    }).Wait();  
}

Request/Response Types (these didn't change)

/// <summary>Contract for a request message that carries a MyId value.</summary>
public interface IMyRequest
{
    /// <summary>Identifier carried by the request.</summary>
    string MyId { get; set; }
}
/// <summary>Concrete request message; implements <see cref="IMyRequest"/>.</summary>
public class MyRequest : IMyRequest
{
    /// <summary>Identifier carried by the request (from <see cref="IMyRequest"/>).</summary>
    public string MyId { get; set; }
    public string Token { get; set; }
    public string Data { get; set; }
    public int someId { get; set; }
    public int someId2 { get; set; }
    public string Version { get; set; }
    /// <summary>Timestamp set by the sender on the request.</summary>
    public DateTime StartTime { get; set; }
}

/// <summary>Response message sent back for a MyRequest.</summary>
// NOTE(review): no property of type ResponseStatus is declared here, yet MyConsumer
// sets one in its object initializer — confirm against the real type.
public class MyResponse
{
    /// <summary>Possible outcomes reported for a request.</summary>
    public enum ResponseStatus { passed, failed, timeout, networkError }
    /// <summary>Identifier of the request being answered.</summary>
    public string MyId { get; set; }
    public string ErrorData { get; set; }
}