I have a server with IP 192.168.33.10 where I launched the scheduler (the master) with:

dask scheduler --host 0.0.0.0

On this server I have the file "/var/shared/job_skills.csv". The workers are 192.168.33.11 and 192.168.33.12, launched with:

dask worker 192.168.33.10:8786 --local-directory /var/test --dashboard-address 8787 --host 0.0.0.0 --worker-port 39040 --nanny-port 39042

I want to run a script, read_csv.py, on the master and distribute the work to the workers: chunk the data, run the aggregation on each chunk, and have every worker return its result so I can print the combined result.

I want to do it with this script on the master, "read_csv.py":
import dask
import dask.dataframe as dd
from dask.distributed import Client

# keep plain object strings instead of converting to Arrow strings
dask.config.set({"dataframe.convert-string": False})

# connect to the scheduler running on the master
client = Client("192.168.33.10:8786")

df = dd.read_csv("/var/shared/foo.csv")
df['job_skills'] = df['job_skills'].fillna('')

# one skill per row: split the comma-separated list and strip whitespace
df = df["job_skills"].str.split(',').explode().str.strip()

# count occurrences of each skill across all partitions
grouped = df.value_counts().compute()
print(grouped)
On the workers I get this:
2024-02-29 14:30:04,180 - distributed.worker - WARNING - Compute Failed
Key: ('str-strip-956897fad2adeffa85aa604734f0febb', 0)
Function: execute_task
args: ((subgraph_callable-50304a7f18bfe19fd3ff56b4a6d6db4f, 'str', 'strip', (), {}, 'explode-e2622e44a85e1396024ff799e4f97b6e', 'split', {'pat': ',', 'n': -1, 'expand': False}, 'getitem-f2b8974c7433cce59ce3453d7f35940e', 'job_skills', '', 'getitem-b9a23a03a236420b7c31ada8ec6055df', [(<function read_block_from_file at 0x7fd18829f920>, <OpenFile '/var/shared/foo.csv'>, 0, 1398, b'\n'), None, True, True]))
kwargs: {}
Exception: "FileNotFoundError(2, 'No such file or directory')"
How can I resolve this?
The workers fail because the read tasks execute on the worker machines, where /var/shared/foo.csv does not exist. One way to solve this without copying the file to the workers is to serve it over a protocol that every worker can reach (HTTP, for example) and point read_csv at the URL instead of a master-local path.
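A minimal sketch, assuming an HTTP server on the master serves /var/shared on port 8000 (the port and server choice are illustrative):

import dask
import dask.dataframe as dd
from dask.distributed import Client

dask.config.set({"dataframe.convert-string": False})
client = Client("192.168.33.10:8786")

# This URL is reachable from every worker, unlike the master-local path
df = dd.read_csv("http://192.168.33.10:8000/job_skills.csv")
df["job_skills"] = df["job_skills"].fillna("")
skills = df["job_skills"].str.split(",").explode().str.strip()
print(skills.value_counts().compute())

Note that for Dask to split the file into block-sized partitions, the server should support HTTP range requests; with a server that does not, Dask may fall back to reading the whole file as a single partition.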
Now each worker reads directly from the source, and fetches only the bytes of the partition it is working on.
Possible alternatives for others:
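- Mount a shared network filesystem (NFS, for example) so /var/shared/job_skills.csv exists at the same path on every worker; the original script then works unchanged.
- Put the file in object storage reachable by all machines and read it from there, e.g. dd.read_csv("s3://my-bucket/job_skills.csv") (the bucket name is illustrative).
- Copy the file to the same local path on every worker.
- For a file small enough to fit in the master's memory, read it locally with pandas and let Dask distribute the partitions when the graph executes. A sketch of that last option:

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client

client = Client("192.168.33.10:8786")

# Read on the master only, then partition; Dask ships the
# partitions to the workers when the computation runs
pdf = pd.read_csv("/var/shared/job_skills.csv")
df = dd.from_pandas(pdf, npartitions=4)  # npartitions is illustrative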