In a bigger flask application, some routes stream its response. But the underlying service implementation is an async generator, so it needs to be converted to a synchronous one. Like so.
from typing import AsyncGenerator, Generator
import flask
from api.routes.utils import async_generator_to_sync
import json
import asyncio
app = flask.Flask(__name__)
def async_generator_to_sync(generator: AsyncGenerator) -> Generator:
try:
while True:
yield asyncio.run(anext(generator), debug=True) # type: ignore
except StopAsyncIteration:
return
async def streaming_service() -> AsyncGenerator[dict, dict]:
for i in range(3):
yield {"task": f"task {i}"}
yield {"done": True}
@app.route("/works")
def this_yields_all_updates_and_response():
service = async_generator_to_sync(streaming_service())
for s in service:
yield json.dumps(s) + "\n"
return flask.Response(service)
if __name__ == "__main__":
app.run(debug=True)
And this works. But in the bigger picture, It would be really useful to be able to structure it like this, so I can do the conversion to a normal generator in a custom route decorator.
@app.route("/doesnotwork")
def only_yields_once():
async def streaming_service_wrapper():
async for i in streaming_service():
yield json.dumps(i) + "\n"
service = async_generator_to_sync(streaming_service_wrapper())
return flask.Response(service)
But this does not work, it only streams the first update. To me these implementations should behave exactly the same. This is my cURL output
(base) pepijnvanderklei@pepijns-mbp Kayoc % curl http://127.0.0.1:5000/works
{"task": "task 0"}
{"task": "task 1"}
{"task": "task 2"}
{"done": true}
(base) pepijnvanderklei@pepijns-mbp Kayoc % curl http://127.0.0.1:5000/doesnotwork
{"task": "task 0"}
Help me out! :)