I'm running into trouble when using tornado.web.RequestHandler.flush() with multiple requests at the same time. I'm using Python 3.10.12, Tornado 6.4, and Ubuntu 22.04.3.
In my case I want to stream the chatbot output to the client using flush(). I followed the suggestion (from here and here) to add await asyncio.sleep(0) after every chunk so that flush() yields control and the server can handle multiple requests. However, when I make about 3 requests at the same time (running my client program one after another, but very quickly), only 2 clients get the streamed chunks from the server; the last request only gets whitespace as the result. Here's my Tornado Python code:
import asyncio

import tornado.httpserver
import tornado.ioloop
import tornado.web
from tornado.escape import json_decode

from chatbot import Chatbot  # Chatbot lives in chatbot.py (shown below)

chatbot = Chatbot()


class GenAnswerHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        self.add_header('Access-Control-Allow-Origin', '*')
        self.add_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
        self.add_header('Access-Control-Allow-Headers', 'Content-Type')

    def options(self):
        # Handle pre-flight request
        self.set_status(204)
        self.finish()

    async def post(self):
        try:
            # Stream data to the client in chunks
            async for chunk in self.receive():
                self.write(chunk)
                await self.flush()
                await asyncio.sleep(0)
            self.finish()
        except Exception as e:
            self.set_status(500)
            self.write({'status': 'error', 'result': str(e)})

    async def receive(self):
        # Check the Content-Type header
        content_type = self.request.headers.get('Content-Type')
        if content_type == 'application/json':
            # If JSON data, extract input from the request body
            question: str = json_decode(self.request.body).get('question')
            chat_history: dict = json_decode(self.request.body).get('chat_history')
        elif content_type == 'application/x-www-form-urlencoded':
            # If form data, extract input from the form arguments
            question: str = self.get_argument('question', default=None, strip=False)
            chat_history: dict = self.get_argument('chat_history', default=None, strip=False)
        else:
            # Unsupported media type
            self.set_status(415)
            self.write({'status': 'error', 'result': 'Unsupported Media Type: Use application/json or application/x-www-form-urlencoded.'})
            return

        async for res_chunk in chatbot.main_process(question, chat_history):
            yield res_chunk


if __name__ == "__main__":
    app = tornado.web.Application(
        [
            (r'/gen_answer', GenAnswerHandler),
        ],
        # debug=options.debug,
    )
    http_server = tornado.httpserver.HTTPServer(app, ssl_options={
        "certfile": "../server.crt",
        "keyfile": "../server.key",
    })
    ip = "127.0.0.1"
    port = 8090
    http_server.listen(port, address=ip)  # Change the address and port as needed
    print(f"Server listening on {ip}:{port}...")
    tornado.ioloop.IOLoop.current().start()
And here is the relevant code from the chatbot.py file:
async def main_process(self, question: str, chat_history: dict):
    chat_hist = self.preprocess_chat_history(chat_history)
    context = await self.get_context(question)
    user_prompt = self.get_user_prompt(context, chat_hist, question)
    llama_prompt = self.get_llama_prompt(user_prompt, SYSTEM_PROMPT)
    model = "meta-llama/Llama-2-7b-chat-hf"
    async for answer_chunk in self.ask_question(model, llama_prompt):
        yield answer_chunk
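ask_question itself looks roughly like this (heavily simplified; the real endpoint, payload format, and HTTP client are different, so aiohttp, API_URL, and the response handling below are just placeholders):

import aiohttp

API_URL = "https://example.invalid/v1/completions"  # placeholder for the real model endpoint

async def ask_question(self, model: str, prompt: str):
    # Call the external model API and yield the answer chunk by chunk
    # as the streamed response arrives.
    async with aiohttp.ClientSession() as session:
        async with session.post(API_URL, json={"model": model, "prompt": prompt}) as resp:
            async for line in resp.content:
                yield line.decode("utf-8")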
As shown above, self.ask_question is an async generator method that calls an external model API and yields words as the answer to the user's question. When I print tornado.process.cpu_count(), I get 2 processors. Also, when I send the 3rd request a bit later, Tornado works normally and streams the chunks to the 3rd client as well. My questions are:
- What do I need to change so that my Tornado web server can handle many requests at the same time without giving a bad response (like whitespace)?
- Is what I'm seeing related to my number of processors, which is 2?
- Is there any way to limit the number of requests being handled at once and put the rest in a queue?
Any help or suggestions will be appreciated :D
EDIT: I realized it was my fault; the model endpoint only supports up to 2 connections, I think. But can someone help me with my 3rd question?
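To illustrate what I mean by limiting and queueing, this is the kind of thing I have in mind (just a sketch, not something I have working; the limit of 2 and the asyncio.Semaphore wrapper are my assumption, and receive() is the same method as in the handler above):

import asyncio
import tornado.web

# Assumption: the model endpoint only allows ~2 concurrent connections,
# so cap the number of chatbot calls running at the same time.
MODEL_SLOTS = asyncio.Semaphore(2)

class GenAnswerHandler(tornado.web.RequestHandler):
    async def post(self):
        # Requests beyond the limit wait here until a slot frees up,
        # so they are effectively queued instead of hitting the endpoint.
        async with MODEL_SLOTS:
            async for chunk in self.receive():
                self.write(chunk)
                await self.flush()
                await asyncio.sleep(0)
        self.finish()

Is something like this roughly the right approach, or is there a more idiomatic way in Tornado to queue the excess requests?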