Python - wait for a websocket response in a sync method


I'm facing a problem with asyncio, WebSockets, and synchronous calls.

We have an application which uses websockets and Flask.

The WebSockets are managed with asyncio, and we receive messages in this handler:

async def on_message(message):
    # some logic
    await doStuff(message)

The problem is our workflow: a Flask endpoint has to send a request to the WebSocket server, wait for the WebSocket response, and then return a synchronous response to the controller.

Something like this:

@app.route("/request", methods=["POST"])
def manageRequest():
    data = request.get_json()

    ## send data to ws
    ws.send(data)

    ## we need the response on the on_message method

    response = {} ##ws response
    newResponse = makeSomething(response)

    return newResponse

Is there a way to wait for the async response in the method, just like a Completable in Java?

bmitc On

is there a way to wait for the async response in the method

No, there is no simple way to do that from outside the event loop, in the sense of a single keyword. All coroutines must run inside an event loop, and await can only be used inside a coroutine.

asyncio.run runs the passed-in coroutine on a new event loop and returns its result when finished, but it is mainly intended to be the entry point for an entire program or an entire thread. It blocks the calling thread until the coroutine has completely finished, and it performs setup and cleanup for the event loop on every call, so it is heavy-handed for a single request.
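As a minimal sketch of that blocking behavior, the following calls a coroutine from ordinary synchronous code. The names fetch_answer and sync_caller are hypothetical and only serve the illustration.

```python
import asyncio


async def fetch_answer() -> int:
    # Stand-in for real asynchronous work.
    await asyncio.sleep(0.01)
    return 42


def sync_caller() -> int:
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion, closes the loop, and returns the coroutine's result.
    # The calling thread is blocked for the whole duration.
    return asyncio.run(fetch_answer())


result = sync_caller()
```

Note that asyncio.run raises a RuntimeError if an event loop is already running in the same thread, and it cannot reach coroutines or sockets that live on a different, already running loop.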

The primary way to interact with a running event loop from another thread is asyncio.run_coroutine_threadsafe. Note that this requires a reference to the asyncio event loop itself. I am not familiar with Flask, but it sounds like you have a thread running the WebSocket server event loop. You'll need a reference to that loop.
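If the loop is created by your own code, one way to keep a reference to it is to create it up front and hand it to the thread that runs it. This is only a sketch under that assumption; run_loop_forever and echo are hypothetical names, and your WebSocket library may create or expose its loop differently.

```python
import asyncio
import threading

# Create the loop up front so that other threads can hold a reference to it.
loop = asyncio.new_event_loop()


def run_loop_forever() -> None:
    # Bind the loop to this thread and run it until loop.stop() is called.
    asyncio.set_event_loop(loop)
    loop.run_forever()


loop_thread = threading.Thread(target=run_loop_forever, daemon=True)
loop_thread.start()


async def echo(value: str) -> str:
    # Stand-in for a coroutine that talks to the WebSocket.
    await asyncio.sleep(0.01)
    return value.upper()


# From any other thread: schedule the coroutine on the running loop and
# block on the returned concurrent.futures.Future until it finishes.
future = asyncio.run_coroutine_threadsafe(echo("ping"), loop)
reply = future.result(timeout=5)

# Shut the loop down cleanly once it is no longer needed.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join(timeout=5)
loop.close()
```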

@app.route("/request", methods=["POST"])
def manageRequest():
    data = request.get_json()

    ## send data to ws
    # Call your coroutine on the running event loop. Have the
    # coroutine return the response with `return ...`
    future = asyncio.run_coroutine_threadsafe(ws.send(data), loop)

    ## we need the response on the on_message method

    # Now, wait on the future's result to get the coroutine's
    # returned result. You can use a timeout if you would like.
    response = future.result() ##ws response
    newResponse = makeSomething(response)

    return newResponse
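In the snippet above, future.result() only yields the WebSocket reply if the scheduled coroutine itself returns it. Since in the question the reply arrives in on_message, one possible arrangement is a coroutine that sends the request and then awaits an asyncio future that on_message resolves. The sketch below assumes that every message carries an "id" field that the server echoes back, which the question does not state; pending, request_reply, fake_ws_send, and manage_request_sync are hypothetical names, and fake_ws_send merely simulates a server.

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

# Hypothetical registry of requests that are still waiting for a reply.
pending: dict[str, asyncio.Future] = {}


async def on_message(message: dict) -> None:
    # Resolve the future of the request this message answers, if any.
    waiter = pending.pop(message["id"], None)
    if waiter is not None and not waiter.done():
        waiter.set_result(message["payload"])


async def fake_ws_send(message: dict) -> None:
    # Stand-in for the real socket: the "server" replies a moment later
    # with the reversed payload, delivered through on_message.
    reply = {"id": message["id"], "payload": message["payload"][::-1]}
    loop.call_later(0.01, loop.create_task, on_message(reply))


async def request_reply(request_id: str, payload: str) -> str:
    # Register a future first, then send, then wait for on_message.
    waiter = asyncio.get_running_loop().create_future()
    pending[request_id] = waiter
    try:
        await fake_ws_send({"id": request_id, "payload": payload})
        return await waiter
    finally:
        pending.pop(request_id, None)


def manage_request_sync(payload: str) -> str:
    # Called from a synchronous thread, for example a Flask view.
    future = asyncio.run_coroutine_threadsafe(
        request_reply("req-1", payload), loop
    )
    # Raises a timeout error if no reply arrives within 5 seconds.
    return future.result(timeout=5)


answer = manage_request_sync("abc")
loop.call_soon_threadsafe(loop.stop)
```

The registry is only touched from coroutines running on the loop thread, so it needs no lock in this sketch.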

Another option, which may be preferable if your coroutines and WebSocket server are set up in such a way that the coroutine cannot conveniently return the response value directly, is to use a standard queue.Queue to get data back out of the event loop.

@app.route("/request", methods=["POST"])
def manageRequest():
    data = request.get_json()

    ## send data to ws
    # create a one-element queue for the response
    response_queue = queue.Queue(maxsize=1)
    # Call your coroutine on the running event loop and also pass in
    # your response queue for the reply. Inside the coroutine, process
    # the call, and then place the response on the passed in queue
    # using `Queue.put_nowait`.
    asyncio.run_coroutine_threadsafe(ws.send(data, response_queue), loop)

    ## we need the response on the on_message method

    # Now, wait on the queue to get filled with the response.
    # You could also use a timeout if you'd like.
    response = response_queue.get() ##ws response
    newResponse = makeSomething(response)

    return newResponse
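The queue hand-off can be tried without Flask or a real WebSocket. The following is a minimal sketch in which handle and call_with_queue are hypothetical names and the "reply" is simply the upper-cased input.

```python
import asyncio
import queue
import threading

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def handle(data: str, response_queue: queue.Queue) -> None:
    # Stand-in for sending the request and receiving the reply.
    await asyncio.sleep(0.01)
    # put_nowait never blocks, so the event loop is not stalled.
    response_queue.put_nowait(data.upper())


def call_with_queue(data: str, timeout: float):
    # One-element queue that carries the reply back to this thread.
    response_queue = queue.Queue(maxsize=1)
    asyncio.run_coroutine_threadsafe(handle(data, response_queue), loop)
    try:
        # Block this (synchronous) thread until the reply is available.
        return response_queue.get(timeout=timeout)
    except queue.Empty:
        # No reply arrived in time; the caller decides how to report it.
        return None


reply = call_with_queue("ping", timeout=5)
loop.call_soon_threadsafe(loop.stop)
```

Because queue.Queue is thread-safe, the coroutine can fill it from the loop thread while the calling thread blocks in get.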