multiprocessing.pool.imap on a cluster gets slower and slower as the iteration goes on


I am trying to run my code on an HPC cluster. It needs to call a function 8000 times in each of 100 iterations.

For the first couple of iterations it runs quite well, about 4 minutes per iteration. I don't know why it keeps getting slower as the run continues: after 4-5 hours, every iteration costs 10+ minutes.

I checked the status of the job: CPU efficiency was about 5% and memory utilization was about 4%.

Why is this slowing down? Is it about I/O scheduling? Or other issues?

I read some older questions on this, and if my problem is also about I/O scheduling, do I have to rewrite the outer_program to keep everything in RAM? Or is there an easier way to solve this?

About the function: it uses subprocess.run to call an external program and then reads the program's result file, which costs about 0.5 s per call.

import multiprocessing
import subprocess
import numpy as np

def A_function(m: np.ndarray):
    model_tmp = m.reshape(-1)

    model_file = "." + multiprocessing.current_process().name + ".rho"
    with open(model_file, 'w') as f:
        # write model_tmp into model_file for the outer_program to read
        ...

    data_file = "." + multiprocessing.current_process().name + ".dat"

    # maybe rewrite the outer_program to skip the read/write part? Although it may be a little difficult
    # the outer_program writes its result into data_file
    subprocess.run(["outer_program", model_file, data_file],
                   stderr=subprocess.STDOUT,
                   stdout=subprocess.DEVNULL,
                   check=True)

    with open(data_file, 'r') as f:
        # read data from data_file
        data = ...

    return data
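One idea I had for keeping the file traffic in RAM without touching the outer_program: point the scratch files at a tmpfs such as /dev/shm, so the reads/writes never hit the shared cluster filesystem. A minimal sketch, assuming the node has /dev/shm mounted (scratch_path is a hypothetical helper I made up, not part of my real code):

```python
import os
import tempfile

def scratch_path(worker_name, suffix):
    # hypothetical helper: build the per-worker scratch file name on a
    # RAM-backed filesystem (tmpfs) if one is available, so the file I/O
    # stays in memory instead of going to the cluster filesystem
    base = "/dev/shm" if os.path.isdir("/dev/shm") else tempfile.gettempdir()
    return os.path.join(base, "." + worker_name + suffix)

print(scratch_path("SpawnPoolWorker-1", ".rho"))
```

Then model_file and data_file in A_function could be built with this instead of the bare relative names.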

So I use multiprocessing.Pool with 64 processes to do the parallel computing on one node, which has 96 cores and 384 GB RAM.

with multiprocessing.Pool(processes=64) as pool:
    ...
    ...
    # theta_p is an (8000, N_model) numpy array
    theta_p_chunks = np.vsplit(theta_p, 8000)  # 8000 chunks of shape (1, N_model)
    result = np.stack(list(pool.imap(A_function, theta_p_chunks, chunksize=1000)))

I also need numpy and scipy.linalg to deal with some big data arrays, so I use threadpoolctl to raise the BLAS thread count to 8 locally:

with threadpool_limits(limits=8, user_api='blas'):
    x = scipy.linalg.solve(A, b) 

together with some exports like export OMP_NUM_THREADS=1 in my sbatch script:

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
export VECLIB_MAXIMUM_THREADS=1
export NUMEXPR_NUM_THREADS=1
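For completeness, the same limits expressed in Python (they have to be set before numpy/scipy are first imported, otherwise the BLAS libraries may not pick them up):

```python
import os

# same single-thread limits as the sbatch exports above; must run before
# numpy/scipy are first imported in the worker processes
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS",
            "VECLIB_MAXIMUM_THREADS", "NUMEXPR_NUM_THREADS"):
    os.environ[var] = "1"
```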
