Deadlock with Python async processes and semaphore


Why does this deadlock?

#!/usr/bin/env python3

import asyncio

async def _stream_subprocess(id: int, command: "list[str]"):
    proc = await asyncio.create_subprocess_exec(*command, stdout=asyncio.subprocess.PIPE)
    await proc.wait()
    print(f"{id}: Done")


async def run(id: int, command: "list[str]"):
    print(f"{id}: Running")
    await _stream_subprocess(id, command)
    print(f"{id}: Throwing")

    raise RuntimeError("failed")


async def run_with_parallelism_limit(id: int, command: "list[str]", limit: asyncio.Semaphore):
    async with limit:
        print(f"{id}: Calling run")
        await run(id, command)
        print(f"{id}: Run finished")


async def main():
    sem = asyncio.Semaphore(1)
    await asyncio.gather(
        run_with_parallelism_limit(0, ["python", "--version"], sem),
        run_with_parallelism_limit(1, ["python", "--version"], sem),
    )


if __name__ == "__main__":
    asyncio.run(main())

Output:

0: Calling run
0: Running
0: Done
0: Throwing
1: Calling run
1: Running

There's this question, which is possibly related, but it's hard to tell since it isn't solved and the code is different.

Answer from Andrej Kesely:

Pass return_exceptions=True to asyncio.gather(). Without it, the first RuntimeError propagates out of gather() as soon as task 0 raises, while task 1 keeps running (gather() does not cancel the remaining awaitables); asyncio.run() then cancels task 1 during shutdown while it is still awaiting its subprocess, and that cancellation is most likely where the hang occurs. With return_exceptions=True, gather() waits for every task to finish and returns the exceptions as results instead. The snippet below uses a slightly generalized runner that takes a list of commands and drops the id parameter:


...

async def run_commands(commands: "list[list[str]]", num_jobs: int):
    sem = asyncio.Semaphore(num_jobs)
    tasks = [run_with_parallelism_limit(command, sem) for command in commands]
    await asyncio.gather(*tasks, return_exceptions=True)  # <-- here!

...

With that change, the output is:

RUN
Running
Throwing
RUN
Running
Throwing
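
With return_exceptions=True, failures no longer raise out of gather(): the exceptions come back as ordinary results, in task order, so you will usually want to inspect them afterwards. A minimal sketch building on the run_commands helper above; the reporting loop is an assumption, not part of the original answer:

async def run_commands(commands: "list[list[str]]", num_jobs: int):
    sem = asyncio.Semaphore(num_jobs)
    tasks = [run_with_parallelism_limit(command, sem) for command in commands]
    # Exceptions are returned in task order instead of being raised.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for command, result in zip(commands, results):
        if isinstance(result, BaseException):  # covers CancelledError too
            print(f"{command}: failed with {result!r}")

If the batch should still fail as a whole, re-raise the first exception found in results after the loop; by then every task has either finished or failed, so nothing is left hanging on the semaphore or a subprocess.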