I'm quite new to using multiprocessing and I'm trying to figure out whether there is a better way to use it when it has to run inside a loop. Or at least I think it has to be inside a loop. Let me describe the outline and maybe a better solution will be obvious.
I'm processing large spatial datasets with lots of data points. I'm trying to calculate values for smaller sub-regions of the large dataset by splitting it up with a regular grid. Each grid bin can be calculated independently of the others, so this makes it perfect for parallel processing - I just need to know what data to send to each bin. Typically there will be thousands of these smaller grid bins.
However, I'm trying to track these changes over time, so I'm then repeating the same process for each timestep of data. Typically there will be 10-500 timesteps to process. Each timestep's data is stored in several files on disk that I have to load when processing that timestep.
Currently my code looks a bit like the following pseudo-code and it takes about 30-60 s to process a timestep. Reading and binning probably only take a few seconds of that time; the rest is the multiprocessing of each bin.
from multiprocessing import Pool

for t in range(num_timesteps_to_process):
    # read data files for this timestep
    data_1 = read_file1_data(path, t)
    data_2 = read_file2_data(path, t)
    data_3 = read_file3_data(path, t)

    # find what data is in each grid bin - returns data and bin_id
    binned_data = spatial_binning_function(data_1, num_bins_x=100, num_bins_y=100)

    # build one argument tuple per bin for the parallel calculation
    bin_calc_arg_list = []
    for bin_indx, data_id in enumerate(binned_data):
        bin_calc_arg_list.append((bin_indx, data_id, data_1, data_2, data_3, other_settings))

    print(" Running Multiprocessing Pool")

    # create the process pool - recreated every timestep
    with Pool(processes=num_processors) as pool:
        # execute the per-bin calculations; starmap blocks until all are done
        results = pool.starmap(multiprocess_timesteps_bins, bin_calc_arg_list)
        # close the process pool
        pool.close()
        # wait for issued tasks to complete
        pool.join()

    # collate results here and return
    ...
data_1, data_2 and data_3 are large NumPy arrays of size n x 12, where n is typically larger than 200,000.
Each call to the multiprocess_timesteps_bins function returns a tuple of a unique bin_id and a list of 30 different calculated properties for the bin. results is then added to a dictionary with the timestep as the key.
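For clarity, the per-timestep collation is essentially just the following (simplified; all_results is a made-up name for the dictionary that outlives the loop):

all_results = {}  # created once, before the timestep loop

for t in range(num_timesteps_to_process):
    ...
    # results is the list of (bin_id, [30 properties]) tuples from starmap
    all_results[t] = {bin_id: props for bin_id, props in results}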
That's the general flow of my code and it's quite a bit quicker than any single-threaded implementation. However, I'm aware that opening and closing the process pool inside each timestep loop is probably a really bad idea. The reading and subdivision of the data does need to happen per timestep, though - I don't think having every worker process re-read the same data a few thousand times would be any better.
From a few print statements it seems that several seconds are spent starting up the pool each timestep, and possibly more shutting it down.
As I said, I'm not overly familiar with multiprocessing yet, so I'm not sure of the best way to structure a problem like this. I'm assuming my implementation is not how it should be done, but it works and it's faster than single-threaded.
Should I be using some kind of persistent queue instead - one that I open at the start, push items onto during each timestep, and then close once at the end?
I think what I want to do is this: move the pool creation outside the loop, read the data, and then use starmap to process the bins across the worker processes. Because starmap is a blocking call, I can just process the results before the next iteration of the loop, while keeping the pool open.
Only close and join the pool when I'm finished with this part of the code.
This should save the time spent starting and shutting down the pool every timestep (which the context manager currently does), but is it OK to use a pool like that?
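To make it concrete, here is a minimal sketch of what I have in mind, using the same placeholder functions and variables as above and the illustrative all_results dictionary from earlier:

from multiprocessing import Pool

all_results = {}

# create the pool once and reuse it for every timestep
with Pool(processes=num_processors) as pool:
    for t in range(num_timesteps_to_process):
        # read and bin this timestep's data in the main process
        data_1 = read_file1_data(path, t)
        data_2 = read_file2_data(path, t)
        data_3 = read_file3_data(path, t)
        binned_data = spatial_binning_function(data_1, num_bins_x=100, num_bins_y=100)

        bin_calc_arg_list = [
            (bin_indx, data_id, data_1, data_2, data_3, other_settings)
            for bin_indx, data_id in enumerate(binned_data)
        ]

        # starmap blocks until every bin of this timestep is finished,
        # so the results can be collated before the next iteration
        results = pool.starmap(multiprocess_timesteps_bins, bin_calc_arg_list)
        all_results[t] = {bin_id: props for bin_id, props in results}

# leaving the with-block shuts the pool down (the context manager calls terminate())

Is that the right way to keep the pool open across timesteps, or is there a reason the pool should not be reused like this?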
Any pointers or suggestions welcome.