Acquire data in one process and analyze those data using two separate processes (in Python, multiprocessing)


I aim to acquire data in one process and analyze those data using two separate processes, which will run in parallel.

In the provided minimal example, the initial process generates (every 1 second) data consisting of three arrays: array1, array2, and array3. Subsequently, two additional processes analyze these arrays.

I am seeking confirmation that this approach is correct, particularly regarding the analysis of the data. Is it best to analyze the data at the point marked:

# This is where I would do some processing on the data

?

from multiprocessing import shared_memory, Process, Lock, Value
import numpy as np
import time


# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration):
    for i in range(10):
        with lock:
            existing_shm = shared_memory.SharedMemory(name=n)
            np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
            np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
            np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
            np_array1[:] = np.random.randint(0, 1000, np_array1.shape)
            np_array2[:] = np.random.randint(0, 1000, np_array2.shape)
            np_array3[:] = np.random.randint(0, 1000, np_array3.shape)
            existing_shm.close()
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
        time.sleep(1)

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, lock, new_value_flag1, iteration):
    while True:
        if new_value_flag1.value == 1:
            with lock:
                print('Start consumer1',time.time())
                existing_shm = shared_memory.SharedMemory(name=n)
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4]).copy()
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))]).copy()
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):]).copy()
                print(f"consumer 1, Iteration {iteration.value}:")
                existing_shm.close()
                new_value_flag1.value = 0
                print('Stop consumer1',time.time())
            # This is where I would do some processing on the data
            print(np_array1.mean())
            print(np_array2.mean())
            print(np_array3.mean())
        time.sleep(0.01)

def consumer2(n, shape1, shape2, shape3, lock, new_value_flag2, iteration):
    while True:
        if new_value_flag2.value == 1:
            with lock:
                print('Start consumer2',time.time())
                existing_shm = shared_memory.SharedMemory(name=n)
                np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4]).copy()
                np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))]).copy()
                np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):]).copy()
                print(f"consumer 2, Iteration {iteration.value}:")
                existing_shm.close()
                new_value_flag2.value = 0
                print('Stop consumer2',time.time())
            # This is where I would do some processing on the data
            print(np_array1.mean())
            print(np_array2.mean())
            print(np_array3.mean())
        time.sleep(0.01)
        
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes (float32)
    shape1 = (2000, 20000)
    shape2 = (2000, 30000)
    shape3 = (2000, 40000)
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    lock = Lock()
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    
    
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, lock, new_value_flag2, iteration))

    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    p2.join()
    p3.join()
    p1.join()

    # I know I have to press Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT Following the first comment, here is the new code:

from multiprocessing import shared_memory, Process, Value
import numpy as np
import time
# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        if new_value_flag1.value ==0 and new_value_flag2.value == 0:
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            new_value_flag1.value = 1
            new_value_flag2.value = 1
            iteration.value = i
            print('producer', i, time.time()-start_time)
        time.sleep(0.5)
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_value_flag1, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_value_flag1.value == 1:
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_value_flag1.value = 0
        time.sleep(0.01)
    existing_shm.close()

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_value_flag2, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_value_flag2.value == 1:
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_value_flag2.value = 0
        time.sleep(0.01)
    existing_shm.close()
    
if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (200, 200)
    shape2 = (200, 300)
    shape3 = (200, 400)
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_value_flag1 = Value('i', 0)
    new_value_flag2 = Value('i', 0)
    iteration = Value('i', 0)
    
    
    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_value_flag1, new_value_flag2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_value_flag1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_value_flag2, iteration))

    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    p2.join()
    p3.join()
    p1.join()

    # I know I have to press Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT 2 Without unnecessary mapping and unmapping + using Event

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time

# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        if not new_values_event_1.is_set() and not new_values_event_2.is_set():
            start_time = time.time()
            np_array1[:] = np.random.randint(0, 255, np_array1.shape)
            np_array2[:] = np.random.randint(0, 255, np_array2.shape)
            np_array3[:] = np.random.randint(0, 255, np_array3.shape)
            new_values_event_1.set()
            new_values_event_2.set()
            iteration.value = i
            print('producer', i, time.time()-start_time)
        time.sleep(0.5)
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, new_values_event_1, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_1.is_set():
            # This is where I would do some processing on the data
            print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_1.clear()
        time.sleep(0.01)
    existing_shm.close()

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, new_values_event_2, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        if new_values_event_2.is_set():
            # This is where I would do some processing on the data
            print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
            new_values_event_2.clear()
        time.sleep(0.01)
    existing_shm.close()

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (50, 50)
    shape2 = (50, 50)
    shape3 = (50, 50)
    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    new_values_event_1 = Event()
    new_values_event_2 = Event()
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, new_values_event_1, new_values_event_2, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, new_values_event_1, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, new_values_event_2, iteration))

    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    p2.join()
    p3.join()
    p1.join()

    # I know I have to press Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

EDIT 3 Without unnecessary mapping and unmapping + using Event + removing unnecessary polling

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time


# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    for i in range(100):
        event_consumer1.wait()   # Wait until consumer 1 is ready
        event_consumer1.clear()  # Reset the event
        event_consumer2.wait()   # Wait until consumer 2 is ready
        event_consumer2.clear()  # Reset the event
        start_time = time.time()
        np_array1[:] = np.random.randint(0, 255, np_array1.shape)
        np_array2[:] = np.random.randint(0, 255, np_array2.shape)
        np_array3[:] = np.random.randint(0, 255, np_array3.shape)
        iteration.value = i
        print('producer', i, time.time()-start_time)
        event_producer.set()  # Signal the consumers that new data is available
        time.sleep(2) # delay to simulate the time at which the data is produced
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        event_producer.wait()   # Wait until the producer has produced new data
        event_producer.clear()  # Reset the event
        # This is where I would do some processing on the data
        print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer1.set()  # Signal the producer that the data has been processed
    existing_shm.close()

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:np.prod(shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*np.prod(shape1):4*(np.prod(shape1)+np.prod(shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(np.prod(shape1)+np.prod(shape2)):])
    while True:
        event_producer.wait()  # Wait until the producer has produced new data
        event_producer.clear()  # Reset the event
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer2.set()  # Signal the producer that the data has been processed
    existing_shm.close()

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)


    total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set()  # Set the event to allow the producer to start
    event_consumer2.set()  # Set the event to allow the producer to start
    event_producer = Event()
    iteration = Value('i', 0)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer, iteration))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer, iteration))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer, iteration))

    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    p2.join()
    p3.join()
    p1.join()

    # I know I have to press Ctrl-C to stop the program
    #shm.close()
    #shm.unlink()

2 Answers

Answer by Booboo (accepted):

Looking at your EDIT 3 code I see a few problems:

  1. On my Windows platform, the creation of the shared memory raises an exception: you have total_size = np.prod(shape1)*4 + np.prod(shape2)*4 + np.prod(shape3)*4 and then pass total_size as the size argument to shared_memory.SharedMemory. But this call requires the argument to be an int rather than a numpy.int32. You should not be using np.prod to multiply together the elements of a tuple; instead use operator.mul, for example operator.mul(*shape1) (a sketch of the issue follows this list).
  2. Your program does not terminate because your consumer processes hang. You should use an additional shared variable, running, that the consumers check to see whether the producer is still producing data.
  3. Instead of a single event_producer Event instance that is only used by one of your consumers, you should have one for each consumer.
  4. Your producer function is setting and waiting on events in an illogical order.
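
To illustrate point 1, here is a minimal sketch of the type issue (the shape is arbitrary):

import numpy as np
from operator import mul

shape1 = (5000, 50)

np_size = np.prod(shape1) * 4  # a NumPy integer scalar, not a Python int
py_size = mul(*shape1) * 4     # a plain Python int

print(type(np_size), type(py_size))

# shared_memory.SharedMemory(create=True, size=...) expects a plain int; on
# some platforms (e.g. Windows, per point 1) a NumPy integer is rejected.
# int(np.prod(shape1)) * 4 or math.prod(shape1) * 4 would also work.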

I have added comments tagged with 'Booboo' in the following code so you can search for them. Also, I have the producer run only 3 iterations so that it terminates more quickly:

from multiprocessing import shared_memory, Process, Event, Value
import numpy as np
import time
# Used this to compute the size of shared memory - Booboo:
from operator import mul


# create a shared memory and write to it (producer)
def producer(n, shape1, shape2, shape3, event_consumer1, event_consumer2, event_producer1, event_producer2, iteration, running):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
    for i in range(3): # Loop just 3 times - Booboo:
        # Note that the order of setting and waiting has been changed - Booboo:
        start_time = time.time()
        time.sleep(2) # delay to simulate the time at which the data is produced
        np_array1[:] = np.random.randint(0, 255, np_array1.shape)
        np_array2[:] = np.random.randint(0, 255, np_array2.shape)
        np_array3[:] = np.random.randint(0, 255, np_array3.shape)
        iteration.value = i
        print('producer', i, time.time()-start_time)
        event_producer1.set()  # Signal the consumers that new data is available
        event_producer2.set()  # Signal the consumers that new data is available
        event_consumer1.wait()  # Wait until consumer 1 is ready
        event_consumer1.clear()  # Reset the event
        event_consumer2.wait()  # Wait until consumer 2 is ready
        event_consumer2.clear()  # Reset the event

    # Show we are no longer running - Booboo:
    running.value = 0
    # Wake up the consumers - Booboo:
    event_producer1.set()
    event_producer2.set()
    existing_shm.close()

# read from the shared memory using a different process (consumer 1)
def consumer1(n, shape1, shape2, shape3, event_consumer1, event_producer, iteration, running):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
    while True:
        event_producer.wait()  # Wait until the producer has produced new data or is no longer running - Booboo
        event_producer.clear()  # Reset the event
        if not running.value:
            break
        # This is where I would do some processing on the data
        print('consumer1', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer1.set()  # Signal the producer that the data has been processed
    existing_shm.close()

# read from the shared memory using a different process (consumer 2)
def consumer2(n, shape1, shape2, shape3, event_consumer2, event_producer, iteration, running):
    existing_shm = shared_memory.SharedMemory(name=n)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
    while True:
        event_producer.wait()  # Wait until the producer has produced new data or is no longer running - Booboo
        event_producer.clear()  # Reset the event
        if not running.value:
            break
        # This is where I would do some processing on the data
        print('consumer2', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(0.1) # delay to simulate the time at which the data is processed
        event_consumer2.set()  # Signal the producer that the data has been processed
    existing_shm.close()

if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)


    total_size = mul(*shape1)*4 + mul(*shape2)*4 + mul(*shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)
    event_consumer1 = Event()
    event_consumer2 = Event()
    event_consumer1.set()  # Set the event to allow the producer to start
    event_consumer2.set()  # Set the event to allow the producer to start
    event_producer1 = Event()
    event_producer2 = Event()
    iteration = Value('i', 0)
    running = Value('i', 1)

    p1 = Process(target=producer, args=(shm.name, shape1, shape2, shape3,  event_consumer1, event_consumer2, event_producer1, event_producer2, iteration, running))
    p2 = Process(target=consumer1, args=(shm.name, shape1, shape2, shape3, event_consumer1, event_producer1,  iteration, running))
    p3 = Process(target=consumer2, args=(shm.name, shape1, shape2, shape3, event_consumer2, event_producer2, iteration, running))

    p2.start()
    p3.start()
    time.sleep(2) # delay to make sure the consumer processes are ready
    p1.start()

    p2.join()
    p3.join()
    p1.join()

    # Dispose of shared memory - Booboo:
    shm.close()
    shm.unlink()

Prints:

producer 0 2.029968023300171
consumer1 0 127.14897 127.03962 127.098465 1706964539.3494873
consumer2 0 127.14897 127.03962 127.098465 1706964539.3494873
producer 1 2.033132553100586
consumer1 1 127.50007 127.03064 127.2023 1706964541.3846073
consumer2 1 127.50007 127.03064 127.2023 1706964541.3846073
producer 2 2.0330991744995117
consumer2 2 126.92322 126.934265 126.91858 1706964543.423699
consumer1 2 126.92322 126.934265 126.91858 1706964543.423699

Can We Do Better?

The problem with this design is: what if you have a large number of consumers, for example 10? That is a lot of events to create and set. Instead, we will use two multiprocessing.Condition instances as follows:

from multiprocessing import shared_memory, Process, Value, Condition
import numpy as np
import time
# Used this to compute the size of shared memory - Booboo:
from operator import mul


# create a shared memory and write to it (producer)
def producer(name, shape1, shape2, shape3, n_consumers, produce_condition, consume_condition, consumed_count, iteration, running):
    existing_shm = shared_memory.SharedMemory(name=name)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])
    for i in range(3): # Loop just 3 times
        # Produce data for the next iteration without updating shared memory:
        start_time = time.time()
        time.sleep(2) # delay to simulate the time at which the data is produced
        array1 = np.random.randint(0, 255, np_array1.shape)
        array2 = np.random.randint(0, 255, np_array2.shape)
        array3 = np.random.randint(0, 255, np_array3.shape)

        if i != 0:
            # Wait for prior data to be consumed before we can update shared memory:
            with produce_condition:
                produce_condition.wait_for(lambda: consumed_count.value == n_consumers)
            consumed_count.value = 0  # reset for next time

        # Update shared memory with already computed data:
        np_array1[:] = array1
        np_array2[:] = array2
        np_array3[:] = array3
        print('producer', i, time.time()-start_time)

        with consume_condition:
            iteration.value = i
            # Tell consumers there is new data:
            consume_condition.notify_all()

    # Show we are no longer running:
    with consume_condition:
        running.value = 0
        consume_condition.notify_all()

    existing_shm.close()

# read from the shared memory using a different process:
def consumer(id, name, shape1, shape2, shape3, produce_condition, consume_condition, consumed_count, iteration, running):
    existing_shm = shared_memory.SharedMemory(name=name)
    np_array1 = np.ndarray(shape1, dtype=np.float32, buffer=existing_shm.buf[:mul(*shape1)*4])
    np_array2 = np.ndarray(shape2, dtype=np.float32, buffer=existing_shm.buf[4*mul(*shape1):4*(mul(*shape1)+mul(*shape2))])
    np_array3 = np.ndarray(shape3, dtype=np.float32, buffer=existing_shm.buf[4*(mul(*shape1)+mul(*shape2)):])

    next_iteration = -1
    while True:
        next_iteration += 1 # What we expect the next iteration value to be:
        with consume_condition:
            # Wait for either the producer having terminated or a new iteration being available:
            consume_condition.wait_for(
                lambda: not running.value or iteration.value == next_iteration
            )

        if iteration.value != next_iteration:
            # running.value must be 0, but we do not check it directly: there can
            # still be new data we haven't processed even though the producer is no
            # longer running. As long as iteration.value equals the expected next
            # iteration, we have new data to process.
            break

        print(f'consumer{id}', iteration.value, np_array1.mean(), np_array2.mean(), np_array3.mean(), time.time())
        time.sleep(1.5) # delay to simulate the time at which the data is processed

        # Show we have consumed the data:
        with produce_condition:
            consumed_count.value += 1
            produce_condition.notify(1)

    existing_shm.close()


if __name__ == '__main__':
    # assume we have 3 arrays of different sizes
    shape1 = (5000, 50)
    shape2 = (5000, 50)
    shape3 = (5000, 50)


    total_size = mul(*shape1)*4 + mul(*shape2)*4 + mul(*shape3)*4
    shm = shared_memory.SharedMemory(create=True, size=total_size)

    N_CONSUMERS = 10

    produce_condition = Condition()
    consume_condition = Condition()

    consumed_count = Value('i', 0)

    iteration = Value('i', -1)
    running = Value('i', 1)

    processes = []

    p = Process(target=producer, args=(shm.name, shape1, shape2, shape3, N_CONSUMERS, produce_condition, consume_condition, consumed_count, iteration, running))
    processes.append(p)

    for id in range(N_CONSUMERS):
        processes.append(Process(target=consumer, args=(id, shm.name, shape1, shape2, shape3, produce_condition, consume_condition, consumed_count, iteration, running)))

    for p in processes:
        p.start()

    t = time.time()

    for p in processes:
        p.join()

    print('Total time:', time.time() - t)
    # Dispose of shared memory:
    shm.close()
    shm.unlink()

Prints:

producer 0 2.0170440673828125
consumer1 0 126.91085 127.04398 126.95515 1706970488.860498
consumer9 0 126.91085 127.04398 126.95515 1706970488.860498
consumer5 0 126.91085 127.04398 126.95515 1706970488.860498
consumer2 0 126.91085 127.04398 126.95515 1706970488.860498
consumer8 0 126.91085 127.04398 126.95515 1706970488.860498
consumer0 0 126.91085 127.04398 126.95515 1706970488.860498
consumer4 0 126.91085 127.04398 126.95515 1706970488.860498
consumer3 0 126.91085 127.04398 126.95515 1706970488.860498
consumer7 0 126.91085 127.04398 126.95515 1706970488.860498
consumer6 0 126.91085 127.04398 126.95515 1706970488.860498
producer 1 2.000509023666382
consumer9 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer0 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer1 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer8 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer2 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer5 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer4 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer3 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer7 1 126.91934 127.10621 127.05637 1706970490.8453243
consumer6 1 126.91934 127.10621 127.05637 1706970490.8453243
producer 2 2.016110420227051
consumer9 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer8 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer2 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer0 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer6 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer1 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer3 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer5 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer7 2 127.029526 126.93016 126.82547 1706970492.8614347
consumer4 2 127.029526 126.93016 126.82547 1706970492.8614347
Total time: 8.093537330627441

Update

I have modified the above code that uses conditions instead of events so that there is greater overlap in processing between the producer and the consumers. Before, the producer started to produce the next iteration only after the consumers had all finished processing the current iteration. Now the producer does all the work necessary to create the new array values for the next iteration in parallel with the consumers consuming the current values, and waits for the consumers to finish their processing only before updating the shared arrays with the new data. I have also increased the consumption time to 1.5 seconds; in the previous version this would have added several seconds to the total running time to process 3 iterations.
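
To distill that pattern, here is a stripped-down, single-consumer sketch of the overlap idea. This is not the exact code above; names, shapes, and timings are illustrative. The key point is that the producer prepares the next batch in a local array while the consumer is still working, and blocks only right before copying into shared memory:

from multiprocessing import shared_memory, Process, Condition, Value
import numpy as np
import time
from operator import mul

SHAPE = (5000, 50)

def producer(name, cond, iteration, consumed):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(SHAPE, dtype=np.float32, buffer=shm.buf)
    for i in range(3):
        time.sleep(1)  # simulate acquisition time
        new_data = np.random.randint(0, 255, SHAPE)  # next batch, computed locally
        if i != 0:
            with cond:
                # Block only right before touching shared memory:
                cond.wait_for(lambda: consumed.value == 1)
                consumed.value = 0
        arr[:] = new_data  # quick copy into shared memory
        with cond:
            iteration.value = i
            cond.notify_all()
    with cond:
        cond.wait_for(lambda: consumed.value == 1)  # let the consumer finish the last batch
        iteration.value = -1  # sentinel: no more data
        cond.notify_all()
    shm.close()

def consumer(name, cond, iteration, consumed):
    shm = shared_memory.SharedMemory(name=name)
    arr = np.ndarray(SHAPE, dtype=np.float32, buffer=shm.buf)
    expected = 0
    while True:
        with cond:
            cond.wait_for(lambda: iteration.value in (expected, -1))
            if iteration.value == -1:
                break
        print('consumer', iteration.value, arr.mean())
        time.sleep(1.5)  # simulate processing time
        with cond:
            consumed.value = 1
            cond.notify_all()
        expected += 1
    shm.close()

if __name__ == '__main__':
    shm = shared_memory.SharedMemory(create=True, size=mul(*SHAPE) * 4)
    cond = Condition()
    iteration = Value('i', -2)  # -2: nothing produced yet
    consumed = Value('i', 0)
    p2 = Process(target=consumer, args=(shm.name, cond, iteration, consumed))
    p1 = Process(target=producer, args=(shm.name, cond, iteration, consumed))
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    shm.close()
    shm.unlink()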

Answer by RaJa:

Shared memory is one approach, but I would likely use queues to communicate with the consumer processes. That way, you don't need any flags to synchronize the consumers; they simply idle until new data is available. It also lets you send the results of the analysis back to the producer (if required).

Check the Python documentation for an example of how to use queues: https://docs.python.org/3/library/multiprocessing.html#examples

I would use a separate queue from the producer to each consumer. Alternatively, you could put your 3 arrays into a tuple and add the tuple twice to a single queue that both consumers have access to. Assuming the analysis takes longer than production, subsequent tuples would then likely be processed by different consumers. A sketch of the one-queue-per-consumer variant follows.
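
Here is a minimal sketch of the one-queue-per-consumer variant (shapes, timings, and iteration counts are illustrative, not taken from the question):

from multiprocessing import Process, Queue
import numpy as np
import time

def producer(queues, n_iterations=3):
    for i in range(n_iterations):
        time.sleep(1)  # simulate acquisition time
        arrays = (np.random.randint(0, 255, (100, 100)),
                  np.random.randint(0, 255, (100, 200)),
                  np.random.randint(0, 255, (100, 300)))
        for q in queues:   # each consumer gets its own copy of the data
            q.put((i, arrays))
    for q in queues:       # a None sentinel tells each consumer to stop
        q.put(None)

def consumer(cid, q):
    while True:
        item = q.get()     # blocks until data arrives; no flags or polling needed
        if item is None:
            break
        i, (a1, a2, a3) = item
        print(f'consumer{cid}', i, a1.mean(), a2.mean(), a3.mean())

if __name__ == '__main__':
    queues = [Queue(), Queue()]  # one queue per consumer
    procs = [Process(target=consumer, args=(cid, q)) for cid, q in enumerate(queues)]
    procs.append(Process(target=producer, args=(queues,)))
    for p in procs:
        p.start()
    for p in procs:
        p.join()

Note that each q.put pickles and copies the arrays, so for very large arrays the shared-memory approach above avoids that copying cost; for moderate sizes the simplicity of queues is usually worth it.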