Populating and processing the same queue/list in different processes without deleting items from the queue/list


I am a bit confused about how to even write a title for this question. I am trying to write code that populates a queue in one process and uses the data from that queue in several other processes, but I do not want the items to be deleted from the queue until all processes are done with them. I know queues aren't supposed to work like this, so I tried to use lists instead: I can populate a list with multiprocessing.Manager(), but it is suddenly empty when I pass it to the other processes. Following the answer to this question, I ended up using 2 queues:

import multiprocessing
import random
import time

def save_data(save_que):
    # consume items until the "STOP" sentinel arrives
    for data in iter(save_que.get, "STOP"):
        print("Process ID: ", multiprocessing.current_process().name, ", Data: ", data, ", Length of queue: ", save_que.qsize())
        if multiprocessing.current_process().name == "Process-3":
            time.sleep(1)  # artificially slow down one of the consumers
    print("All data saved")
    return

def produce_data(save_que_1, save_que_2):
    for data in range(10):
        time.sleep(random.random())
        print("sending data", data)
        # every item is put into both queues, once per consumer
        save_que_1.put(data)
        save_que_2.put(data)
    save_que_1.put("STOP")
    save_que_2.put("STOP")
  
if __name__ == "__main__":
    # one manager and one queue per consumer, so each process sees every item
    manager_1 = multiprocessing.Manager()
    manager_2 = multiprocessing.Manager()
    save_que_1 = manager_1.Queue()
    save_que_2 = manager_2.Queue()
    save_p_1 = multiprocessing.Process(target=save_data, args=(save_que_1,))
    save_p_2 = multiprocessing.Process(target=save_data, args=(save_que_2,))
    save_p_1.start()
    save_p_2.start()
    produce_data(save_que_1, save_que_2)
    save_p_1.join()
    save_p_2.join()

This code works exactly as I want, but I don't think using 2 queues is a good solution: every pending item is held once per consumer, so if processing the data is much slower than producing it, this method can become very heavy on resources.
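One thing that might at least cap the resource use (an untested sketch; maxsize=100 is an arbitrary number I picked) is bounding both managed queues, so that produce_data() blocks in put() as soon as either consumer falls too far behind, instead of letting the backlogs grow without limit:

    save_que_1 = manager_1.Queue(maxsize=100)  # put() blocks once 100 items are pending
    save_que_2 = manager_2.Queue(maxsize=100)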

I tried using queue.Queue(), and it seems to work well IF I completely populate the queue first and only later pass it to the subprocesses. From what I read, in that case the subprocesses do not share memory; each one gets its own copy of the queue and works on it independently. I think this solution is worse than the double queue, since there are even more queues in play (one in the main process plus a copy in each subprocess), and it also does not allow populating the queue dynamically while the subprocesses are doing their own thing.
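For what it's worth, this is roughly what I was hoping the Manager list would let me do: the producer only appends, each consumer keeps its own private read index, and nothing is ever deleted. It is only a sketch I put together to illustrate the idea; the "STOP" sentinel and the 0.01 s poll interval are arbitrary choices of mine:

import multiprocessing
import random
import time

def consume(shared, name):
    idx = 0  # each consumer keeps its own cursor; items are never removed
    while True:
        if idx < len(shared):          # a new item is available
            item = shared[idx]
            if item == "STOP":         # sentinel appended by the producer when done
                break
            print(name, "read", item)
            idx += 1
        else:
            time.sleep(0.01)           # nothing new yet, wait for the producer

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared = manager.list()            # append-only shared list
    workers = [multiprocessing.Process(target=consume, args=(shared, "worker-%d" % i))
               for i in range(2)]
    for w in workers:
        w.start()
    for data in range(10):
        time.sleep(random.random())
        shared.append(data)            # producer only appends, never pops
    shared.append("STOP")
    for w in workers:
        w.join()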

A Pipe also looks like a way to pass data between subprocess workers. Since data is deleted from the queue once one worker gets it, the other processes can't access it, but I might be able to forward that data to the other processes through a pipe. However, this would require quite a lot of synchronization: if worker-1 lags behind and worker-2 is too far ahead, worker-2 has to know how many data points worker-1 still needs and store them somewhere until worker-1 asks for them. I think this is a lackluster solution that may need as many resources as the double queue, and on top of that it is probably much harder to code.
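To make the pipe idea concrete, this is the kind of relay I had in mind (again only a rough sketch with made-up names, and it ignores the synchronization problem described above): worker_a consumes the queue and forwards each item, plus the "STOP" sentinel, to worker_b through a Pipe:

import multiprocessing

def worker_a(que, conn):
    for data in iter(que.get, "STOP"):
        print("worker_a got", data)
        conn.send(data)                # forward the consumed item to worker_b
    conn.send("STOP")                  # pass the sentinel along as well
    conn.close()

def worker_b(conn):
    for data in iter(conn.recv, "STOP"):
        print("worker_b got", data)

if __name__ == "__main__":
    que = multiprocessing.Manager().Queue()
    a_conn, b_conn = multiprocessing.Pipe()
    a = multiprocessing.Process(target=worker_a, args=(que, a_conn))
    b = multiprocessing.Process(target=worker_b, args=(b_conn,))
    a.start()
    b.start()
    for data in range(10):
        que.put(data)
    que.put("STOP")
    a.join()
    b.join()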

I very recently started coding in Python, so I may be unaware of some concepts that make this problem very easy to solve. Does anyone have any idea how to solve this problem?
