I have an application where, for every message consumed, I need to query the MySQL database for some information and process the message based on the result. I would like to optimize this so that the per-message queries do not add up and load the database.
I was thinking of an approach where I wait for at least x messages or y seconds. This way I can consume messages in batches, and if at some point I receive fewer messages, they still get consumed.
Example: Let's say x = 100, y = 10 seconds
This means I wait for at least 100 messages or 10 seconds, whichever comes first. This way I can fetch the data for 100 messages in a single database query. Also, if I get fewer than 100 messages, the remaining messages would be processed within a window of at most 10 seconds.
I am using Node.js with amqplib for consumption. I have the following code, based on the RabbitMQ examples:
    var amqp = require('amqplib/callback_api');

    amqp.connect('amqp://localhost', function(err, conn) {
      conn.createChannel(function(err, ch) {
        var q = 'hello';
        ch.assertQueue(q, {durable: false});
        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
        // Called once per delivered message
        ch.consume(q, function(msg) {
          console.log(" [x] Received %s", msg.content.toString());
        }, {noAck: true});
      });
    });
I was thinking of keeping a global object, adding to it in every consume callback, and processing its contents once the count reaches x messages. However, I am not sure how to add an upper time limit of y seconds on top of this, or how to ensure that if I get fewer than x messages within the time window, those still get processed.
The following code would call a function after every received message, which aggregates the received messages in an array. When it is called without a message (with argument null) or when it sees that the message count has reached x, it sends the aggregated messages to a database function. Otherwise, it simply adds the message to the array (in the second part of the if statement).
The argument null is passed to the aggregate function by a timer which fires after y seconds. This timer is first set when the message queue has just been initialized, and gets reset whenever the aggregator sends messages into the database.
Note: I haven't been able to test this as I don't have a messaging system at hand.
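A minimal sketch of the aggregator described above. The names createBatcher, aggregate, stop and flush are hypothetical and chosen for illustration; flush stands in for whatever function runs the single database query. Unlike the description, this version pushes the message first and then checks the count, so a batch of exactly x messages is flushed as soon as the x-th one arrives.

```javascript
// Hypothetical helper: collects messages and hands them to `flush` in batches.
// maxCount is x, maxWaitMs is y expressed in milliseconds.
function createBatcher(maxCount, maxWaitMs, flush) {
  var pending = [];
  var timer = null;

  function resetTimer() {
    clearTimeout(timer);
    // A null argument tells aggregate() that the time window has expired.
    timer = setTimeout(function () { aggregate(null); }, maxWaitMs);
  }

  function aggregate(msg) {
    if (msg !== null) {
      pending.push(msg);
    }
    if (msg === null || pending.length >= maxCount) {
      var batch = pending;
      pending = [];
      resetTimer();            // start a fresh window after every flush
      if (batch.length > 0) {  // skip the query when the window was empty
        flush(batch);
      }
    }
  }

  // Cancels the pending timer, e.g. when the channel is closed.
  function stop() {
    clearTimeout(timer);
  }

  resetTimer(); // the first window starts when the batcher is created
  return { aggregate: aggregate, stop: stop };
}
```

To wire this into the consumer from the question, one would create the batcher once, for example var batcher = createBatcher(100, 10000, queryDatabase) with queryDatabase being your own function, and then call batcher.aggregate(msg) inside the ch.consume callback. Keep in mind that with {noAck: true}, messages still sitting in the array are lost if the process dies before the next flush.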