My current task requires many array manipulations that take longer than is feasible to run serially. I am trying to use the multiprocessing package to speed this up, but I am running into a problem when mixing np.memmap with multiprocessing: I can't seem to write to the memory map or save it afterward.
The plan is to create a large NumPy memmap on disk, generate the data, copy it onto the memmap one sample at a time, and then save it for later use. The problem seems to come from the writing or the saving step, since the data generation itself works fine.
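For context, this is the basic memmap workflow I have in mind without any multiprocessing (the file name and sizes here are just placeholders, not my actual data):

import numpy as np

# Create a disk-backed array, fill it row by row, then flush it to the file.
fp = np.memmap('example.dat', dtype='float32', mode='w+', shape=(4, 265))
for i in range(4):
    fp[i, :] = np.random.rand(265).astype('float32')
fp.flush()

# Re-opening the file with the same dtype and shape should recover the data.
check = np.memmap('example.dat', dtype='float32', mode='r', shape=(4, 265))
print(check[:, 0])

My understanding is that fp.flush() is what actually pushes the written data out to the file on disk. The full script is below.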
import os
import multiprocessing
from datetime import date

import numpy as np
from tqdm.auto import tqdm

from Micelle_Simulation import Simulated_Micelle


def target(
    id_: int,
    q_range: np.ndarray,
    fp: np.memmap,
    shape: str,
    r: float,
    epsilon: float,
    N_agg: int,
    rho_beta: float,
    b: float,
    N: int,
    n: int,
    f_core: float
) -> None:
    # Generate one sample and write it into row `id_` of the memmap.
    micelle = Simulated_Micelle(
        shape=shape,
        r=r,
        epsilon=epsilon,
        N_agg=N_agg,
        rho_beta=rho_beta,
        b=b,
        N=N,
        n=n,
        f_core=f_core
    )
    n_corona = n*N*N_agg
    n_core = int((f_core/(1 - f_core))*n_corona)
    params = np.array((
        r,
        epsilon,
        micelle.R_g,
        N_agg,
        rho_beta,
        b,
        N,
        n,
        f_core
    ))
    I_q = micelle.Debye_Scattering(
        nodes=micelle.scatterers,
        core_node_num=n_core,
        q_range=q_range,
        rho_beta=rho_beta
    )
    params_I_q = np.hstack((params, I_q))
    fp[id_, :] = params_I_q
    print(params_I_q)


def main(*args, **kwargs) -> int:
    remainder = 2
    p_count = os.cpu_count() - remainder
    processes = []
    for i in range(p_count):
        processes.append(None)

    sample_num = 2
    shape = 'sphere'

    # Pick an output file name that does not already exist.
    cwd = os.getcwd()
    today = str(date.today()).replace("-", "_")
    username = os.getlogin()
    end = ".npy"
    file_name = f"{today}_{username}"
    attempt = 0
    while True:
        temp_name = f"{file_name}_{attempt}{end}"
        temp_path = os.path.join(cwd, temp_name)
        if not os.path.isfile(temp_path):
            path = temp_path
            break
        else:
            attempt += 1

    fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))
    # storage = np.empty((sample_num, 265))
    q_log_range = np.arange(-2, 0, np.true_divide(1, 128))
    q_range = np.power(10, q_log_range - 2*np.log10(2))

    p_num = 0
    print(f'Using {p_count} processes.')
    for i in tqdm(range(sample_num)):
        r = np.power(2, np.random.uniform(6, 10))
        epsilon = 1
        N_agg = np.random.randint(8, 33)
        rho_beta = np.random.normal(0.1, 0.02)
        b = np.power(2, np.random.uniform(4, 7))
        N = np.random.randint(2, 9)
        n = 6
        f_core = np.random.normal(0.7, 0.05)
        args = (i, q_range, fp, shape, r, epsilon, N_agg, rho_beta, b, N, n, f_core)
        # Find a free worker slot (never used or finished) and start the next sample there.
        while True:
            if processes[p_num] is None or not processes[p_num].is_alive():
                p = multiprocessing.Process(target=target, args=args)
                p.start()
                processes[p_num] = p
                break
            else:
                p_num = (p_num + 1) % p_count
    print('Loop completed.')

    for process in processes:
        if process:
            process.join()
        else:
            continue
    print("Task completed.")

    fp.flush()
    print(fp)
    return 0


if __name__ == '__main__':
    main()
Ignore any part of the code that involves Micelle_Simulation. It just creates a (265,) array.
The issue seems to be coming from the following parts of the code.
fp[id_, :] = params_I_q

fp = np.memmap(path, dtype='float32', mode='w+', shape=(sample_num, 265))

while True:
    if processes[p_num] is None or not processes[p_num].is_alive():
        p = multiprocessing.Process(target=target, args=args)
        p.start()
        processes[p_num] = p
        break
    else:
        p_num = (p_num + 1) % p_count
My expectation was that the code would generate the data one sample at a time, but across multiple processes, copy each sample onto the memmap on disk, and save it for later use. So, if I wanted to create 10000 samples, it would create a (10000, 265) memmap, generate the (265,) arrays one by one, and copy each into its row of the memmap.
However, what I actually got was an array of zeros, as if the memmap hadn't been touched at all. There were no errors during the run.
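To take Micelle_Simulation out of the picture, here is a stripped-down sketch of the same pattern, with the real worker replaced by a placeholder that just fills one row (the file name is a placeholder too); as far as I can tell it exercises the same memmap-plus-Process combination:

import multiprocessing
import numpy as np

def fill_row(i: int, fp: np.memmap) -> None:
    # Stand-in for the real worker: write one (265,) row into the memmap.
    fp[i, :] = np.full(265, i + 1, dtype='float32')

def main() -> None:
    sample_num = 4
    fp = np.memmap('repro.dat', dtype='float32', mode='w+', shape=(sample_num, 265))
    procs = []
    for i in range(sample_num):
        p = multiprocessing.Process(target=fill_row, args=(i, fp))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    fp.flush()
    print(fp[:, 0])   # I would expect 1, 2, 3, 4 here, one value per row

if __name__ == '__main__':
    main()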
P.S. I know my multiprocessing code is not optimal; I have not studied CS or SWE, and this was intended as a short script for data generation. Any improvements or advice would be appreciated.
Update: I changed the multiprocessing to multithreading, and it seems to work fine for my purpose.
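For anyone who runs into the same thing, this is roughly what the change amounts to, with threading.Thread standing in for multiprocessing.Process (placeholder worker and file name, same stripped-down shape as the sketch above):

import threading
import numpy as np

def fill_row(i: int, fp: np.memmap) -> None:
    # Stand-in for the real worker: write one (265,) row into the shared memmap.
    fp[i, :] = np.full(265, i + 1, dtype='float32')

fp = np.memmap('threads_example.dat', dtype='float32', mode='w+', shape=(4, 265))
threads = [threading.Thread(target=fill_row, args=(i, fp)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
fp.flush()
print(fp[:, 0])   # the rows are filled in now, instead of staying zero

Since all the threads live in the same process, they share the one memmap object, so their writes land in the mapping and fp.flush() persists everything to disk.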