Why is my function not behaving asynchronously (asyncio to_thread)?

Learning from this video, I'm trying to make my code run faster.
I have a synchronous get function (represented here by mySynchronousFunction) that I want to evaluate over several values in a list.
I am coding all this in a Jupyter notebook.

Here is a MWE:


# Cell one
import asyncio
import pandas as pd
import requests
from time import sleep

# Cell two
def mySynchronousFunction():
    sleep(4)
    return(True)

async def turningMyFunctionAsync(string):
    print(f"starting job for {string}")
    returnVal = await asyncio.to_thread(mySynchronousFunction)
    print(f"end of job for {string}")
    return(returnVal)

# Cell three
myList = [{'val':'val1'},{'val':'val2'},{'val':'val3'},{'val':'val4'},{'val':'val5'}]
async def mainAsyncFunction():
    for val in myList:
        val['result'] = await asyncio.create_task(turningMyFunctionAsync(val['val']))

# Cell four
await mainAsyncFunction()

# Prints:
# starting job for val1
# end of job for val1
# starting job for val2
# end of job for val2
# starting job for val3
# end of job for val3
# starting job for val4
# end of job for val4
# starting job for val5
# end of job for val5

I was expecting cell four to print all the "starting job for ..." lines first (i.e. launching all the jobs at once), considering the long sleep time. But everything just runs synchronously. What did I do wrong?

Thanks.

Best answer, by bmitc:

The issue is the for loop inside mainAsyncFunction. It makes your tasks run one after another because each iteration creates a task and immediately awaits it. In other words, you start mySynchronousFunction in another thread and then wait for its return value before continuing to the next iteration, so only one job is ever in flight.
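To make the difference concrete, here is a minimal sketch that keeps the for-loop style but separates creating the tasks from awaiting them. The names blocking_job, run_one, and timed are hypothetical stand-ins, and the sleep is shortened to 0.2 s so the comparison runs quickly; asyncio.to_thread needs Python 3.9 or later.

```python
import asyncio
import time


def blocking_job(delay: float = 0.2) -> bool:
    # Stand-in for mySynchronousFunction, with a shorter sleep (assumption)
    time.sleep(delay)
    return True


async def run_one(name: str) -> bool:
    # Run the blocking call in a worker thread
    return await asyncio.to_thread(blocking_job)


async def main_sequential(names):
    results = []
    for name in names:
        # Same shape as the question: create a task, then wait for it right away
        results.append(await asyncio.create_task(run_one(name)))
    return results


async def main_concurrent(names):
    # First loop: create every task, so all of them are scheduled
    tasks = [asyncio.create_task(run_one(name)) for name in names]
    results = []
    for task in tasks:
        # Second loop: collect the results in the original order
        results.append(await task)
    return results


def timed(coro_fn, names):
    # asyncio.run is for scripts; in a notebook, await coro_fn(names) directly
    start = time.perf_counter()
    results = asyncio.run(coro_fn(names))
    return results, time.perf_counter() - start
```

With three names, the sequential version should take roughly three sleeps and the concurrent one roughly a single sleep, since the waits overlap.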

Also, note that your code doesn't run as is outside of a notebook. In a regular Python script you cannot use await outside of a coroutine; it is a SyntaxError. It works for you because Jupyter (IPython) supports top-level await as a special case, as does the asyncio REPL started with python -m asyncio, which makes things easier to test.

There are several ways to solve this. There is async for, but it iterates over an asynchronous iterator one item at a time, so on its own it does not make the jobs concurrent, and to be honest, I don't know how to use it well. There are also TaskGroups (asyncio.TaskGroup, Python 3.11 and later). In this case, I created a solution that keeps it simple: build up a list of coroutines, then call asyncio.gather on it. asyncio.gather turns every coroutine into a task if it isn't already a task.

The difference between awaiting a coroutine directly and creating a task is that an awaited coroutine runs to completion before the caller continues, whereas a task is scheduled on the event loop when it is created and runs concurrently with other tasks. asyncio.gather is just one way to launch coroutines as concurrent tasks.

Also note that asyncio.gather does not take a list, so you need to unpack the list with *coroutines to pass its items as separate arguments. In other words, asyncio.gather(coro1, coro2, coro3) is equivalent to asyncio.gather(*coros) for coros = [coro1, coro2, coro3]. Lastly, asyncio.gather returns a list of the results in the order that the awaitables were passed in, regardless of the order in which they finish.

async def mainAsyncFunction():
    coroutines = [turningMyFunctionAsync(val["val"]) for val in myList]
    results = await asyncio.gather(*coroutines)
    print(f"Results: {results}")
    return results

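# In a script. A Jupyter notebook already has an event loop running, so
# asyncio.run raises a RuntimeError there; use `await mainAsyncFunction()` instead.
asyncio.run(mainAsyncFunction())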

This prints out:

starting job for val1
starting job for val2
starting job for val3
starting job for val4
starting job for val5
end of job for val1
end of job for val2
end of job for val3
end of job for val4
end of job for val5
Results: [True, True, True, True, True]

My final comment is that asyncio does not necessarily make your code faster. It makes it concurrent. In cases where you are using pure coroutines and tasks and are not deferring to threads, it is possible that asyncio is faster because it is single threaded and thus avoids the overhead of switching threads. However, in your case, you are deferring to a thread. That still helps for blocking waits such as sleep or an HTTP request, because the waits overlap and the total wall-clock time drops. By default asyncio.to_thread uses the event loop's default ThreadPoolExecutor, whose size is min(32, os.cpu_count() + 4) workers on Python 3.8 and later rather than one thread per core. The pool reuses its threads, so it does save some overhead of creating them, but it still switches between them.
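If the size of the thread pool matters for your workload, one option is to pass your own executor to loop.run_in_executor instead of relying on the default one. The sketch below is only an illustration under assumed numbers: blocking_job, run_all, and timed_run are hypothetical names, and the 0.2 s sleep and worker counts are arbitrary.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(delay: float = 0.2) -> bool:
    # Stand-in for mySynchronousFunction, with a shorter sleep (assumption)
    time.sleep(delay)
    return True


async def run_all(n_jobs: int, max_workers: int):
    loop = asyncio.get_running_loop()
    # Explicit pool: at most max_workers jobs run at the same time,
    # the rest wait in the executor's queue
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [loop.run_in_executor(pool, blocking_job) for _ in range(n_jobs)]
        return await asyncio.gather(*futures)


def timed_run(n_jobs: int, max_workers: int):
    # asyncio.run is for scripts; in a notebook, await run_all(...) directly
    start = time.perf_counter()
    results = asyncio.run(run_all(n_jobs, max_workers))
    return results, time.perf_counter() - start
```

With four jobs, a pool of two workers should need about two rounds of sleeping, while a pool of four workers should need about one.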