Best recommendation for reading Parquet files from an S3 bucket and exporting them into JSON files


Objective: I have 1,200 compressed Parquet files (gz) in an S3 bucket, which together hold about 67,000,000 rows. My goal is to read all the Parquet files into one dataframe and then group it by a column called id_outlet_info. Once the dataframe is grouped, I must export each group into a JSON file. I have to read all the Parquet files together because each group's data is spread across many files, which is why I cannot process them one by one.
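
To make the goal concrete, the per-group export I am after looks roughly like this (toy data; the output file naming is only illustrative):

import pandas as pd

# Conceptual goal, ignoring memory limits: one JSON file per id_outlet_info.
# df stands in for the full dataframe built from all the Parquet files.
df = pd.DataFrame({
    "id_outlet_info": [1, 1, 2],
    "option_category": ["drinks", "drinks", "mains"],
})

for outlet_id, group in df.groupby("id_outlet_info"):
    group.to_json(f"{outlet_id}.json", orient="records")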

Strategy implemented: I started with boto3 and pandas, but I ran into a memory error after reading roughly the 600th Parquet file.
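
A simplified sketch of that first attempt (the bucket name and prefix are placeholders; the real code is a bit more involved):

import io

import boto3
import pandas as pd

# Simplified version of the first attempt: concatenating every file into a
# single pandas DataFrame is what ran out of memory around file ~600.
s3 = boto3.client("s3")
frames = []
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="my/prefix/"):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".gz.parquet"):
            body = s3.get_object(Bucket="my-bucket", Key=obj["Key"])["Body"].read()
            frames.append(pd.read_parquet(io.BytesIO(body)))

full_df = pd.concat(frames, ignore_index=True)

Then I decided to look for a solution with the dask library. This is my code using dask: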

import dask.dataframe as dd
import s3fs  # needed by dask to handle the s3:// protocol


def read_parquet_files_with_dask(bucket_name: str,
                                 access_key_id: str,
                                 secret_access_key: str,
                                 prefix: str,
                                 columns_to_keep: list) -> dd.DataFrame:

    # Read all Parquet files under a specific prefix (folder path) in the S3 bucket
    s3_path = "s3://" + bucket_name + "/" + prefix + "*.gz.parquet"

    dask_df = dd.read_parquet(s3_path,
                              engine='pyarrow',
                              columns=columns_to_keep,
                              storage_options={
                                  'key': access_key_id,
                                  'secret': secret_access_key
                              })

    return dask_df

import logging
from json import dumps
from traceback import format_exc


def main(logs_file_path: str,
         bucket_name: str,
         access_key_id: str,
         secret_access_key: str,
         processing_type: str,
         prefix: str,
         aggregator_file_path: str,
         download_folder_path: str,
         states_file_path: str) -> str:

    try:

        data = {}

        ### Some code before, but not relevant (columns_to_keep is defined here)

        dask_df = read_parquet_files_with_dask(bucket_name,
                                               access_key_id,
                                               secret_access_key,
                                               prefix,
                                               columns_to_keep)

        restaurant_ids = dask_df["id_outlet_info"].unique().compute().sort_values().values.tolist()

        # divisions need one more entry than the number of partitions, so the
        # last id is repeated; each partition then holds exactly one id
        divisions = restaurant_ids + [restaurant_ids[-1]]

        # keep id_outlet_info as the index so .loc can select by id
        dask_df_div = dask_df.set_index('id_outlet_info', divisions=divisions)

        for restaurant_id in restaurant_ids:
            ### this compute takes a lot of time
            pandas_df = dask_df_div.loc[restaurant_id].compute().reset_index()
            print(pandas_df.head())
            pandas_df['option_category_id'] = pandas_df.groupby('option_category').cumcount()
            export_menus_to_json(pandas_df, download_folder_path)

    except Exception as e:

        logging.error(str(e))
        data['error_message'] = str(e)
        data['stack_trace'] = format_exc()  # format_exc() returns the traceback as a string

    finally:
        return dumps(data)

Problem: I decided to use set_index to create partitions based on the column id_outlet_info. Then I iterate through the divisions and use loc to retrieve one partition at a time. My understanding is that every call to compute() takes a lot of time because dask has to download the data from S3 again for each id. How could I achieve my goal efficiently?
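
One idea I am considering, sketched below under the assumption that the divisions above leave exactly one id_outlet_info per partition, is to materialize each partition only once via to_delayed() instead of calling loc(...).compute() per id. It reuses dask_df_div and export_menus_to_json from the code above; process_partition is a hypothetical wrapper.

from dask import compute, delayed

def process_partition(pandas_df, download_folder_path):
    # pandas_df is one in-memory partition; with the divisions above it should
    # contain the rows of a single id_outlet_info
    if pandas_df.empty:
        return
    pandas_df = pandas_df.reset_index()  # bring id_outlet_info back as a column
    pandas_df['option_category_id'] = pandas_df.groupby('option_category').cumcount()
    export_menus_to_json(pandas_df, download_folder_path)

# to_delayed() yields one lazy pandas DataFrame per partition, so each partition
# is read from S3 and brought into memory only once
partitions = dask_df_div.to_delayed()
tasks = [delayed(process_partition)(part, download_folder_path) for part in partitions]
compute(*tasks)

Would something along these lines avoid re-reading from S3 on every iteration, or is there a better way to do this?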
