I am facing this error when performing a write operation in foreach() on a DataFrame. The code had been working fine for over 3 months but started failing last week.
To give some context, I have a DataFrame extract_df which contains 2 columns, xml_full_name and content. I use the code below to write these records as XML files to a target folder in ADLS.
extract_df.foreach(write_file)
write_file is defined as:
def write_file(row):
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
The notebook also uses the spark.write command to write some Parquet files, which is working fine.
On further investigation, I found that this issue could be related to parallelism, and as a workaround I tried the below:
for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
This works, which suggests the connection is fine and the failure is specific to the parallel write. However, it can't be used as a fix because collect() pulls every row to the driver and writes the files one at a time, which will hurt performance.
Has anyone here faced this issue? If it is related to some configuration, any suggestions on what to check and where would be appreciated. All inputs are welcome. Thanks.
I have tried the below:
Results:
In the above code, the foreachPartition() method is used to apply a function to each partition of the RDD/DataFrame. I have defined a function write_file_partition() which takes a partition of the extract_df DataFrame and writes the content to the file. You can then call the foreachPartition() method on the extract_df DataFrame and pass the write_file_partition() function as an argument. This will apply the function to each partition of the DataFrame in parallel.
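For reference, here is a minimal sketch of what such a partition function might look like. It is not the exact code from the attempt described above; it only assumes each row exposes extract_path and content, as in the question's write_file. The namedtuple rows and the temporary directory are hypothetical stand-ins so the function can be tried locally without a cluster.

```python
import os
import tempfile
from collections import namedtuple


def write_file_partition(rows):
    """Write every row of one partition to its own file.

    `rows` is an iterator over the rows of a single partition, which is
    what foreachPartition() passes to the function.
    """
    for row in rows:
        with open(row.extract_path, "wb") as f:
            f.write(row.content)


# Local stand-in for one partition (hypothetical data, not from the question).
Row = namedtuple("Row", ["extract_path", "content"])
target_dir = tempfile.mkdtemp()
sample_rows = [
    Row(os.path.join(target_dir, f"file_{i}.xml"), f"<doc id='{i}'/>".encode())
    for i in range(3)
]

write_file_partition(iter(sample_rows))
written_files = sorted(os.listdir(target_dir))
```

On the cluster the call would be extract_df.foreachPartition(write_file_partition). Whether open() can reach the ADLS path from the executors depends on how the storage is mounted, which this sketch does not address.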