How to publish messages to RabbitMQ using multithreading?


I would appreciate help on parallelism with RabbitMQ.

Given a large JSON file, say with 1 million items, I want to iterate over it in parallel so that each item is published to RabbitMQ from multiple threads.

I have read quite a few blog posts, and it seems there is a problem with sharing a single connection and a single channel across threads. If I understood correctly, the recommendation is to open additional channels (one per thread), but I did not really understand how to implement that in code; I tried all kinds of things and nothing worked.
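For what it's worth, here is my current understanding of the "channel per thread" recommendation as a minimal sketch. `FakeConnection` and `FakeChannel` are stand-ins I made up for the real broker objects (I am assuming the connection can be shared but channels cannot, and that `channel()` is cheap to call):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class FakeChannel:
    """Stand-in for a real AMQP channel (assumed not thread-safe)."""
    def __init__(self, owner_thread: str) -> None:
        self.owner_thread = owner_thread

class FakeConnection:
    """Stand-in for a real broker connection; the connection is shared,
    but each thread opens its own channel from it."""
    def channel(self) -> FakeChannel:
        return FakeChannel(threading.current_thread().name)

connection = FakeConnection()
_local = threading.local()  # holds one channel per worker thread

def get_channel() -> FakeChannel:
    # Lazily open a channel the first time each thread needs one
    if not hasattr(_local, "channel"):
        _local.channel = connection.channel()
    return _local.channel

def publish(item: str) -> str:
    channel = get_channel()
    # With a real client, channel.basic.publish(...) would go here
    return channel.owner_thread

with ThreadPoolExecutor(max_workers=4) as executor:
    owners = set(executor.map(publish, [f"item-{i}" for i in range(100)]))

# No more distinct channels than worker threads were created
assert 1 <= len(owners) <= 4
```

Is this roughly the pattern that the blog posts mean, and if so, how does it map onto the client class below?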

I would like to know whether what I want is possible, or whether I have to process the file sequentially, item by item. If it is possible, could I get some help with the code?

Here I define the connection and the function that publishes to RabbitMQ:

import json
import logging
from typing import Any, Dict

from amqpstorm import Connection, Message


class RabbitMQClient:
    def __init__(
        self,
        queues_connection_config: Dict[str, Any],
        queue_name: str,
    ) -> None:
        self.__logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        self.__queue_name = queue_name
        self.__exchange = queues_connection_config.get("exchange")

        self._connection = Connection(
            queues_connection_config.get("hostname"),
            queues_connection_config.get("username"),
            queues_connection_config.get("password"),
            queues_connection_config.get("port"),
            virtual_host=queues_connection_config.get("virtual_host"),
        )
        # A single channel shared by every caller of enqueue()
        self.channel = self._connection.channel()
        self.channel.confirm_deliveries()

    def enqueue(self, queues_item: Dict[str, Any]) -> str:
        message = Message.create(self.channel, json.dumps(queues_item))
        message.publish(routing_key=self.__queue_name, exchange=self.__exchange, mandatory=True)
        self.__logger.debug(f"Enqueued {message.correlation_id}")
        return str(message.correlation_id)

Here is my unsuccessful attempt to split the task across multiple threads:

for i in range(checkpoint, 10 + 1):
    # next() raises StopIteration if the iterator runs out mid-batch
    batch_data = [next(items) for _ in range(1000)]
    try:
        start_time = time.time()
        futures = [
            self.executor.submit(self.__queue_client.enqueue, item)
            for item in batch_data
        ]
        # Wait for all threads to complete
        concurrent.futures.wait(futures)
    except Exception:
        self.__logger.exception("Batch %s failed", i)
        raise
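For reference, this is a self-contained version of the batching pattern I am aiming for, with a thread-safe stub in place of the real `enqueue` (I cannot share my broker here, and a shared channel would not be thread-safe anyway). `itertools.islice` replaces the bare `next()` calls so the loop stops cleanly at the end of the file instead of raising `StopIteration`:

```python
import concurrent.futures
import itertools
import threading

# Stub standing in for RabbitMQClient.enqueue; shared state is guarded
# with a lock because many worker threads append to it concurrently.
_lock = threading.Lock()
published = []

def enqueue(item: int) -> int:
    with _lock:
        published.append(item)
    return item

items = iter(range(10_000))  # pretend this streams items from the big JSON file
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)

while True:
    # islice returns a short (or empty) final batch instead of raising
    batch = list(itertools.islice(items, 1000))
    if not batch:
        break
    futures = [executor.submit(enqueue, item) for item in batch]
    concurrent.futures.wait(futures)

executor.shutdown()
assert len(published) == 10_000
```

If the shared-channel problem is real, would swapping the stub for a per-thread channel (as in the sketch above my class) be the right fix?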