I am using a multiprocessing queue with one producer and two consumers. Below is a minimal example:
from multiprocessing import Process, JoinableQueue
import time
import os
import numpy as np
# Shared work queue, bounded to 2 pending items: put() blocks while it is full.
q = JoinableQueue(maxsize=2)
def producer():
    """Feed the shared queue ``q`` with pairs of random integers.

    Runs 500 rounds. Each round draws two integers from [1, 100), puts both
    on the module-level queue ``q``, prints the pair with a timestamp, and
    then sleeps 5 seconds before starting the next round.
    """
    for _ in range(500):
        # put 2 random numbers into the queue; put() blocks while q is full
        r1 = np.random.randint(1, 100)
        q.put(r1)
        r2 = np.random.randint(1, 100)
        q.put(r2)
        print('New Producer', r1, r2, time.ctime())
        time.sleep(5)
def worker1():
    """Consume items from the shared queue ``q`` forever.

    Each item is fetched with a blocking ``get()``, reported together with
    this process's pid, and then acknowledged with ``task_done()`` so that
    ``q.join()`` can return once every queued item has been processed.
    """
    pid = os.getpid()  # constant for the lifetime of this worker process
    while True:
        item = q.get()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        # Acknowledge only after the work succeeded; without this call
        # JoinableQueue.join() never unblocks.
        q.task_done()
def worker2():
    """Consume items from the shared queue ``q`` forever.

    Each item is fetched with a blocking ``get()``, reported together with
    this process's pid, and then acknowledged with ``task_done()`` so that
    ``q.join()`` can return once every queued item has been processed.
    """
    pid = os.getpid()  # constant for the lifetime of this worker process
    while True:
        item = q.get()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        # Acknowledge only after the work succeeded; without this call
        # JoinableQueue.join() never unblocks.
        q.task_done()
if __name__ == '__main__':
    # Launch code is guarded so that importing this module (which the spawn
    # and forkserver start methods do in every child) does not start
    # processes again.
    # NOTE: q is a module-level global, so the children share it only when
    # the start method is fork; with spawn/forkserver each child re-imports
    # this module and builds its own, separate queue.

    # Consumers are daemons: they loop forever and are terminated when the
    # main process exits.
    Process(target=worker1, daemon=True).start()
    Process(target=worker2, daemon=True).start()

    p = Process(target=producer)
    p.start()

    # Wait for the producer to finish, then for every queued item to be
    # acknowledged by a consumer via task_done().
    p.join()
    q.join()
Which gives
New Producer 16 10 Wed Mar 20 12:03:08 2024
pid 25621 Working on 16
pid 25621 Finished 16
pid 25622 Working on 10
pid 25622 Finished 10
New Producer 97 34 Wed Mar 20 12:03:13 2024
pid 25621 Working on 97
pid 25621 Finished 97
pid 25622 Working on 34
pid 25622 Finished 34
...
If I modify only worker1, like this:
def worker1():
    """Simulate a stalled consumer: never read from the queue, only sleep."""
    while True:
        time.sleep(500)
Then the second worker retrieves both numbers:
New Producer 9 7 Wed Mar 20 12:05:18 2024
pid 25812 Working on 9
pid 25812 Finished 9
pid 25812 Working on 7
pid 25812 Finished 7
New Producer 91 27 Wed Mar 20 12:05:23 2024
pid 25812 Working on 91
pid 25812 Finished 91
pid 25812 Working on 27
pid 25812 Finished 27
Is there a way to prevent this and basically block the producer if there has been a problem with worker1?