Multiprocessing pool, whether using map or async, hangs on the last couple of processes


I have a function that I am applying to hundreds of chunks of my larger dataset -- each individual job only takes a minute or two, and outputs a file if it is successful. This holds true for the first ~740 out of my ~750 individual jobs, but the last several jobs, especially the very last one, take increasingly longer for no apparent reason (no errors/Exceptions). Originally my function simply returned a result, which was appended to a global list. I thought this might be the issue, so I changed the function to write its output to a file, which I can access later on. However, that did not fix the lag. Next, I thought it might have to do with my reliance on imap_unordered within a context manager, which should take care of joining spawned processes and closing the pool:

    with get_context("spawn").Pool(processes=processes) as p:
        max_ = len(coverage_counting_job_params)
        print("max_ is {}".format(max_))
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(get_edit_info_for_barcode_in_contig_wrapper, coverage_counting_job_params):
                pbar.update()
                total_contigs += 1
                total_time = time.perf_counter() - start_time
                total_seconds_for_contig[total_contigs] = total_time
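
For reference, the original version returned results directly and appended them to a global list, roughly like this (a sketch from memory, not the exact code; the `results` name is illustrative):

    results = []  # global list that collected the returned results (illustrative name)
    with get_context("spawn").Pool(processes=processes) as p:
        for result in p.imap_unordered(get_edit_info_for_barcode_in_contig_wrapper, coverage_counting_job_params):
            results.append(result)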

After looking at some other posts here, I thought perhaps something was going on with the joining, and switched to apply_async in order to manage it more manually:

    def update(result):
        pbar.update()
        num_files_made = len(glob("{}/*.tsv".format(coverage_processing_folder)))
        # print("Made {}".format(num_files_made))
        if num_files_made == max_:
            print("All {} expected files are present!".format(max_))
            return

    for i in range(pbar.total):
        pool.apply_async(
            get_edit_info_for_barcode_in_contig_wrapper,
            args=(coverage_counting_job_params[i],),
            callback=update)

    # wait for completion of all tasks
    print("Closing pool...")
    pool.close()
    print("Joining pool...")
    pool.join()

I am using the "spawn" start method because I am using polars within my function, and it seems polars only works with a "spawn" context.

However, even using apply_async still had the same issue.
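
One thing I have not fully ruled out: as far as I know, apply_async only re-raises a worker exception when the AsyncResult is retrieved with .get() (or when an error_callback is supplied), so a failing task could look like a silent hang. A sketch of how I could surface any hidden exceptions, reusing the names from above (the on_error helper and the timeout value are just illustrative):

    def on_error(exc):
        # fires if the worker raises; otherwise apply_async hides the exception
        print("Worker raised: {!r}".format(exc))

    async_results = []
    for params in coverage_counting_job_params:
        async_results.append(
            pool.apply_async(
                get_edit_info_for_barcode_in_contig_wrapper,
                args=(params,),
                callback=update,
                error_callback=on_error))

    pool.close()
    # .get() re-raises any worker exception here instead of letting join() wait forever
    for r in async_results:
        r.get(timeout=600)  # timeout is an arbitrary example value
    pool.join()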

Based on the per-job time, the whole multiprocessing run should take only 50 minutes or so (~750 jobs at roughly 2 minutes each across 30 cores is about 750 × 2 / 30 ≈ 50 minutes), but instead it is taking upwards of 2 hours because of this weird hanging. I am going crazy trying to figure out how to get around this issue, as the lag makes the software painfully slow for end users.

Any ideas what might be going on?

Update:

  • When I kill my original job because it is hanging, I sometimes get errors about leaked semaphores like "There appear to be 7 leaked semaphore objects to clean up at shutdown"