I'm using the multiprocessing module and I'm trying to write numpy arrays to a zip file concurrently. Actually, I'm writing an npz file by hand: I want to build it incrementally because the content may be huge and could overflow memory if held all at once. My problem is that the code looks fine to me, but the resulting zip file is not what I expect. Here is my code:
import numpy as np
import zipfile
import io
import multiprocessing as mp


class ZipStreamEncoder(object):
    def __init__(self, filename, manager):
        self.arrayNb = manager.Value('i', 0)   # shared counter used to name the .npy members
        self.lock = manager.Lock()             # shared lock so only one process writes at a time
        self.fileName = filename

    def open(self):
        self.zipfile = zipfile.ZipFile(self.fileName, 'a', compression=zipfile.ZIP_DEFLATED, allowZip64=True)

    def writeArray(self, arrayToExport, proc):
        with self.lock:
            print("Process " + str(proc) + " is writing array " + str(self.arrayNb.value))
            # serialize the array to .npy format in memory, then add it as a zip member
            buf = io.BytesIO()
            np.lib.npyio.format.write_array(buf, arrayToExport)
            self.zipfile.writestr(str(self.arrayNb.value) + ".npy", buf.getvalue())
            self.arrayNb.value += 1

    def close(self):
        self.zipfile.close()


def extractor_worker(proc, zip_object):
    # each worker opens the same archive, writes two arrays and closes it
    zip_object.open()
    zip_object.writeArray(np.array([[0, 1, 2], [0, 1, 2]]), proc)
    zip_object.writeArray(np.array([[0, 1, 3], [0, 1, 3]]), proc)
    zip_object.close()
    return None


if __name__ == '__main__':
    nb_processes = 4  # mp.cpu_count()
    manager = mp.Manager()
    q_tasks = manager.Queue()
    pool = mp.Pool(nb_processes)
    zip_object = ZipStreamEncoder("my_test.zip", manager)
    jobs = []
    for proc in range(nb_processes):
        job = pool.apply_async(extractor_worker, (proc, zip_object))
        jobs.append(job)
    for job in jobs:
        job.get()
    pool.close()
    pool.join()
This is what I get from the console:
Process 0 is writing array 0
Process 0 is writing array 1
Process 1 is writing array 2
Process 2 is writing array 3
Process 3 is writing array 4
Process 1 is writing array 5
Process 2 is writing array 6
Process 3 is writing array 7
And in the zip archive, I only have 0.npy, 1.npy, 4.npy and 7.npy. Why is that, and where are the other files?

I'm using Python 2.7 (I can't change it, upgrade it, or add libraries). I'd like to keep the file open because I'll be calling writeArray a lot of times, and opening and closing the archive around every write is literally 10 times slower, which matters in my case. I'm also aware that I could dedicate a process to writing the file, but in my case that would be a waste of resources: it would just sit waiting for the other processes to send data when it could be doing useful work, and the writing is not the bottleneck anyway. For reference, I've sketched both of these alternatives below. Is there any way I can fix this code?
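This is roughly what I mean by opening and closing the archive around every write (the slower variant; an illustrative sketch only, reusing the same self.lock, self.fileName and self.arrayNb attributes as above):

    def writeArray(self, arrayToExport, proc):
        with self.lock:
            # re-open the archive for this single write instead of keeping self.zipfile open
            zf = zipfile.ZipFile(self.fileName, 'a', compression=zipfile.ZIP_DEFLATED, allowZip64=True)
            buf = io.BytesIO()
            np.lib.npyio.format.write_array(buf, arrayToExport)
            zf.writestr(str(self.arrayNb.value) + ".npy", buf.getvalue())
            self.arrayNb.value += 1
            zf.close()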
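And this is roughly what I mean by dedicating a process to the writing (a minimal sketch, not my real code; writer_process and the None sentinel are just for illustration):

import numpy as np
import zipfile
import io
import multiprocessing as mp

def writer_process(queue, filename):
    # the only process that ever touches the zip file; it drains arrays from the queue
    zf = zipfile.ZipFile(filename, 'w', compression=zipfile.ZIP_DEFLATED, allowZip64=True)
    count = 0
    while True:
        array = queue.get()
        if array is None:  # sentinel: all workers are done
            break
        buf = io.BytesIO()
        np.lib.npyio.format.write_array(buf, array)
        zf.writestr(str(count) + ".npy", buf.getvalue())
        count += 1
    zf.close()

def extractor_worker(proc, queue):
    # workers only compute arrays and push them; they never open the zip file
    queue.put(np.array([[0, 1, 2], [0, 1, 2]]))
    queue.put(np.array([[0, 1, 3], [0, 1, 3]]))

if __name__ == '__main__':
    nb_processes = 4
    manager = mp.Manager()
    queue = manager.Queue()
    writer = mp.Process(target=writer_process, args=(queue, "my_test.zip"))
    writer.start()
    pool = mp.Pool(nb_processes)
    jobs = [pool.apply_async(extractor_worker, (proc, queue)) for proc in range(nb_processes)]
    for job in jobs:
        job.get()
    pool.close()
    pool.join()
    queue.put(None)  # tell the writer it can stop and close the archive
    writer.join()

As I said, though, that writer would mostly sit idle in my real application, which is the waste of resources I'd like to avoid.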