Parallelizing, Multiprocessing, CSV writer

I have a huge list of strings called term_list that I process one by one in a function called run_mappers(). One of the args is a csv_writer object. Inside the function I append each result to a list called from_mapper and write it to a csv file using the csv_writer object. In my scouring for help, I read that the multiprocessing module is not recommended for csv writing because it pickles its arguments and csv_writer objects can't be pickled (can't find the reference for this now in my billion open tabs). I am not sure multiprocessing is best suited for my task anyway.
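For what it's worth, the unpicklable part is easy to reproduce (my own minimal check, since I lost the reference):

import csv
import pickle

with open("out.csv", "w", newline="") as f:
    writer = csv.writer(f)
    pickle.dumps(writer)  # TypeError: cannot pickle '_csv.writer' object

Here is the shape of run_mappers():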

def run_mappers(individual_string, other_args, csv_writer):
    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)  # from_mapper is a module-level list
    csv_writer.writerow(processed_result)

I want to speed up processing of this huge list, but am trying to control memory usage by splitting the list into batches (term_list_batch) and processing one batch at a time.
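For the batching itself I mean something like this minimal sketch (the helper name and the batch size of 100 are just for illustration):

def batched(seq, batch_size):
    # yield successive slices so only one batch is in flight at a time
    for i in range(0, len(seq), batch_size):
        yield seq[i:i + batch_size]

for term_list_batch in batched(term_list, 100):
    parallelize_mappers(term_list_batch, other_args, csv_writer)

For each batch, my best attempt at the parallel part is: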

import concurrent.futures
import gc
import time

def parallelize_mappers(term_list_batch, other_args, csv_writer):

    terms_left = len(term_list_batch)

    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        # map each future to its term so failures can be reported by name
        future_to_term = {executor.submit(run_mappers, term, other_args, csv_writer): term
                          for term in term_list_batch}
        try:
            for future in concurrent.futures.as_completed(future_to_term, timeout=180): # timeout after 3 min
                term = future_to_term[future]
                try:
                    result = future.result()
                    # Process result if needed
                except Exception as exc:
                    print(f"Job {term} generated an exception: {exc}")
                finally:
                    terms_left -= 1
                    if terms_left % 10 == 0:
                        gc.collect()
                        time.sleep(2)
        except concurrent.futures.TimeoutError:
            print("Timeout occurred while processing futures")
            # cancel anything that has not finished yet
            for future in future_to_term:
                if not future.done():
                    future.cancel()

When I get a TimeoutError, my process just hangs and I'm not sure how to keep moving forward through my huge term_list. I don't want to terminate the program; I just want to continue to the next term or the next batch. If a thread fails, I want to ignore that term (or toss the whole thread) and keep processing term_list so that as many results as possible get written to the file.
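Conceptually, what I think I want is something like this untested sketch: give each term its own deadline via future.result(timeout=...) and just skip the stragglers. (I know plain Python threads can't be killed, so a hung worker still lingers in the background, and shutdown(wait=False, cancel_futures=True) needs Python 3.9+ and only drops work that hasn't started.)

import concurrent.futures

def parallelize_mappers(term_list_batch, other_args, csv_writer):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)
    future_to_term = {executor.submit(run_mappers, term, other_args, csv_writer): term
                      for term in term_list_batch}
    for future, term in future_to_term.items():
        try:
            future.result(timeout=180)  # per-term deadline
        except concurrent.futures.TimeoutError:
            print(f"{term!r} timed out; skipping")
        except Exception as exc:
            print(f"{term!r} failed: {exc}")
    # do not block on stragglers; drop any queued work (Python 3.9+)
    executor.shutdown(wait=False, cancel_futures=True)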

Amongst my many attempts to troubleshoot, I tried something like the code below, but am posting the version above as my best shot since it crunched through a few hundred terms before stalling on me. Other tries just died, raised RuntimeErrors, deadlocked their threads, etc.

For reference, another attempt is below:

import threading

def parallelize_mappers(term_list_batch, other_args, csv_writer):

    timeout = 120
    terminate_flag = threading.Event()

    # Create a thread for each term
    threads = []
    for term in term_list_batch:
        thread = threading.Thread(target=run_mappers,
                                  args=(term, other_args, csv_writer, terminate_flag))
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete or time out
    for thread in threads:
        thread.join(timeout)

        # If the thread is still alive, it has timed out
        if thread.is_alive():
            print(f"Thread {thread.name} timed out. Terminating...")
            terminate_flag.set()  # signal run_mappers to stop cooperatively
Then I added a while not terminate_flag.is_set() check to run_mappers() before executing the rest of the processing code, but that made things unbearably slow. Thank you in advance.

A mock term_list to reproduce with is below:

term_list = ['Dementia',
 'HER2-positive Breast Cancer',
 'Stroke',
 'Hemiplegia',
 'Type 1 Diabetes',
 'Hypospadias',
 'IBD',
 'Eating',
 'Gastric Cancer',
 'Lung Cancer',
 'Carcinoid',
 'Lymphoma',
 'Psoriasis',
 'Fallopian Tube Cancer',
 'Endstage Renal Disease',
 'Healthy',
 'HRV',
 'Recurrent Small Lymphocytic Lymphoma',
 'Gastric Cancer Stage III',
 'Amputations',
 'Asthma',
 'Lymphoma',
 'Neuroblastoma',
 'Breast Cancer',
 'Healthy',
 'Asthma',
 'Carcinoma, Breast',
 'Fractures',
 'Psoriatic Arthritis',
 'ALS',
 'HIV',
 'Carcinoma of Unknown Primary',
 'Asthma',
 'Obesity',
 'Anxiety',
 'Myeloma',
 'Obesity',
 'Asthma',
 'Nursing',
 'Denture, Partial, Removable',
 'Dental Prosthesis Retention',
 'Obesity',
 'Ventricular Tachycardia',
 'Panic Disorder',
 'Schizophrenia',
 'Pain',
 'Smallpox',
 'Trauma',
 'Proteinuria',
 'Head and Neck Cancer',
 'C14',
 'Delirium',
 'Paraplegia',
 'Sarcoma',
 'Favism',
 'Cerebral Palsy',
 'Pain',
 'Signs and Symptoms, Digestive',
 'Cancer',
 'Obesity',
 'FHD',
 'Asthma',
 'Bipolar Disorder',
 'Healthy',
 'Ayerza Syndrome',
 'Obesity',
 'Healthy',
 'Focal Dystonia',
 'Colonoscopy',
 'ART',
 'Interstitial Lung Disease',
 'Schistosoma Mansoni',
 'IBD',
 'AIDS',
 'COVID-19',
 'Vaccines',
 'Beliefs',
 'SAH',
 'Gastroenteritis Escherichia Coli',
 'Immunisation',
 'Body Weight',
 'Nonalcoholic Steatohepatitis',
 'Nonalcoholic Fatty Liver Disease',
 'Prostate Cancer',
 'Covid19',
 'Sarcoma',
 'Stroke',
 'Liver Diseases',
 'Stage IV Prostate Cancer',
 'Measles',
 'Caregiver Burden',
 'Adherence, Treatment',
 'Fracture of Distal End of Radius',
 'Upper Limb Fracture',
 'Smallpox',
 'Sepsis',
 'Gonorrhea',
 'Respiratory Syncytial Virus Infections',
 'HPV',
 'Actinic Keratosis']

2 Answers

Hai Vu (accepted answer)

The way I see it, you want to parallelize or multitask run_mappers() because this function might take a long time to finish. The CSV writing does not need to run in parallel because it finishes relatively quickly.

The first step is to redesign run_mappers() NOT to take a CSV writer as a parameter. Instead, the function should return processed_result. The function might raise an exception, in which case we ignore that thread's result. To be useful, I write the errors out to err.txt.

import csv
import logging
import random
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

term_list = [
    "Dementia",
    # ... omitted for brevity
    "Actinic Keratosis",
]


def run_mappers(individual_string, other_args):
    # Simulate long processing time to get processed_result
    time.sleep(random.randint(1, 2))
    processed_result = [individual_string.strip(), other_args]

    # Simulate an exception
    if random.randint(1, 20) == 5:
        logging.error("%r -> failed", individual_string)
        raise ValueError(individual_string)

    logging.debug("%r -> %r", individual_string, processed_result)
    return processed_result


def main():
    """Entry"""
    # run_mappers takes a long time, so this part is done in parallel
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(run_mappers, term, "other-args")
            for term in term_list
        ]

    # Writing to CSV does not need to be done in parallel because
    # it is relatively quick
    logging.info("Writing to CSV")
    with open("out.csv", "w") as stream, open("err.txt", "w") as err:
        writer = csv.writer(stream)
        for future in futures:
            if future.exception():
                err.write(f"{future.exception()}\n")
            else:
                writer.writerow(future.result())
    logging.info("Done CSV")


if __name__ == "__main__":
    main()

Output

2024-03-02 09:49:00,335 | DEBUG | 'HER2-positive Breast Cancer' -> ['HER2-positive Breast Cancer', 'other-args']
2024-03-02 09:57:55,174 | ERROR | 'Breast Cancer' -> failed
2024-03-02 09:49:11,366 | DEBUG | 'HPV' -> ['HPV', 'other-args']
...
2024-03-02 09:49:11,377 | DEBUG | 'Sepsis' -> ['Sepsis', 'other-args']
2024-03-02 09:49:11,377 | INFO | Writing to CSV
2024-03-02 09:49:11,378 | INFO | Done CSV

Notes

  • Run this script and, if the results look alright, drop in your real run_mappers() code
  • I have no idea what other_args looks like, so I fake it
  • You might want to replace ThreadPoolExecutor with ProcessPoolExecutor and compare the timing to see which solution works more efficiently; a quick harness is sketched below
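Something like this lets you compare the two (my sketch; it reuses the mock run_mappers above, and the process pool needs the __main__ guard on platforms that spawn workers):

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def time_executor(executor_cls):
    start = time.perf_counter()
    with executor_cls() as executor:
        for term in term_list:
            executor.submit(run_mappers, term, "other-args")
    # leaving the with-block waits for every submitted job to finish
    return time.perf_counter() - start

if __name__ == "__main__":
    print("threads:  ", time_executor(ThreadPoolExecutor))
    print("processes:", time_executor(ProcessPoolExecutor))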
potato

Hai Vu's answer seems legit and I want to test it, but I thought I'd post my current lazy workaround in case it helps. I do follow the suggestion not to pass the csv_writer object into run_mappers(); that part is definitely on the money. Instead I pass the file name of the output and open the file for writing inside run_mappers(), adding a csv lock so that only one thread can write at a time. I also ditched concurrent.futures and went with the nifty Parallel class from joblib. Not sure if this is entirely correct, but it's giving me the output I want...

import csv
import multiprocessing
import threading

from joblib import Parallel, delayed

csv_writer_lock = threading.Lock()

def run_mappers(individual_string, other_args, csv_filename):

    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)
    # append mode creates the file if it does not exist; the lock keeps
    # rows from interleaving (note it only guards threads within one
    # process, not separate worker processes)
    with csv_writer_lock:
        with open(csv_filename, mode='a', newline='') as f:
            csv_writer = csv.writer(f)
            csv_writer.writerow(processed_result)

def parallelize_mappers(term_list, other_args, mapping_filename):
    n_workers = 2 * multiprocessing.cpu_count() - 1
    Parallel(n_jobs=n_workers, backend="multiprocessing")(
        delayed(run_mappers)(term, other_args, mapping_filename)
        for term in term_list
    )
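On the "not entirely correct" part: with backend="multiprocessing", each worker process gets its own copy of csv_writer_lock, so the lock only truly serializes the writers if everything runs in one process. A variant I mean to try is the threading backend, where the lock genuinely applies:

Parallel(n_jobs=n_workers, backend="threading")(
    delayed(run_mappers)(term, other_args, mapping_filename)
    for term in term_list
)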