How does Dask manage file descriptors?
For example, when creating a dask.array from an HDF5 file that is large enough to be chunked.
Do the created tasks inherit the file descriptor opened in the Client, or does every single task reopen the file on the worker?
How are other cases managed?
Edit:
Here is a small example:
from distributed import Client
import dask.array as da
import h5py

c = Client(scheduler_file="scheduler.json")
file = "array.h5"

# open the same HDF5 file twice and build two chunked dask arrays from it
array = da.from_array(h5py.File(file, "r")["/array"]).rechunk((1, 1000, 1000))
array2 = da.from_array(h5py.File(file, "r")["/array"]).rechunk((1, 1000, 1000))

res = array.max() - array2.max()
c.compute(res).result()
c.shutdown()
Short answer: when using distributed (i.e., a Client, as in the question), all tasks must be serialised and sent to worker processes. This means that low-level file descriptors can never be shared, and the file will be reopened on the workers. This is one of the reasons that "cloud native" formats like zarr exist.

When you are using hdf with distributed, your File object goes through a special serialisation so that it is rehydrated on the workers to be as it was on the client, with the minimum of disk reads.
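In effect, what the workers end up doing is equivalent to the following hedged sketch, where the file path (not a handle) travels with each task and the file is opened on whichever worker runs it (chunk_max and the slicing are made up purely for illustration):

import dask
import h5py

@dask.delayed
def chunk_max(path, sl):
    # runs on a worker: the file is opened there, not inherited from the client
    with h5py.File(path, "r") as f:
        return f["/array"][sl].max()

parts = [chunk_max("array.h5", slice(i, i + 1)) for i in range(4)]
print(dask.compute(*parts))  # with a Client active, these execute on worker processes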
If you are using hdf with the threaded scheduler, you will remain in the same process and the open file handle will be passed around.
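A rough sketch of that threaded case, assuming the same array.h5 file from the question: the file is opened once in the client process and all threads reuse that same handle:

import dask.array as da
import h5py

f = h5py.File("array.h5", "r")                    # opened once, in this process
arr = da.from_array(f["/array"], chunks=(1, 1000, 1000))
print(arr.max().compute(scheduler="threads"))     # threads share the same open handle
f.close()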