Trying to implement Direct Reply-to RabbitMQ Docs but having issues with consuming and producing using the same channel (this is a requirement for using direct reply-to).
Here's what I have tried:
Base Class:
from typing import Callable
from threading import Thread
from kombu import Connection, Exchange, Queue
from middleware.settings import ResourceSettings
settings = ResourceSettings()
class MiddlewareBrokerServiceBase(object):
RABBITMQ_EXCHANGE_NAME = "exchange-1"
def __init__(
self,
*,
queue_name: str,
username: str = None,
password: str = None,
host: str = None,
**kwargs
):
if not username:
username = settings.rabbitmq_username
if not password:
password = settings.rabbitmq_password
if not host:
host = settings.rabbitmq_host
self.username = username
self.password = password
self.host = host
self.queue_name = queue_name
self.exchange = self._create_exchange()
self.queue = self._create_queue()
self.connection = self._create_connection()
def _create_exchange(self):
return Exchange(self.RABBITMQ_EXCHANGE_NAME, 'topic', durable=True)
def _create_queue(self):
return Queue(self.queue_name, exchange=self.exchange, routing_key=self.queue_name)
def _create_connection(self):
return Connection(f'amqp://{self.username}:{self.password}@{self.host}/{settings.rabbitmq_vhost}')
@classmethod
def _start_rabbitmq_thread(cls, target: Callable):
rmq_thread = Thread(target=target)
rmq_thread.start()
Producer:
from typing import Callable
from kombu import Queue
from middleware.daemon.rabbitmq_service import MiddlewareBrokerServiceBase
class MiddlewareBrokerProducer(MiddlewareBrokerServiceBase):
def __init__(self, *, on_reply: Callable = None, **kwargs):
self.on_reply = on_reply
super().__init__(**kwargs)
self.channel = self.connection.channel()
self.reply_queue = None
if on_reply:
self.reply_queue = self._get_reply_queue()
self._start_rabbitmq_thread(self._reply_consumer_thread)
def _get_reply_queue(self):
return Queue(name='amq.rabbitmq.reply-to', exchange='', routing_key='amq.rabbitmq.reply-to', exclusive=True, auto_delete=True, channel=self.channel)\
def _get_publish_base_args(self):
args = {'exchange': self.exchange,
'routing_key': self.queue.routing_key,
'declare': [self.queue]}
if self.on_reply:
args['reply_to'] = 'amq.rabbitmq.reply-to'
return args
def _on_reply(self, a):
print(f'Got message {a}')
if self.on_reply:
self.on_reply(a)
def _reply_consumer_thread(self):
print('Starting fast-reply consumer..')
with self.channel.Consumer(queues=[self.reply_queue], no_ack=True, on_message=self._on_reply) as consumer:
consumer.consume(no_ack=True)
while True:
try:
self.connection.drain_events(timeout=1)
except TimeoutError:
continue
def publish_message(self, message: str):
publish_args = self._get_publish_base_args()
producer = self.channel.Producer(serializer='json')
producer.publish(message, **publish_args)
Running the producer with MiddlewareBrokerProducer().publish_message('New alert') raises a timeout error at the line producer.publish(message, **publish_args) of the publish_message method.
With some troubleshooting I have noticed that this works as expected if the _reply_consumer_thread method of the producer is changed as follows:
def _reply_consumer_thread(self):
print('Starting fast-reply consumer..')
with self.channel.Consumer(queues=[self.reply_queue], no_ack=True, on_message=self._on_reply) as consumer:
while True:
consumer.consume(no_ack=True)
But this causes 100% CPU utilization which could be fixed by adding a time.sleep(1) but I doubt this is the right way to go about fixing this issue.
Any help appreciated.