I am facing an error when performing a write operation inside foreach() on a DataFrame. The code had been working fine for over three months but started failing last week.

To give some context: I have a DataFrame extract_df with two columns, xml_full_name and content. I use the code below to write these records as XML files to a target folder in ADLS.

extract_df.foreach(write_file)

write_file is defined as:

def write_file(row):
    # write one row's XML payload (bytes) to its target path
    with open(row.xml_full_name, "wb") as f:
        f.write(row.content)

The notebook also writes some Parquet files through the DataFrame write API, and that part still works fine.
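For reference, the working Parquet write is roughly the following (the DataFrame name and the abfss:// path are placeholders, not the real ones):

# the distributed writer API continues to work; the URI below is a placeholder
some_df.write.mode("overwrite").parquet("abfss://container@account.dfs.core.windows.net/extracts/parquet/")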

On further investigation, I found that this issue could be related to parallelism, so as a workaround I tried the following:

# driver-side workaround: pull all rows to the driver and write sequentially
for row in extract_df.collect():
    with open(row.xml_full_name, "wb") as f:
        f.write(row.content)

This works, which tells me the connection is fine but the parallel writes are not. However, it can't be used as a permanent fix, since collecting everything to the driver brings down performance.
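A possible middle ground is toLocalIterator(), which streams rows to the driver one partition at a time rather than materialising everything with collect(); it is still sequential, but lighter on driver memory. A minimal sketch with the same columns:

# streams rows partition by partition; sequential, but avoids collect()'s memory spike
for row in extract_df.toLocalIterator():
    with open(row.xml_full_name, "wb") as f:
        f.write(row.content)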

Has anyone here faced this issue? If it is related to some configuration, any suggestions on what to check, and where, are welcome. Thanks.

There is 1 answer below.

Answer from DileeprajnarayanThumula:

I have tried the following:

def write_file_partition(rows):
    # 'rows' is an iterator over the rows of one partition
    for row in rows:
        # binary mode ("wb") to match write_file in the question, since content holds bytes
        with open(row.xml_full_name, "wb") as f:
            f.write(row.content)

# distributed: runs once per partition, on the executors
extract_df.foreachPartition(write_file_partition)

# driver-side equivalent (same as the collect() workaround in the question):
# write_file_partition(extract_df.collect())

Results: (screenshot of the output omitted)

In the code above, the foreachPartition() method is used to apply a function to each partition of the DataFrame.

I have defined a function write_file_partition() that receives an iterator over the rows of one partition of extract_df and writes each row's content out to its file.

You can then call foreachPartition() on extract_df and pass write_file_partition() as the argument. This applies the function to every partition, and the partitions are processed in parallel across the executors.
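One extra note: the degree of parallelism here equals the number of partitions, since each partition becomes one writer task. If you want to tune it, repartition first; the 16 below is an arbitrary example, assuming the target folder tolerates concurrent writers:

# control how many parallel writer tasks run by repartitioning first
extract_df.repartition(16).foreachPartition(write_file_partition)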