Python multiprocessing locks inside an async function

I have some synchronous (non-async) code that is called from inside an async function.

It basically applies ast.NodeTransformer transformations:

import ast
from typing import Dict, Tuple

def is_lgf_in_tree(tree: ast.Expression) -> bool:
    """Walk tree while lgf call is not found"""
    for n in ast.walk(tree):
        if (
            isinstance(n, ast.Call)
            and isinstance(n.func, ast.Name)
            and n.func.id == "lgf"
        ):
            return True
    return False

def transform(lgf: LGF, nested_lgfs: dict[str, LGF]) -> Tuple[LGF, ast.Expression]:
    transformer = LGFTransformer(nested_lgfs=nested_lgfs)
    tree = ast.parse(lgf.expression, mode="eval")

    while is_lgf_in_tree(tree):  # make flat layer by layer
        tree = transformer.visit(tree)

    return lgf, tree

In case the NodeTransformer code helps:

class LGFTransformer(ast.NodeTransformer):
    def __init__(self, nested_lgfs: Dict[str, LGF]):
        super().__init__()
        self.nested_lgfs = nested_lgfs

    def visit_Call(self, node: ast.Call):
        if not isinstance(node.func, ast.Name):
            return self.generic_visit(node)
        if not (node.func.id == "lgf" and len(node.args) == 2 and len(node.keywords) == 0):
            return self.generic_visit(node)
        if not isinstance(node.args[0], ast.Constant):
            return self.generic_visit(node)

        # Look up the nested LGF by the name given as the first argument.
        lgf = self.nested_lgfs.get(node.args[0].value)
        if not lgf:
            return node

        exp = ast.parse(lgf.expression, mode="eval")
        return exp.body

The straightforward (serial) version:

    results = []
    for lgf in initial_lgfs:
        results.append(transform(lgf, lgfs_by_name))
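
For anyone who wants to reproduce this, here is a minimal self-contained sketch of the serial version. The `LGF` dataclass below is a hypothetical stand-in (my real class is not shown here); the only thing assumed about it is that it has an `expression` string. `ast.unparse` needs Python 3.9+.

```python
import ast
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class LGF:
    # Hypothetical stand-in for the real LGF class, which is not shown.
    name: str
    expression: str


class LGFTransformer(ast.NodeTransformer):
    """Replace lgf("<name>", <arg>) calls with the parsed body of the named LGF."""

    def __init__(self, nested_lgfs: Dict[str, LGF]):
        super().__init__()
        self.nested_lgfs = nested_lgfs

    def visit_Call(self, node: ast.Call):
        is_lgf_call = (
            isinstance(node.func, ast.Name)
            and node.func.id == "lgf"
            and len(node.args) == 2
            and not node.keywords
            and isinstance(node.args[0], ast.Constant)
        )
        if not is_lgf_call:
            return self.generic_visit(node)
        nested = self.nested_lgfs.get(node.args[0].value)
        if nested is None:
            return node  # unknown name: leave the call untouched
        return ast.parse(nested.expression, mode="eval").body


def is_lgf_in_tree(tree: ast.Expression) -> bool:
    """Return True if any call to a function named lgf remains in the tree."""
    return any(
        isinstance(n, ast.Call)
        and isinstance(n.func, ast.Name)
        and n.func.id == "lgf"
        for n in ast.walk(tree)
    )


def transform(lgf: LGF, nested_lgfs: Dict[str, LGF]) -> Tuple[LGF, ast.Expression]:
    transformer = LGFTransformer(nested_lgfs=nested_lgfs)
    tree = ast.parse(lgf.expression, mode="eval")
    while is_lgf_in_tree(tree):  # flatten one layer per pass
        tree = transformer.visit(tree)
    return lgf, tree


# Made-up sample data: "top" references "a", which references "b".
lgfs_by_name = {
    "a": LGF("a", 'x + lgf("b", 0)'),
    "b": LGF("b", "y * 2"),
}
initial_lgfs = [LGF("top", 'lgf("a", 0) - 1')]

results = [transform(lgf, lgfs_by_name) for lgf in initial_lgfs]
flat = ast.unparse(results[0][1])
print(flat)
```

Note that with this structure, an `lgf(...)` call whose name is missing from `lgfs_by_name` is returned unchanged, so the `while` loop in `transform` would never finish; the sample data avoids that case.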

[image: profiling 1]

My attempt to parallelize it with multiple processes:

    from multiprocess.pool import Pool
    arguments = [(lgf, lgfs_by_name) for lgf in initial_lgfs]
    with Pool(processes=4) as pool:
        results = pool.starmap(transform, arguments)

[image: profiling 2]

My idea is to run the ast work in parallel to reduce computation time.

The results are disappointing. What can I change or experiment with in order to transform multiple functions with ast.NodeTransformer in parallel? Why are the new processes waiting on each other? Is it because they share data? Is the async function the problem? Could deepcopy solve it?

What am I missing, and why are the processes trying to synchronize?

EDIT: I figured out that the massive delay is caused by process creation. Is there a way to minimize this overhead? I tried changing the process start method from 'fork' to 'spawn', but it didn't help.
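
One direction that might reduce the per-call overhead, sketched under some assumptions: create the worker pool once and reuse it across calls, hand each worker a batch instead of a single item, and pass plain strings in and out so that less has to be pickled. This sketch uses the standard library's `concurrent.futures.ProcessPoolExecutor` with `loop.run_in_executor` rather than the `Pool` from my attempt. The names `flatten_one`, `flatten_batch`, `make_batches` and `flatten_all` are hypothetical, LGFs are reduced to `(name, expression)` pairs, and `ast.unparse` needs Python 3.9+. I have not measured whether this is actually faster for my data.

```python
import ast
import asyncio
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, List, Tuple


class _Inliner(ast.NodeTransformer):
    """Inline lgf("<name>", <arg>) calls whose name is a key of `nested`."""

    def __init__(self, nested: Dict[str, str]):
        super().__init__()
        self.nested = nested
        self.changed = False

    def visit_Call(self, node: ast.Call):
        if (
            isinstance(node.func, ast.Name)
            and node.func.id == "lgf"
            and len(node.args) == 2
            and not node.keywords
            and isinstance(node.args[0], ast.Constant)
            and node.args[0].value in self.nested
        ):
            self.changed = True
            return ast.parse(self.nested[node.args[0].value], mode="eval").body
        return self.generic_visit(node)


def flatten_one(expression: str, nested: Dict[str, str]) -> str:
    """Inline layer by layer until a pass changes nothing; return source text."""
    inliner = _Inliner(nested)
    tree = ast.parse(expression, mode="eval")
    while True:
        inliner.changed = False
        tree = inliner.visit(tree)
        if not inliner.changed:
            return ast.unparse(tree)


def flatten_batch(batch: List[Tuple[str, str]], nested: Dict[str, str]) -> List[Tuple[str, str]]:
    """Runs in a worker process: flatten every (name, expression) pair."""
    return [(name, flatten_one(expr, nested)) for name, expr in batch]


def make_batches(items: list, n_batches: int) -> List[list]:
    """Split items into at most n_batches contiguous chunks."""
    size = max(1, -(-len(items) // n_batches))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


async def flatten_all(pool, items, nested, n_batches: int = 4):
    """Submit one task per batch without blocking the event loop."""
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(pool, flatten_batch, batch, nested)
        for batch in make_batches(items, n_batches)
    ]
    results = []
    for part in await asyncio.gather(*futures):
        results.extend(part)
    return results


async def main():
    # Made-up sample data.
    nested = {"a": 'x + lgf("b", 0)', "b": "y * 2"}
    items = [("top", 'lgf("a", 0) - 1'), ("other", 'lgf("b", 0)')]
    # The pool is created once; every later call reuses the same workers.
    with ProcessPoolExecutor(max_workers=2) as pool:
        for _ in range(3):
            print(await flatten_all(pool, items, nested, n_batches=2))


if __name__ == "__main__":
    asyncio.run(main())
```

In this sketch the `nested` dictionary is still pickled once per batch; the executor's `initializer` and `initargs` parameters could be used to send it to each worker only once. If the individual transformations are very cheap, the plain serial loop may still win.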
