Send 1 million messages to a queue in 1 minute


I am using RabbitMQ to receive messages. I need to send 1,000,000 messages to a queue within 1 minute. I am using Python's multiprocessing, but my code takes over 5 minutes. Is it possible to send them within 1 minute from a single PC? Here is my code:

import multiprocessing
from datetime import datetime
import pika
import time
import sys

class PyPikaTest:

    def publish(self, no_message, producer):
        # Each publisher process opens its own connection and channel
        c = pika.BlockingConnection(pika.ConnectionParameters(port=5672, virtual_host="test"))

        channel = c.channel()
        channel.queue_declare(queue='letterbox')

        print("start: %s" % (time.ctime(time.time())))

        # Send exactly no_message messages, numbered from 1
        for i in range(1, int(no_message) + 1):
            sendtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-2]
            body = ('aa cccc ' + str(sendtime))
            _properties = pika.BasicProperties(
                content_type='application/json',
                content_encoding='utf-8',
                message_id=producer + "_message_no_" + str(i),
                timestamp=int(time.time())
            )

            channel.basic_publish(
                exchange='',
                routing_key='letterbox',
                properties=_properties,
                body=body
            )
        print("end: %s" % (time.ctime(time.time())))
        c.close()

    def thread_publish(self, no_publisher, no_message):

        jobs = []
        for i in range(int(no_publisher)):
            process = multiprocessing.Process(target=self.publish, args=(no_message, "test_publisher_no_" + str(i)))
            jobs.append(process)

        # Start the publisher processes
        for j in jobs:
            j.start()

        # Wait for all publisher processes to finish
        for j in jobs:
            j.join()

        print("List processing complete.")

if __name__ == "__main__":

    # Arguments: <number of publishers> <messages per publisher>
    print('starting .. %s' % time.ctime(time.time()))
    x = PyPikaTest()
    x.thread_publish(sys.argv[1], sys.argv[2])
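
Note that the script passes the same message count to every publisher process, so the total sent grows with the number of publishers. If the goal is a fixed total such as 1,000,000, the count has to be divided among the processes first. A minimal sketch of that split follows; `split_counts` is a hypothetical helper and not part of the original code.

```python
def split_counts(total, workers):
    """Split `total` messages across `workers` publishers as evenly as possible."""
    base, extra = divmod(total, workers)
    # The first `extra` workers each take one additional message.
    return [base + 1 if i < extra else base for i in range(workers)]


if __name__ == "__main__":
    counts = split_counts(1_000_000, 8)
    print(counts, sum(counts))
```

Each element of the returned list could then be passed as the message count for one publisher process.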

There is 1 best solution below

Answer by Robert Betts

Have you tried one of the pika non-blocking connections? I use the AsyncioConnection. Publishing from a single Python process, I can send 14k-17k messages per second, which works out to about 1 million messages in a minute. This is on a 2018 MacBook Pro i7, with RabbitMQ installed locally with Homebrew and Python 3.11.
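
The arithmetic behind that estimate can be checked quickly. The sketch below computes the rate needed to hit the 1 minute target and how long 1,000,000 messages would take at the two ends of the quoted 14k-17k range; the function names are illustrative only.

```python
TOTAL_MESSAGES = 1_000_000
TARGET_SECONDS = 60


def required_rate(total, seconds):
    """Messages per second needed to send `total` messages in `seconds`."""
    return total / seconds


def seconds_at_rate(total, rate):
    """Time in seconds to send `total` messages at `rate` messages per second."""
    return total / rate


if __name__ == "__main__":
    print(round(required_rate(TOTAL_MESSAGES, TARGET_SECONDS)))  # 16667
    for rate in (14_000, 17_000):
        # 14,000/s gives 71.4 s and 17,000/s gives 58.8 s
        print(rate, round(seconds_at_rate(TOTAL_MESSAGES, rate), 1))
```

So the 1 minute target is met only toward the upper end of that range.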

I created a script that sent 1 million messages in 61 seconds by splitting the work into 10 batches of 100k asynchronous requests.

    # Fragment that runs inside an async function; send_message, client_api
    # and logger are defined in the full example.
    total_request_time = 0
    batch_size = 100000
    number_of_batches = 10
    for i in range(number_of_batches):
        rr_start_time = time.time()
        # Schedule one batch of publishes and wait for all of them to finish
        tasks = [asyncio.create_task(send_message(client_api)) for _ in range(batch_size)]
        _ = await asyncio.wait(tasks)
        rr_taken = time.time() - rr_start_time
        total_request_time += rr_taken
        logger.info("batch %s of %s calls took %s seconds and %s calls/s", i, batch_size, rr_taken, batch_size / rr_taken)
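
For readers who want to try the batching pattern without a broker, here is a self-contained sketch. It is not the author's code: `send_message` is a stand-in that only yields to the event loop instead of publishing, `run_batches` is a hypothetical wrapper, and `asyncio.gather` is used in place of `asyncio.wait` so the results can be counted directly.

```python
import asyncio
import time


async def send_message():
    """Stand-in for a real publish call; it only yields to the event loop."""
    await asyncio.sleep(0)
    return True


async def run_batches(number_of_batches, batch_size):
    """Run the batches one after another and return (calls completed, seconds)."""
    sent = 0
    total_time = 0.0
    for _ in range(number_of_batches):
        start = time.perf_counter()
        # Schedule one whole batch, then wait until every call in it is done
        results = await asyncio.gather(*(send_message() for _ in range(batch_size)))
        total_time += time.perf_counter() - start
        sent += sum(results)
    return sent, total_time


if __name__ == "__main__":
    count, elapsed = asyncio.run(run_batches(10, 1_000))
    print(f"{count} calls in {elapsed:.3f} s")
```

Batching caps how many tasks exist at once, so the event loop never has to hold a million pending tasks in memory.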

For full disclosure, I had 4 consumers running. I find that RabbitMQ performs best for publishing when the queue does not get too large.

The full example code can be found here

This is my program output.

I 2023-09-25 00:01:29,536 one_million_example       main                                 17  : Client connected
I 2023-09-25 00:01:35,772 one_million_example       main                                 41  : batch 0 of 100000 calls took 6.235662937164307 seconds and 16036.787268279675 calls/s
I 2023-09-25 00:01:41,478 one_million_example       main                                 41  : batch 1 of 100000 calls took 5.705104827880859 seconds and 17528.161710771692 calls/s
I 2023-09-25 00:01:47,438 one_million_example       main                                 41  : batch 2 of 100000 calls took 5.960079193115234 seconds and 16778.30054934751 calls/s
I 2023-09-25 00:01:53,282 one_million_example       main                                 41  : batch 3 of 100000 calls took 5.84413480758667 seconds and 17111.172704329678 calls/s
I 2023-09-25 00:01:59,617 one_million_example       main                                 41  : batch 4 of 100000 calls took 6.335207939147949 seconds and 15784.801534620101 calls/s
I 2023-09-25 00:02:05,506 one_million_example       main                                 41  : batch 5 of 100000 calls took 5.888730049133301 seconds and 16981.590116313437 calls/s
I 2023-09-25 00:02:12,120 one_million_example       main                                 41  : batch 6 of 100000 calls took 6.614094972610474 seconds and 15119.226502508423 calls/s
I 2023-09-25 00:02:18,071 one_million_example       main                                 41  : batch 7 of 100000 calls took 5.950514793395996 seconds and 16805.268698933756 calls/s
I 2023-09-25 00:02:24,473 one_million_example       main                                 41  : batch 8 of 100000 calls took 6.401550054550171 seconds and 15621.21660345697 calls/s
I 2023-09-25 00:02:30,549 one_million_example       main                                 41  : batch 9 of 100000 calls took 6.075830936431885 seconds and 16458.654140683902 calls/s
I 2023-09-25 00:02:30,549 one_million_example       main                                 44  : Request only performance: 1000000 total calls in 61.010910511016846 seconds @ 16390.511002444855 calls/s
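
As a sanity check on the final line of the log, the per-batch durations can be summed and turned into an overall rate. The values below are copied from the output above; `summarize` is a hypothetical helper.

```python
# Per-batch durations in seconds, copied from the log output
BATCH_SECONDS = [
    6.235662937164307, 5.705104827880859, 5.960079193115234,
    5.84413480758667, 6.335207939147949, 5.888730049133301,
    6.614094972610474, 5.950514793395996, 6.401550054550171,
    6.075830936431885,
]
BATCH_SIZE = 100_000


def summarize(batch_seconds, batch_size):
    """Return (total calls, total seconds, overall calls per second)."""
    total_calls = batch_size * len(batch_seconds)
    total_seconds = sum(batch_seconds)
    return total_calls, total_seconds, total_calls / total_seconds


if __name__ == "__main__":
    calls, seconds, rate = summarize(BATCH_SECONDS, BATCH_SIZE)
    print(f"{calls} calls in {seconds:.2f} s @ {rate:.0f} calls/s")
```

The sum comes to about 61.01 seconds, which agrees with the reported total of roughly 16,390 calls/s.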