Optimizing RabbitMQ consumers to consume in batch


I have an application where, for every consumed message, I need to query a MySQL database for some information and then process the message based on the result. I would like to optimize this so that running one query per message does not keep adding load to the database.

I was thinking of an approach where I wait for x messages or y seconds, whichever comes first. This way I can consume messages in batches, and even when fewer messages arrive, they still get processed.

Example: Let's say x = 100, y = 10 seconds

This means I wait for 100 messages or 10 seconds, whichever comes first. This way I can query the database once for all 100 messages in a single query. And if fewer than 100 messages arrive, the remaining ones are processed within at most 10 seconds.
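To make the "one query per batch" idea concrete, here is a minimal sketch of turning a batch of messages into a single lookup. It assumes, hypothetically, that each message body is JSON with a `userId` field and that the information lives in a `users` table with `id` and `plan` columns; `buildBatchQuery` and `indexRowsById` are illustrative names, not part of any library.

```javascript
// Hypothetical message format: each body is JSON such as {"userId": 42, ...}.
// Hypothetical schema: users(id, plan). Adjust both to the real application.
function buildBatchQuery(messages) {
  // Collect the distinct lookup keys from the whole batch.
  var ids = [];
  var seen = new Set();
  messages.forEach(function (msg) {
    var id = JSON.parse(msg.content.toString()).userId;
    if (!seen.has(id)) {
      seen.add(id);
      ids.push(id);
    }
  });
  // "IN ()" is not valid SQL, so signal "nothing to query" instead.
  if (ids.length === 0) return null;
  // One "?" placeholder per id, so the driver escapes every value.
  var placeholders = ids.map(function () { return '?'; }).join(', ');
  return {
    sql: 'SELECT id, plan FROM users WHERE id IN (' + placeholders + ')',
    params: ids
  };
}

// Index the result rows by id so each message can find its row without
// another database round trip.
function indexRowsById(rows) {
  var byId = new Map();
  rows.forEach(function (row) { byId.set(row.id, row); });
  return byId;
}
```

The resulting `sql` and `params` could be passed to a MySQL driver, for example the `mysql` package's `connection.query(sql, values, callback)`. Deduplicating the ids keeps the query short when several messages in the batch refer to the same record.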

I am using Node.js with amqplib for consumption. I have the following code, based on the RabbitMQ examples:

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

I was thinking of keeping a global object, adding each message to it in the consume callback, and processing the collected messages once their count reaches x. However, I'm not sure how to add an upper time limit of y seconds, so that if fewer than x messages arrive within the time window, they still get processed.
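One way to get both limits with a global buffer is to start a timer when the first message of a batch arrives and cancel it when the batch fills up. The sketch below (the name `createBatcher` is hypothetical) keeps that logic separate from RabbitMQ so it can be tested on its own; `maxSize` plays the role of x and `maxWaitMs` the role of y, in milliseconds.

```javascript
// A size-or-timeout batcher, independent of RabbitMQ.
// maxSize:   flush as soon as this many items are buffered (x).
// maxWaitMs: flush at most this long after the first item of a batch (y).
// onFlush:   called with the array of buffered items.
function createBatcher(maxSize, maxWaitMs, onFlush) {
  var items = [];
  var timer = null;

  function flush() {
    if (timer !== null) {
      clearTimeout(timer);
      timer = null;
    }
    if (items.length === 0) return;  // nothing buffered, nothing to send
    var batch = items;
    items = [];  // start a fresh buffer before handing the old one off
    onFlush(batch);
  }

  function add(item) {
    items.push(item);
    if (items.length >= maxSize) {
      flush();  // size limit reached first
    } else if (timer === null) {
      // First item of a new batch: start the countdown for the time limit.
      timer = setTimeout(flush, maxWaitMs);
    }
  }

  return { add: add, flush: flush };
}
```

In the consumer above it could be wired in as `var batcher = createBatcher(100, 10000, handleBatch);` with `batcher.add(msg)` inside the consume callback, where `handleBatch` is a hypothetical function that runs the single database query. Because the timer only runs while something is buffered, every message waits at most `maxWaitMs` and an idle consumer does no periodic work.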



Answer by Jim Danner

The following code passes every received message to an aggregate function, which collects the messages in an array. When the function is called without a message (with the argument null), or when it sees that the message count has reached x, it sends the collected messages to a database function and starts a new batch. Otherwise it simply adds the message to the array (the push in the second half of the if condition).

The null argument comes from a timer that fires after y seconds. The timer is first started once the queue has been set up, and it is restarted every time the aggregator sends messages to the database, so no message waits longer than y seconds.

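var amqp = require('amqplib/callback_api');

var x = 100;  // flush after this many messages...
var y = 10;   // ...or after this many seconds, whichever comes first
var messageStore = [];
var timer;

function sendToDatabase(messages) {
  // query the database once for the whole batch here
}

function aggregate(msg) {
  // msg === null means the timer fired; otherwise buffer the message first
  if (msg === null || messageStore.push(msg) >= x) {
    clearTimeout(timer);
    timer = setTimeout(aggregate, 1000 * y, null);
    if (messageStore.length > 0) {  // skip empty batches when the timer fires
      sendToDatabase(messageStore);
    }
    messageStore = [];
  }
}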

amqp.connect('amqp://localhost', function(err, conn) {
  if (err) throw err;
  conn.createChannel(function(err, ch) {
    if (err) throw err;
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    // start the first y-second window before any message arrives
    timer = setTimeout(aggregate, 1000 * y, null);
    ch.consume(q, function(msg) {
      if (msg === null) return;  // the broker cancelled this consumer
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});
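A caveat with `{noAck: true}` is that RabbitMQ treats each message as done as soon as it is delivered, so messages still sitting in the buffer are lost if the process crashes or the database query fails. A possible variation, sketched here under stated assumptions, uses manual acknowledgements: each message is acked only after its batch was processed, and nacked (requeued) if processing failed. `consumeInBatches` and `processBatch` are hypothetical names; `ch` is assumed to be a channel from amqplib's callback API, whose `prefetch`, `consume`, `ack` and `nack` methods are used here.

```javascript
// Hypothetical helper. Assumes an amqplib callback-API channel `ch` and a
// processBatch(messages, done) function that runs the single database query
// and then calls done(err).
function consumeInBatches(ch, queue, batchSize, maxWaitMs, processBatch) {
  var batch = [];
  var timer = null;

  function flush() {
    clearTimeout(timer);
    timer = null;
    if (batch.length === 0) return;
    var current = batch;
    batch = [];
    processBatch(current, function (err) {
      current.forEach(function (m) {
        if (err) {
          ch.nack(m);  // defaults requeue the message so it is retried
        } else {
          ch.ack(m);   // batch handled, RabbitMQ can discard the message
        }
      });
    });
  }

  // With manual acks, RabbitMQ stops delivering once `prefetch` messages are
  // unacknowledged, so it must be at least batchSize or a batch never fills.
  ch.prefetch(batchSize);
  ch.consume(queue, function (msg) {
    if (msg === null) return;  // the broker cancelled this consumer
    batch.push(msg);
    if (batch.length >= batchSize) {
      flush();
    } else if (timer === null) {
      timer = setTimeout(flush, maxWaitMs);
    }
  }, { noAck: false });
}
```

Messages that were never acked are requeued by RabbitMQ when the channel closes, so after a crash they are delivered again; `processBatch` should therefore tolerate seeing the same message twice.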

Note: I haven't been able to test this as I don't have a messaging system at hand.