Consider an SSHCluster with multiple hosts.
from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["localhost", "hostname"],
    connect_options={"known_hosts": None},
    worker_options={"n_workers": params["n_workers"]},
    scheduler_options={"port": 0, "dashboard_address": ":8797"},
)
client = Client(cluster)
I have tried starting multiple workers on the same host (params["n_workers"] > 1) and found this to be rather wasteful in memory. In fact, I could not get a single run to complete without crashing, very likely because the host ran out of memory. I do not have these problems when I use plain multiprocessing on one host, even with several processes.
I believe a better strategy would be to redesign the worker method to be more fine-grained, so that it takes smaller input parameters and returns smaller results. This will take me a while to achieve. In the meantime, I am trying to start one worker on each host (params["n_workers"] = 1) and use a multiprocessing pool inside the worker to parallelise across the cores available on that host. (My own config would decide how many processes to use, etc.)
import multiprocessing as mp

manager = mp.Manager()
pool = mp.Pool()
I tried it, but got the error:
AssertionError: daemonic processes are not allowed to have children
Then I tried running the tasks inside the worker with Dask's local multiprocessing scheduler (scheduler="processes"), which seems to use a similar approach to the pool method.
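from dask import compute, delayed

delayed_values = [delayed(worker)(param) for param in params]
# The multiprocessing scheduler takes its process count via num_workers
results = compute(*delayed_values, scheduler="processes", num_workers=n_processes)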
The above code raises the same error. This is likely because the "nanny" starts each worker as a daemonic multiprocessing process, and I am then trying to create further processes from within that worker process, which multiprocessing does not allow.
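The restriction itself can be reproduced without Dask. Below is a minimal sketch using only the standard library; it assumes a POSIX system where the "fork" start method is available, and the helper names (_noop, _try_to_spawn, spawn_from) are made up for illustration. It starts a process that tries to start a child of its own, once as a normal process and once as a daemonic one.

```python
import multiprocessing as mp

# Assumption: POSIX system where the "fork" start method is available.
ctx = mp.get_context("fork")


def _noop():
    """Trivial target for the grandchild process."""


def _try_to_spawn(queue):
    """Runs in the intermediate process and tries to start a child of its own."""
    try:
        child = ctx.Process(target=_noop)
        child.start()
        child.join()
        queue.put("ok")
    except AssertionError as exc:
        # multiprocessing refuses to start children from a daemonic process
        queue.put(str(exc))


def spawn_from(daemon):
    """Start an intermediate process (daemonic or not) and report what it saw."""
    queue = ctx.Queue()
    proc = ctx.Process(target=_try_to_spawn, args=(queue,), daemon=daemon)
    proc.start()
    result = queue.get(timeout=30)
    proc.join()
    return result


if __name__ == "__main__":
    print(spawn_from(daemon=False))
    print(spawn_from(daemon=True))
```

If the nanny hypothesis is right, the second call corresponds to what happens when mp.Pool() is created inside a nanny-managed worker.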
I have found various links suggesting the --no-nanny option: link1 suggests passing --no-nanny to the worker directly, and link2 mentions that it can be done via the CLI. However, I have not found an example of how to do this programmatically. I am not sure whether it is possible and, if it is, how to achieve it (i.e. through which object and which argument). I have looked into the Dask code but have not figured it out.
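For reference, here is a hedged sketch of what a programmatic equivalent might look like. It assumes that the installed version of distributed exposes a worker_class argument on SSHCluster (defaulting to "distributed.Nanny") and that pointing it at "distributed.Worker" starts the worker without a nanny; this is untested against the setup above. The helper names build_cluster_kwargs and start_cluster are hypothetical.

```python
def build_cluster_kwargs(n_workers=1):
    """Keyword arguments for SSHCluster with the nanny swapped out.

    Assumption: SSHCluster accepts worker_class, and "distributed.Worker"
    runs the worker directly instead of under a Nanny.
    """
    return {
        "connect_options": {"known_hosts": None},
        "worker_options": {"n_workers": n_workers},
        "scheduler_options": {"port": 0, "dashboard_address": ":8797"},
        "worker_class": "distributed.Worker",  # default is "distributed.Nanny"
    }


def start_cluster(hosts, n_workers=1):
    """Create the cluster and a client; requires working SSH access to hosts."""
    from dask.distributed import Client, SSHCluster

    cluster = SSHCluster(hosts, **build_cluster_kwargs(n_workers))
    return cluster, Client(cluster)


# Example call (not executed here, since it needs reachable hosts):
# cluster, client = start_cluster(["localhost", "hostname"])
```

Without a nanny, a crashed worker would presumably not be restarted automatically, so this trades some robustness for the ability to create child processes.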