python polars - kernel keeps crashing while concatenating thousands of csv/feather files


I'm working with thousands of csv/feather files (each containing ~xxx) in Python. I initially used pandas for the task, but it takes too long, so I'm trying polars instead.
Note: the pandas version of my code is similar to the one below; it works just fine, but takes very long to finish.

The Goal

To read/scan all the csv/feather files and then re-write them as one parquet file.
Before doing this, I need to read all the csv/feather files and concatenate them into one dataframe.
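
Conceptually, the naive version of what I want looks like the sketch below (this reads everything eagerly into memory, which is exactly what does not scale here; all_data.parquet is just a placeholder name):

import glob
import os

import polars as pl

# naive version of the goal: read every file, concat them, write one parquet
files = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.csv"))
combined = pl.concat([pl.read_csv(f, infer_schema_length=0) for f in files])
combined.write_parquet(os.path.join(EXPORT_TABLE_FOLDER, "all_data.parquet"))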

Problem and Question

Problem: When I run the code in a VSCode Jupyter notebook, the kernel keeps crashing:

Canceled future for execute_request message before replies were done
The Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details.

Question: How do I efficiently concatenate all the files without the kernel crashing?

Current workflow

  1. Read or scan each file from the folder
  2. Concatenate each file with the previous ones

import glob
import os

import polars as pl
from tqdm import tqdm

all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))

parquet_name = "simplified_all_spatial_join_data_{}_p0.parquet".format(location_name)
parquet_path = os.path.join(EXPORT_TABLE_FOLDER, parquet_name)

# read the first file
if table_format == 'csv':
    temp_all_spatial_join_data = pl.scan_csv(all_tables[0], infer_schema_length=0)
else:
    temp_all_spatial_join_data = pl.scan_ipc(all_tables[0], infer_schema_length=0)

# read the rest of the files
if not os.path.exists(parquet_path):
    # clone the first scan as a placeholder (to be concatenated later)
    collected_temp_all_spatial_join_data = temp_all_spatial_join_data.collect().clone()
    
    # iterate through the files
    for iteration, table in tqdm(enumerate(all_tables[1:]), total=len(all_tables[1:])):
        if table_format == 'csv':
            temp = pl.scan_csv(table, infer_schema_length=0)
        else:
            temp = pl.scan_ipc(table, infer_schema_length=0)

        temp_all_spatial_join_data = pl.concat([temp_all_spatial_join_data, temp])

        # each 25th iteration, collect the lazyframe as dataframe,
        if iteration % 25 == 0:
            collected_temp_all_spatial_join_data = pl.concat([
                collected_temp_all_spatial_join_data,
                temp_all_spatial_join_data.collect()
                ]).unique()
            
            # then re-assign the temp_all_spatial_join_data as the current file being scanned
            if table_format == 'csv':
                temp_all_spatial_join_data = pl.scan_csv(table, infer_schema_length=0)
            else:
                temp_all_spatial_join_data = pl.scan_ipc(table, infer_schema_length=0)

    # collect whatever is still pending in the lazy plan after the loop,
    # so the tail files after the last multiple-of-25 flush are not lost
    collected_temp_all_spatial_join_data = pl.concat([
        collected_temp_all_spatial_join_data,
        temp_all_spatial_join_data.collect()
        ]).unique()

    # write the concatenated files into a parquet file
    collected_temp_all_spatial_join_data.write_parquet(parquet_path)

else:
    print ('WARNING: This file already exists!\nSkipping the process')

I'm not really sure, but I suspect it has something to do with the memory used to store the LazyFrames and the query plan.

The if iteration % 25 == 0: part is my effort to minimize the memory used to store the query plan by partitioning the files into chunks of 25, collecting each chunk into a DataFrame, then resetting the query plan. It works for a smaller number of files (up to a few hundred), but the kernel keeps crashing once the number of files reaches the thousands, even if I make the chunks smaller.
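
For example, a toy version of what I suspect is happening (the repeated concat keeps growing a single plan that references every scanned file; explain() is the method name in recent polars, older releases expose the same plan via describe_plan()):

lf = pl.scan_csv(all_tables[0], infer_schema_length=0)
for table in all_tables[1:]:
    lf = pl.concat([lf, pl.scan_csv(table, infer_schema_length=0)])

# the printed plan references every single scanned file, so it grows with the file count
print(lf.explain())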


1 Answer

Dean MacGregor:

A couple initial impressions:

  1. I don't think lazyframes are helping you. At some point they need to go into memory, and since you're not filtering or subsetting the columns, I'm not sure where the savings would come from.
  2. You don't need to concat at every step. Simply add each file to a list and concat the list once at the end. You can add a step that checks the estimated_size so that, if it crashes, you know roughly where to set the size threshold.

So do something like this:

import glob
import os

import polars as pl

# EXPORT_TABLE_FOLDER, table_format and parquet_path come from the question's setup
all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))

df = []        # eagerly read DataFrames waiting to be written out
chunk = 1      # suffix for the chunked parquet output files
accumsize = 0  # running total of the estimated in-memory size of the frames in df
sizethreshold = 1_000_000_000_000_000  # set this to something smaller than the last print after it crashes

for tablefile in all_tables:
    if table_format == 'csv':
        temp = pl.read_csv(tablefile, infer_schema_length=0)
    else:
        # infer_schema_length only applies to CSV; IPC files carry their own schema
        temp = pl.read_ipc(tablefile)
    df.append(temp)
    accumsize += temp.estimated_size()
    if accumsize >= sizethreshold:
        # flush the accumulated frames into their own parquet chunk
        pl.concat(df).write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
        chunk += 1
        df = []
        accumsize = 0
    else:
        print(accumsize)

# write whatever is left over as the final chunk
if df:
    pl.concat(df).write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
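
If the data does end up split across several chunk files, you can still treat them as one table later by scanning them lazily; a sketch, assuming the chunks share a schema (scan_parquet accepts glob patterns):

# lazily scan every chunk at once via a glob pattern and query it as one table
combined = pl.scan_parquet(parquet_path.replace(".parquet", "*.parquet"))
print(combined.limit(5).collect())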