(Python Kombu) Consuming and producing using the same channel (for RabbitMQ Direct Reply-to)

622 Views Asked by At

Trying to implement Direct Reply-to RabbitMQ Docs but having issues with consuming and producing using the same channel (this is a requirement for using direct reply-to).

Here's what I have tried:

Base Class:

from typing import Callable
from threading import Thread

from kombu import Connection, Exchange, Queue

from middleware.settings import ResourceSettings

settings = ResourceSettings()


class MiddlewareBrokerServiceBase(object):
    RABBITMQ_EXCHANGE_NAME = "exchange-1"

    def __init__(
        self,
        *,
        queue_name: str,
        username: str = None,
        password: str = None,
        host: str = None,
        **kwargs
    ):
        if not username:
            username = settings.rabbitmq_username

        if not password:
            password = settings.rabbitmq_password

        if not host:
            host = settings.rabbitmq_host

        self.username = username
        self.password = password
        self.host = host

        self.queue_name = queue_name
        self.exchange = self._create_exchange()
        self.queue = self._create_queue()
        self.connection = self._create_connection()

    def _create_exchange(self):
        return Exchange(self.RABBITMQ_EXCHANGE_NAME, 'topic', durable=True)

    def _create_queue(self):
        return Queue(self.queue_name, exchange=self.exchange, routing_key=self.queue_name)

    def _create_connection(self):
        return Connection(f'amqp://{self.username}:{self.password}@{self.host}/{settings.rabbitmq_vhost}')

    @classmethod
    def _start_rabbitmq_thread(cls, target: Callable):
        rmq_thread = Thread(target=target)
        rmq_thread.start()

Producer:

from typing import Callable

from kombu import Queue

from middleware.daemon.rabbitmq_service import MiddlewareBrokerServiceBase


class MiddlewareBrokerProducer(MiddlewareBrokerServiceBase):
    def __init__(self, *, on_reply: Callable = None, **kwargs):
        self.on_reply = on_reply

        super().__init__(**kwargs)

        self.channel = self.connection.channel()
        self.reply_queue = None

        if on_reply:
            self.reply_queue = self._get_reply_queue()
            self._start_rabbitmq_thread(self._reply_consumer_thread)

    def _get_reply_queue(self):
        return Queue(name='amq.rabbitmq.reply-to', exchange='', routing_key='amq.rabbitmq.reply-to', exclusive=True, auto_delete=True, channel=self.channel)\


    def _get_publish_base_args(self):
        args = {'exchange': self.exchange,
                'routing_key': self.queue.routing_key,
                'declare': [self.queue]}

        if self.on_reply:
            args['reply_to'] = 'amq.rabbitmq.reply-to'

        return args

    def _on_reply(self, a):
        print(f'Got message {a}')

        if self.on_reply:
            self.on_reply(a)

    def _reply_consumer_thread(self):
        print('Starting fast-reply consumer..')

        with self.channel.Consumer(queues=[self.reply_queue], no_ack=True, on_message=self._on_reply) as consumer:
            consumer.consume(no_ack=True)

            while True:
                try:
                    self.connection.drain_events(timeout=1)
                except TimeoutError:
                    continue

    def publish_message(self, message: str):
        publish_args = self._get_publish_base_args()

        producer = self.channel.Producer(serializer='json')
        producer.publish(message, **publish_args)

Running the producer with MiddlewareBrokerProducer().publish_message('New alert') raises a timeout error at the line producer.publish(message, **publish_args) of the publish_message method.

With some troubleshooting I have noticed that this works as expected if the _reply_consumer_thread method of the producer is changed as follows:

    def _reply_consumer_thread(self):
        print('Starting fast-reply consumer..')

        with self.channel.Consumer(queues=[self.reply_queue], no_ack=True, on_message=self._on_reply) as consumer:
            while True:
                consumer.consume(no_ack=True)

But this causes 100% CPU utilization which could be fixed by adding a time.sleep(1) but I doubt this is the right way to go about fixing this issue.

Any help appreciated.

0

There are 0 best solutions below