Handling long-running tasks in Pika/RabbitMQ threaded consumers with client heartbeats enabled


I am running threaded consumers (~20 to 30 threads, configurable) on my server for concurrent message processing. My consumers are daemonized so that I can use a CLI-like utility to start and stop the consumer service. I also need to send the response I receive from my message handler back to my producer, so I implemented Remote Procedure Call (RPC) as explained here in the RabbitMQ docs. I have kept heartbeats enabled only on the consumer side, thereby avoiding any unexpected idle-connection teardown by the OS kernel on my server.
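For context, the producer side follows the standard RPC pattern from that tutorial. Below is a minimal sketch of my client, with connection details simplified; the host, queue name, and payload handling here are illustrative, not my exact code:

import json
import uuid

import pika

class RpcClient:
    '''Publishes a request and blocks until the correlated reply arrives.'''

    def __init__(self, queue_name):
        # heartbeat=0 requests disabled heartbeats on the producer side, as described above
        params = pika.ConnectionParameters(host='localhost', heartbeat=0)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        self.queue_name = queue_name
        # exclusive, auto-named reply queue for this client
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_reply,
                                   auto_ack=True)
        self.response = None
        self.corr_id = None

    def on_reply(self, ch, method, props, body):
        if props.correlation_id == self.corr_id:
            self.response = body

    def call(self, payload):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key=self.queue_name,
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                   body=json.dumps(payload))
        while self.response is None:
            # drive the I/O loop until the reply shows up
            self.connection.process_data_events(time_limit=1)
        return self.response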

The setup was working as expected: consumers, after processing messages (concurrently, thanks to the threads), were sending the outputs/responses back to the producers without any issue, until we hit the well-known "long-running process exceeds the heartbeat timeout interval" scenario. (Just to set the context: in some cases a process can run for a couple of days before sending its response back to the producer.)

I tried to implement the solution provided here in another similar Stack Overflow thread using two approaches: one using threading itself, the other using asyncio (asynchronous function calls).
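For the second approach, the idea was to replace the per-message worker thread with a coroutine. A minimal sketch of what I tried, assuming a dedicated asyncio event loop running in a background thread; app_process and ack_app_response here stand in for the class methods shown below:

import asyncio
import functools
import json
import threading

# one long-lived event loop, driven by a background thread
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def on_request(ch, method, props, body, args):
    connection, _ = args
    delivery_tag = method.delivery_tag

    async def worker():
        # run the blocking handler in the default thread-pool executor so
        # the event loop itself is never blocked by the long task
        response = await asyncio.get_running_loop().run_in_executor(
            None, app_process, json.loads(body))
        # hand the publish + ack back to the Pika I/O thread
        connection.add_callback_threadsafe(
            functools.partial(ack_app_response, ch, delivery_tag, props, response))

    # schedule the coroutine from the consumer thread
    asyncio.run_coroutine_threadsafe(worker(), loop)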

In both cases my producer is unable to receive responses back from the consumer after any message processing (both short and long duration); the broker eventually closes the connection and re-queues the message, and another consumer thread picks it up only to fall into the same trap. Eventually all my (threaded) consumer connections end up closed and my producer keeps waiting for a response that never arrives. I'm not sure what I'm doing wrong here.

The threaded consumer implementation for the first approach looks like this:

import functools
import json
import threading

import pika

# `conf` and `log` are project-level config and logging helpers (not shown)

def mq_connection_factory(queue_name):
    '''Global helper to create and return an MQ connection and channel.'''

    mq_host, mq_port, mq_usr, mq_passd = conf.get('some-config-file')
    credentials = pika.PlainCredentials(mq_usr, mq_passd)
    parameters = pika.ConnectionParameters(host=mq_host, port=mq_port,
                                           credentials=credentials, heartbeat=600)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    return connection, channel

class ThreadedConsumer(threading.Thread):
    def __init__(self,queue_name):
        threading.Thread.__init__(self)
       
        self.threads = int(conf.get('daemon', 'max_parallel_thread'))  # conf.get returns a string
        self.queue_name = queue_name
        self.channel = None
        self.connection = None

    def app_process(self,json_conf,choice):
        '''The main server function, dispatched on the incoming message.'''
        #long running process
        #calls different modules to process the message and create multiline string output
        #to be sent back to producer
        return "with great power comes great responsibilities"


    def app_process_response(self, ch, delivery_tag, props, json_body, choice, connection):
        json_conf = json_body
        thread_id = threading.get_ident()
        log.info(f'Thread id: {thread_id} Delivery tag: {delivery_tag}')
        try:
            response = self.app_process(json_conf, choice)
        except Exception as e:
            response = f'app process failed with the error: {e}'
        log.info("App process response captured.")
        # hand the publish + ack back to the thread running the Pika I/O loop
        cb = functools.partial(self.ack_app_response, ch, delivery_tag, props, response)
        connection.add_callback_threadsafe(cb)

        

    def ack_app_response(self, ch, delivery_tag, props, response):
        log.info(" [x] Done")
        if ch.is_open:
            log.info('Channel is open. Going ahead with writing acknowledgement back to channel.')
            ch.basic_publish(exchange='',
                             routing_key=props.reply_to,
                             properties=pika.BasicProperties(
                                 correlation_id=props.correlation_id),
                             body=str(response))
            ch.basic_ack(delivery_tag=delivery_tag)
        else:
            msg = ('The channel between the client app and the server app is already closed; '
                   'the acknowledgement cannot be sent back to the client app.')
            log.error(msg)
            raise Exception(msg)

        log.info('Response was sent to the client app.')

        


    def on_request(self,ch, method, props, body, args):
        json_body = json.loads(body)
        log.info(" [x] Received %r" % json_body)

        choice = "make someone proud"

        (connection, threads) = args
        delivery_tag = method.delivery_tag

        t = threading.Thread(target=self.app_process_response, args=(ch, delivery_tag, props, json_body, choice, connection))
        t.start()

        threads.append(t) 


    def run(self):
       
        try:
            log.info('Connecting to Rabbit MQ Host.')

            self.connection, self.channel = mq_connection_factory(self.queue_name)
            log.info('Connection to rabbit MQ Host was successful.')
            self.channel.basic_qos(prefetch_count=self.threads * 10)  # I have tried 1 as well

            threads = []
            on_message_callback = functools.partial(self.on_request, args=(self.connection, threads))
            self.channel.basic_consume(self.queue_name, on_message_callback=on_message_callback)

            log.info('starting thread to listen to client request...')
            self.channel.start_consuming()

        except Exception as e:
            raise Exception('Failed to establish connection to MQ. Please check with your RabbitMQ broker.') from e

        
        
        # doubtful about this, as my daemon is PID-file based and keeps running until I run `stop daemon`
        for thread in threads:
            thread.join()
     

def initialize_thread(queue_name):
    '''Initialise threaded consumers for the app to receive requests and send responses back to the core server app.'''

    threads = int(conf.get('daemon', 'max_parallel_thread'))  # conf.get returns a string

    for i in range(threads):
        log.info(f'launching thread : {i}')
        td = ThreadedConsumer(queue_name=queue_name)
        td.start()

From my daemon process the function initialize_thread is called. As I understand it, I'm already creating, say, 20 consumer threads; in each thread I create an MQ connection, and for every incoming message I spawn a further worker thread to do the processing. But I'm ending up with no response going back from the consumer to the broker/producer. Any suggestion here is really appreciated.
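For completeness, the daemon side boils down to roughly this (a sketch; the PID-file-based daemonization is omitted, and 'app_requests' is an illustrative queue name):

# hypothetical entry point; in reality the PID-file-based start/stop wrapper calls this
if __name__ == '__main__':
    initialize_thread(queue_name='app_requests')  # spawns the configured number of ThreadedConsumer threads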
