dask - speed up column filtering


I have an 18 GB .parquet file with ~300M rows of accounting data (which I cannot share), split into 53 row groups. My task is to 'clean' the data by retaining, in each cell, only specific words from a dictionary. Reading the file is trouble-free. Processing the file ends in a segmentation fault on a 20-core, 128 GB RAM Ubuntu 22.04 desktop.

Using Python and the Dask library, I convert the data into a Dask DataFrame with the following columns:

['rowid', 'txid', 'debit', 'credit', 'effective_date', 'entered_date', 'user_id', 'transaction', 'memo', 'type', 'account', 'total_amt']
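
For completeness, the load step itself is nothing special; a minimal sketch, assuming a single-file read (the path is a placeholder):

    import dask.dataframe as dd

    # 18 GB parquet file, 53 row groups; the path is a placeholder
    data = dd.read_parquet('accounting.parquet')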

The columns to be cleaned in this file are memo, type, and account. My approach is to take each of those columns and apply a filter_field and a hash_field method to them:

        if isinstance(data, dd.DataFrame):
            # first: clean the memo columns
            # data = data.repartition(npartitions=20)  # included in experiments with partition size
            result = [data[col].apply(lambda x: self.filter_field(text=x, word_dict=word_dict), meta=data[col]) for col in memo_columns]
            for i, col in enumerate(memo_columns):  # this loop seems to be req'd to assign values
                data[col] = result[i]
            
            # second: hash the name/id fields
            id_cols = name_columns + id_columns + account_columns
            result = [data[col].apply(lambda x: self.hash_field(text=x), meta=data[col]) for col in id_cols]
            for i, col in enumerate(id_cols):
                data[col] = result[i]
                
            del result
            gc.collect()

The filter_field takes each cell, removes symbols, then checks to see if remaining words are in a dictionary, and if they are not the words are dropped.
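
A minimal sketch of what filter_field does, shown as a standalone function rather than a method; the regex and lower-casing are simplifications, and word_dict is a set-like lookup:

    import re

    def filter_field(text, word_dict):
        # simplified sketch: strip symbols, keep only words present in word_dict
        if not isinstance(text, str):
            return text
        words = re.sub(r'[^\w\s]', ' ', text).split()
        return ' '.join(w for w in words if w.lower() in word_dict)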

The hash_field is just shake_256(text.encode(encoding='utf8')).hexdigest(20).
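
Written out as a function (again standalone for brevity):

    from hashlib import shake_256

    def hash_field(text):
        # SHAKE-256 with a 20-byte digest, rendered as a 40-character hex string
        return shake_256(text.encode(encoding='utf8')).hexdigest(20)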

I know the filtering is sound because everything runs fine on near-identical files of up to 128M rows. Two things happen when I run this larger file:

  1. at some point I get a segmentation fault. Sometimes it happens early in the processing, sometimes later.
  2. rarely are more than 3 cores running at more than 30-50% utilization, and when more cores are active the additional ones sit at ~1-2% (observed via htop)

What I would like to know:

  1. is there a better approach than the looping/vectorizing I used; or
  2. how can I get more cores working on the process.

Notes:

  1. I tried various repartitioning approaches, varying both the number and the size of partitions. There was no visible improvement in processing, and the large file still threw a seg fault.
  2. what I expect my approach to be doing is taking each column presented, dividing it into ~20 pieces, then processing those pieces in parallel (one piece per processor); see the sketch below for the behaviour I have in mind.
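
To make note 2 concrete, here is a hedged sketch of the partition-level equivalent I am picturing, expressed with map_partitions so each worker receives a whole pandas chunk; it reuses the standalone filter_field / hash_field sketches above and treats the column lists and word_dict as given:

    def clean_partition(pdf, memo_columns, id_cols, word_dict):
        # pdf is a plain pandas DataFrame: one of the ~20 pieces
        for col in memo_columns:
            pdf[col] = pdf[col].map(lambda x: filter_field(x, word_dict))
        for col in id_cols:
            pdf[col] = pdf[col].map(hash_field)
        return pdf

    # data is the Dask DataFrame from earlier; extra args are forwarded to clean_partition
    # data = data.map_partitions(clean_partition, memo_columns, id_cols, word_dict, meta=data)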