When I use `ThreadPoolExecutor`, I can send a batch of requests with a limit on the number of parallel requests, like this:

```python
with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
    results = list(pool.map(request_func, requests_input_data))
```
How can I reproduce this behavior with asyncio? Are there libraries for this, or should I write it myself with something like "wait for the first future to complete, then add a new request"?
Python's asyncio itself has the `run_in_executor` call, which will run synchronous code inside a `ThreadPoolExecutor`: then you can have the exact same semantics.

Otherwise, if you want to add new asynchronous tasks while keeping the number of running tasks below a limit, you have to roll your own code, possibly around an `asyncio.Semaphore` and the `asyncio.wait` call. It is not hard to write a working version, good enough to be used; you can then improve the API as much as you want (return exceptions / ignore / raise them, return partial results after a timeout, etc.).
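A minimal sketch of the `run_in_executor` approach mentioned above; the names `request_func`, `requests_input_data`, and `MAX_PARALLEL_REQUESTS` are taken from the question, and the dummy body of `request_func` is made up for illustration:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL_REQUESTS = 4

def request_func(item):
    # Placeholder for a blocking call (HTTP request, SQL query, ...)
    return item * 2

requests_input_data = [1, 2, 3, 4, 5]

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
        # run_in_executor wraps each blocking call in a future the event
        # loop can await; the executor itself caps concurrency at max_workers
        futures = [loop.run_in_executor(pool, request_func, item)
                   for item in requests_input_data]
        return await asyncio.gather(*futures)

results = asyncio.run(main())
print(results)  # [2, 4, 6, 8, 10]
```

Since `gather` preserves input order, this behaves just like `pool.map` in the question.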
A version that runs everything before returning can be written with fairly short code. Note that the class below is mostly boilerplate, apart from the four lines of core logic inside the `results` method.

Note that this code deliberately avoids creating the tasks up front: it holds the coroutine function callables and their arguments instead, as is usually done in synchronous code. That is needed because if we created the tasks at once (to keep them as objects in `.pending_tasks`), the asyncio loop would automatically step through those pending tasks, regardless of whether they are included in the call to `asyncio.wait`: each time async code reaches an `await`, the loop steps over all ready tasks. In real life these tasks could start an HTTP API transaction or an SQL request, and the target server would be overwhelmed by requests, regardless of us carefully picking only `max_workers` results at a time. The simpler alternative is indeed to use asyncio semaphores (as I noted above, and ended up not needing in this code), but inside the task code itself. For example: if there are many such coroutines representing a task, one might use a decorator that automatically limits the number of started tasks targeting the same I/O resource.
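A sketch of such a decorator, assuming an `asyncio.Semaphore` shared by all invocations of the decorated coroutine; the names `limit_concurrency` and `fetch`, and the limit of 3, are made up for illustration:

```python
import asyncio

def limit_concurrency(max_running):
    """Limit how many invocations of the decorated coroutine run at once."""
    semaphore = None  # created lazily, once a loop is running

    def decorator(coro_func):
        async def wrapper(*args, **kwargs):
            nonlocal semaphore
            if semaphore is None:
                semaphore = asyncio.Semaphore(max_running)
            # At most max_running bodies execute concurrently; the
            # remaining callers wait here until a slot frees up.
            async with semaphore:
                return await coro_func(*args, **kwargs)
        return wrapper
    return decorator

@limit_concurrency(3)
async def fetch(i):
    await asyncio.sleep(0.01)  # stands in for an HTTP or SQL request
    return i * 10

async def main():
    # All ten tasks can be created at once: the semaphore inside the
    # task body, not the caller, keeps the I/O resource from being flooded.
    return await asyncio.gather(*(fetch(i) for i in range(10)))

results = asyncio.run(main())
print(results)  # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
```

Because the limit lives inside the task code, callers can create and gather as many tasks as they like without overwhelming the target server.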