Multiple simultaneous requests make Tornado's flush() stream nothing to some clients


I ran into trouble using tornado.web.RequestHandler.flush() with multiple requests at the same time. I'm using Python 3.10.12, Tornado 6.4, and Ubuntu 22.04.3.

In my case I want to stream the chatbot output to the client using flush(). Following the instructions (from here and here), I added await asyncio.sleep(0) after every chunk so that flush() yields control and my server can handle multiple requests. However, when I make about 3 requests at the same time (running my client programs one by one, but very quickly), only 2 clients receive the streamed chunks; the last request gets only whitespace as the result. Here's my Tornado Python code:

import asyncio

import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.escape import json_decode

from chatbot import Chatbot

chatbot = Chatbot()

class GenAnswerHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        self.add_header('Access-Control-Allow-Origin', '*')
        self.add_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.add_header('Access-Control-Allow-Headers', 'Content-Type')

    def options(self):
        # Handle pre-flight request
        self.set_status(204)
        self.finish()

    async def post(self):
        try:
            # Stream data to the client in chunks
            async for chunk in self.receive():
                self.write(chunk)
                await self.flush()
                await asyncio.sleep(0)
            
            self.finish()
        except Exception as e:
            self.set_status(500)
            self.write({'status': 'error', 'result': str(e)})

    async def receive(self):
        # Check the Content-Type header
        content_type = self.request.headers.get('Content-Type')

        if content_type == 'application/json':
            # If JSON data, decode the body once and extract the inputs
            data = json_decode(self.request.body)
            question: str = data.get('question')
            chat_history: dict = data.get('chat_history')
        elif content_type == 'application/x-www-form-urlencoded':
            # If form data, extract input from form data
            question: str = self.get_argument('question', default=None, strip=False)
            chat_history: dict = self.get_argument('chat_history', default=None, strip=False)
        else:
            # Unsupported media type
            self.set_status(415)
            self.write({'status': 'error', 'result': 'Unsupported Media Type: Use application/json or application/x-www-form-urlencoded.'})
            return

        async for res_chunk in chatbot.main_process(question, chat_history):
            yield res_chunk

if __name__ == "__main__":
    app = tornado.web.Application(
        [
            (r'/gen_answer', GenAnswerHandler),
        ],
        # debug=options.debug,
    )

    http_server = tornado.httpserver.HTTPServer(app, ssl_options={
        "certfile": "../server.crt",
        "keyfile": "../server.key",
    })

    ip = "127.0.0.1"
    port = 8090
    http_server.listen(port, address=ip)  # Change the address and port as needed
    print(f"Server listening on {ip}:{port}...")
    
    tornado.ioloop.IOLoop.current().start()
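
For completeness, this is roughly how I fire the requests (a sketch, not my exact client; in reality I run three separate client programs very quickly one after another):

import asyncio
from tornado.httpclient import AsyncHTTPClient, HTTPRequest

async def ask(client: AsyncHTTPClient, i: int):
    chunks = []
    request = HTTPRequest(
        "https://127.0.0.1:8090/gen_answer",
        method="POST",
        headers={"Content-Type": "application/json"},
        body='{"question": "Hello?", "chat_history": {}}',
        streaming_callback=chunks.append,  # collect each streamed chunk
        validate_cert=False,               # self-signed certificate
        request_timeout=120,
    )
    await client.fetch(request)
    print(f"client {i}: got {len(chunks)} chunks")

async def main():
    client = AsyncHTTPClient()
    # Fire all requests concurrently
    await asyncio.gather(*(ask(client, i) for i in range(3)))

asyncio.run(main())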

And here is the relevant code from the chatbot.py file:

    # Method of the Chatbot class (chatbot.py)
    async def main_process(self, question: str, chat_history: dict):
        chat_hist = self.preprocess_chat_history(chat_history)
        context = await self.get_context(question)
        user_prompt = self.get_user_prompt(context, chat_hist, question)
        llama_prompt = self.get_llama_prompt(user_prompt, SYSTEM_PROMPT)

        model = "meta-llama/Llama-2-7b-chat-hf"
        async for answer_chunk in self.ask_question(model, llama_prompt):
            yield answer_chunk

self.ask_question is an async generator method that calls an external model API and yields words as the answer to the user's question (a rough sketch is below, after my questions). When I print tornado.process.cpu_count(), I get 2 processors. Also, when I send the 3rd request a bit later, Tornado works normally and streams the chunks to the 3rd client. My questions are:

  1. What do I need to change so that my Tornado web server can handle many simultaneous requests without returning a bad response (like whitespace)?
  2. Is what happened in my scenario related to my number of processors, which is 2?
  3. Is there any way to limit the number of incoming requests and put the rest in a queue?
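
For reference, ask_question is shaped roughly like this (a hypothetical sketch: self.client and stream_completion are stand-ins for my real external-API client, not actual names from my code):

    # Also a method of the Chatbot class; the real implementation calls an
    # external model API. self.client and stream_completion are stand-ins.
    async def ask_question(self, model: str, prompt: str):
        response = await self.client.stream_completion(model=model, prompt=prompt)
        async for token in response:  # the API streams the answer word by word
            yield token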

Any help or suggestions will be appreciated :D

EDIT: I realized it was my fault: I think the model endpoint only supports up to 2 connections. But can someone help me with my 3rd question?
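
To make that 3rd question concrete, this is the kind of limiting I have in mind (an untested sketch using asyncio.Semaphore; MAX_CONCURRENT and model_slots are names I made up):

import asyncio

MAX_CONCURRENT = 2  # my model endpoint seems to allow only 2 connections
model_slots = asyncio.Semaphore(MAX_CONCURRENT)

class GenAnswerHandler(tornado.web.RequestHandler):
    async def post(self):
        # Requests beyond MAX_CONCURRENT wait here until a slot frees up,
        # which effectively queues them in arrival order.
        async with model_slots:
            async for chunk in self.receive():
                self.write(chunk)
                await self.flush()
                await asyncio.sleep(0)
        self.finish()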
