Blocking the producer in multiprocessing Python queues


I am using a multiprocessing queue with one producer and two consumers. Below is a minimal example:

from multiprocessing import Process, JoinableQueue
import time
import os
import numpy as np
q = JoinableQueue(maxsize=2)

def producer():
    for _ in range(500):
        # put 2 random numbers into the queue; put() blocks while the queue is full
        r1 = np.random.randint(1, 100)
        q.put(r1)
        r2 = np.random.randint(1, 100)
        q.put(r2)
        print('New Producer', r1, r2, time.ctime())
        time.sleep(5)


def worker1():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()  # needed so that q.join() can eventually return

def worker2():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()  # needed so that q.join() can eventually return


Process(target=worker1, daemon=True).start()
Process(target=worker2, daemon=True).start()


p = Process(target=producer)
p.start()
p.join()

q.join()

Which gives

New Producer 16 10 Wed Mar 20 12:03:08 2024
pid 25621 Working on 16
pid 25621 Finished 16
pid 25622 Working on 10
pid 25622 Finished 10
New Producer 97 34 Wed Mar 20 12:03:13 2024
pid 25621 Working on 97
pid 25621 Finished 97
pid 25622 Working on 34
pid 25622 Finished 34
...

If I modify worker1 so that it only sleeps and never reads from the queue:

def worker1():
    while True:
        time.sleep(500)

Then the second worker retrieves both numbers:


New Producer 9 7 Wed Mar 20 12:05:18 2024
pid 25812 Working on 9
pid 25812 Finished 9
pid 25812 Working on 7
pid 25812 Finished 7
New Producer 91 27 Wed Mar 20 12:05:23 2024
pid 25812 Working on 91
pid 25812 Finished 91
pid 25812 Working on 27
pid 25812 Finished 27

Is there a way to prevent this, i.e. to block the producer when there is a problem with worker1?
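
One possible direction, as a rough sketch rather than a definitive answer: with a single shared queue, any free worker can take any item, so a healthy worker2 will always drain what worker1 leaves behind. If each worker instead gets its own queue with maxsize=1, the producer's put() on the stalled worker's queue stops returning once that queue is full. The names worker, produce, run_demo and put_timeout below are hypothetical. The timeout exists only so the demo terminates; a real producer would call put() without a timeout and simply block. The "fork" context is also an assumption (Linux/macOS), in line with the global queue in the code above.

```python
import multiprocessing as mp
import queue
import time


def worker(q, stalled):
    """Consume items from this worker's own queue; optionally simulate a hang."""
    while True:
        if stalled:
            time.sleep(500)  # same simulated problem as the modified worker1
        q.get()              # real work on the item would go here
        q.task_done()


def produce(queues, rounds, put_timeout):
    """Put one item per worker per round; return the number of completed rounds."""
    for i in range(rounds):
        for n, q in enumerate(queues):
            try:
                # With maxsize=1 this blocks while the previous item is unread.
                q.put((i, n), timeout=put_timeout)
            except queue.Full:
                return i  # producer was blocked by a worker that stopped reading
    return rounds


def run_demo(stall_first, rounds=3, put_timeout=2.0):
    """Run the producer against two workers, the first one optionally stalled."""
    ctx = mp.get_context("fork")  # assumption: a platform where fork is available
    queues = [ctx.JoinableQueue(maxsize=1) for _ in range(2)]
    workers = [
        ctx.Process(target=worker, args=(q, stall_first and n == 0), daemon=True)
        for n, q in enumerate(queues)
    ]
    for w in workers:
        w.start()
    completed = produce(queues, rounds, put_timeout)
    for w in workers:
        w.terminate()
        w.join()
    return completed


if __name__ == "__main__":
    print("healthy workers, rounds completed:", run_demo(stall_first=False))
    print("worker 1 stalled, rounds completed:", run_demo(stall_first=True))
```

Note that with maxsize=1 the producer still completes one full round before it blocks, because the first item fits into the stalled worker's queue. Blocking before any item is handed out would need an explicit handshake from each worker, which this sketch does not attempt.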
