I'm scratching my head with something here. I need to enable an app to have a single consumer for a queue. So my first gut reaction was to do this:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
As I sent some messages to the queue for testing, I did notice that only one listener was active at a time, but I noticed this in the logs too:
2024-02-29T16:50:54.338-05:00 DEBUG 49772 --- [pool-2-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '38' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=30
2024-02-29T16:50:54.466-05:00 DEBUG 49772 --- [ool-2-thread-10] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '39' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=31
And while looking at the Rabbit console I also noticed that messages were in unack state, but not ready.
So I guess the BlockingQueueConsumer does remove them from the queue, and controls concurrency internally. Is there a way to actually force just one message being pulled at a time using Annotations? Or do I have to switch to a manual poll to achieve that?
Thanks folks
For that purpose you must not use
factory.setMaxConcurrentConsumers(1);. Leave it as-is. There is the logic in theAsyncMessageProcessingConsumer.mainLoop():So, we really do try to start a new
BlockingQueueConsumerif that property is notnull.You can find much more info in JavaDocs of the
concurrencyprops in theSimpleMessageListenerContainer.There is also this one which might be some kind of interest for your use-case:
There is also some info in docs: https://docs.spring.io/spring-amqp/reference/amqp/listener-concurrency.html
You might also be interested in this option to be set to
1: