Multiprocessing a pandas dataframe for regex pattern matching

I am trying to flag messages in a large dataframe that match any regex from a list of unique regexes. Here is the code:

import re

def match_regexes(message_body, regexes):
    # Return True as soon as any regex in the list matches the message
    for regex in regexes:
        if re.search(regex, message_body):
            return True
    return False

# Takes a really long time to run
in_debit_clean_df = ZERO_cashflow_IN_combined_df['message_body'].progress_apply(lambda x: match_regexes(x, in_debit_regexes))
in_credit_clean_df = ZERO_cashflow_IN_combined_df['message_body'].progress_apply(lambda x: match_regexes(x, in_credit_regexes))
in_balance_clean_df = ZERO_cashflow_IN_combined_df['message_body'].progress_apply(lambda x: match_regexes(x, in_balance_regexes))
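
As a side note, from what I understand re.search does cache compiled patterns internally, but that cache is bounded, so with a long list of unique regexes it might help to compile them once up front, or even join them into a single alternation so each message is scanned only once. A rough, untested sketch of that idea for the debit list (the in_debit_compiled / in_debit_combined names are just illustrative):

import re

# Compile each pattern a single time instead of relying on re's internal cache
in_debit_compiled = [re.compile(p) for p in in_debit_regexes]

# Or fold the list into one big alternation (each pattern wrapped in a
# non-capturing group so the | boundaries stay correct)
in_debit_combined = re.compile("|".join(f"(?:{p})" for p in in_debit_regexes))

in_debit_clean_df = ZERO_cashflow_IN_combined_df['message_body'].progress_apply(
    lambda x: bool(in_debit_combined.search(x))
)

Even with something like this, the row count is large enough that I expect to still need parallelism.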

The code above takes a really long time to run (progress_apply estimates ~36 hours for the first call). I am working on a SageMaker notebook instance (ml.c5.9xlarge), so computational power is not the problem. I want to use multiprocessing and utilize all the cores to match the list of unique regexes against 'message_body', instead of the sequential approach above. I am not very proficient at writing multiprocessing, batch-processing, or parallelization code in general, so I am asking the community in case someone can help me with this problem.

I tried some naive ChatGPT-generated multiprocessing code like this:

import re
import time
import multiprocessing
import concurrent.futures
from datetime import timedelta

def match_regexes(message_body, regexes, progress_counter):
    for regex in regexes:
        if re.search(regex, message_body):
            # Bump the shared counter under its lock when a match is found
            with progress_counter.get_lock():
                progress_counter.value += 1
            return True
    return False

def process_dataframe(df, regexes, column_name):
    total_rows = len(df)
    processed_rows = multiprocessing.Value('i', 0)
    start_time = time.time()
    
    # Create progress counter
    progress_counter = multiprocessing.Value('i', 0)
    
    def process_row(row):
        return match_regexes(row[column_name], regexes, progress_counter)
    
    # Process dataframe using concurrent processing
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(process_row, df.itertuples(index=False, name=None)))
    
    # Track progress
    elapsed_time = time.time() - start_time
    print(f"\rProcessed: {processed_rows.value}/{total_rows} rows | Elapsed: {timedelta(seconds=int(elapsed_time))}", end="", flush=True)
    
    return results

# Example usage
ng_debit_clean_df = process_dataframe(ZERO_NG_combined_df, ng_debit_regexes, 'message_body')
ng_credit_clean_df = process_dataframe(ZERO_NG_combined_df, ng_credit_regexes, 'message_body')

but this seems to run even slower than the non-parallelized code, and it does not appear to utilize all the cores; CPU utilization never fills up either.

Computational power isn't the limit for me, but I want to use some form of multiprocessing, batch processing, or parallelization to get the code above running faster.
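
For concreteness, the shape of solution I am hoping for is something like the sketch below: a module-level worker that the pool can pickle, with the regexes compiled once per worker process and the column processed in chunks to amortize inter-process overhead. This is an untested sketch, and the init_worker / match_chunk / parallel_match names are my own:

import re
import multiprocessing

import numpy as np
import pandas as pd

_compiled = None  # per-process compiled patterns, set by the pool initializer

def init_worker(patterns):
    # Runs once in each worker process: compile the regex list a single time
    global _compiled
    _compiled = [re.compile(p) for p in patterns]

def match_chunk(messages):
    # Flag each message in the chunk; chunking amortizes pickling/IPC overhead
    return [any(p.search(m) for p in _compiled) for m in messages]

def parallel_match(df, patterns, column_name, n_workers=None):
    n_workers = n_workers or multiprocessing.cpu_count()
    chunks = np.array_split(df[column_name].to_numpy(), n_workers * 4)
    with multiprocessing.Pool(n_workers, initializer=init_worker,
                              initargs=(patterns,)) as pool:
        parts = pool.map(match_chunk, chunks)
    # Re-assemble the per-chunk results into one boolean Series
    return pd.Series([flag for part in parts for flag in part], index=df.index)

ng_debit_clean_df = parallel_match(ZERO_NG_combined_df, ng_debit_regexes, 'message_body')
ng_credit_clean_df = parallel_match(ZERO_NG_combined_df, ng_credit_regexes, 'message_body')

Does something along these lines make sense, or is there a better pattern for this workload?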
