Allow multiple Ray workers to fill a common queue which can be accessed from the main thread


I have a DataGenerator class

import random
import time

import ray

@ray.remote
class DataGenerator:

    def generate_continuously(self):
        while True:
            time.sleep(5)
            data = random.random()
            # I need data to be put into a queue common to all instances of DataGenerator

From the main script, I instantiate many of them

queue = # Some shared queue 
# Create generator handles and start continuous collection for all
handles = [DataGenerator.remote() for i in range(10)]
refs = [handle.generate_continuously.remote() for handle in handles]

# Continuously pop the queue and store locally the results 
all_data = []
while True:
    data = queue.pop_all()
    all_data.extend(data)
    # do something compute-intensive with all_data

I need each handle to put their result into a common queue that I can repeatedly access from this main script.
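For reference, the desired shape — many producers filling one shared queue, with a non-blocking drain on the consumer side — is the classic producer/consumer pattern. A minimal sketch of that shape using only the standard library's queue.Queue and plain threads (no Ray; the producers here are stand-ins that stop after a few items):

```python
import queue
import threading

shared_queue = queue.Queue()  # thread-safe FIFO shared by all producers

def produce(worker_id, n_items):
    # Stand-in for a DataGenerator: push a few items, then stop.
    for i in range(n_items):
        shared_queue.put((worker_id, i))

# Start several producers filling the common queue
workers = [threading.Thread(target=produce, args=(w, 3)) for w in range(4)]
for t in workers:
    t.start()
for t in workers:
    t.join()

# Drain everything currently in the queue without blocking
all_data = []
while True:
    try:
        all_data.append(shared_queue.get_nowait())
    except queue.Empty:
        break
```

The Ray version needs to reproduce exactly this: a queue object every producer can reach, plus a drain that returns immediately so the consumer can go back to its compute-intensive work.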

What I've tried: this is the closest I got to the desired result:

@ray.remote
class DataGenerator:

    def generate(self):
        time.sleep(5)
        data = random.random()
        return data

N_HANDLES = 10
generator_handles = [DataGenerator.remote() for i in range(N_HANDLES)]
# Map generator handle index to the ref of the object being produced remotely 
handle_idx_to_ref = {idx: generator_handles[idx].generate.remote() for idx in range(N_HANDLES)}
all_data = []
while True:
    for idx, ref in handle_idx_to_ref.items():
        ready_refs, _ = ray.wait([ref], timeout=0)
        if ready_refs:
            all_data.extend(ray.get(ready_refs))
            # Start generation again for this worker
            handle_idx_to_ref[idx] = generator_handles[idx].generate.remote()
    
    # Do something compute-intensive with all_data

This is almost good, but if the compute-intensive operation takes too long, some DataGenerator could finish and not be restarted until the next iteration. How can I improve on this code?

2 Answers

Answer from lilvagg:

This is the best I could think of. Use a SharedQueue actor to manage a list as a queue: the DataGenerator actors push data into it, and the main script pops all data from it. Remember to use ray.get to wait for the result of the remote call.

import ray
import time
import numpy as np

@ray.remote
class SharedQueue:
    def __init__(self):
        self.queue = []

    def push(self, data):
        self.queue.append(data)

    def pop_all(self):
        data = self.queue
        self.queue = []
        return data

@ray.remote
class DataGenerator:
    def __init__(self, queue):
        self.queue = queue

    def generate_continuously(self):
        while True:
            time.sleep(5)
            data = np.random.rand()
            ray.get(self.queue.push.remote(data))

ray.init()

# Create shared queue
queue = SharedQueue.remote()

# Create generator handles and start continuous collection for all
handles = [DataGenerator.remote(queue) for i in range(10)]
[handle.generate_continuously.remote() for handle in handles]

# Continuously pop the queue and store locally the results 
all_data = []
while True:
    data = ray.get(queue.pop_all.remote())
    all_data.extend(data)
    # do something compute-intensive with all_data
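The important detail in pop_all is that it hands the caller the existing list and swaps in a fresh empty one, rather than copying and clearing; since Ray executes an actor's methods one at a time, no push can interleave with the drain. The swap semantics themselves can be checked with a plain local class, no Ray required:

```python
class LocalQueue:
    # Plain-Python mirror of the SharedQueue actor's behavior.
    def __init__(self):
        self.queue = []

    def push(self, data):
        self.queue.append(data)

    def pop_all(self):
        data = self.queue   # hand the whole list to the caller...
        self.queue = []     # ...and swap in a fresh one
        return data

q = LocalQueue()
for x in (1, 2, 3):
    q.push(x)

first = q.pop_all()    # everything pushed so far
second = q.pop_all()   # nothing left after the first drain
```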
Answer from Federico Taschin:

Turns out I did not need a different architecture. Threaded actors are enough to solve my issue.

In practice, by setting max_concurrency=2 in the following line:

generator_handles = [DataGenerator.options(max_concurrency=2).remote() for i in range(N_HANDLES)]

I allow concurrent operations on the DataGenerator actors. This way, I can have a DataGenerator method that produces data in a while True loop and call another method to retrieve the data generated so far. Complete code:

import random
import time

import ray

@ray.remote
class DataGenerator:

    def __init__(self):
        self.data_buffer = []

    def generate(self):
        while True:
            time.sleep(5)
            data = random.random()
            self.data_buffer.append(data)
    
    def pop_data(self):
        data = self.data_buffer
        self.data_buffer = []
        return data

N_HANDLES = 10
generator_handles = [
    DataGenerator.options(max_concurrency=2).remote()
    for i in range(N_HANDLES)
]

# Kick off the endless generation loop on every actor
for handle in generator_handles:
    handle.generate.remote()

all_data = []
while True:
    for handle in generator_handles:
        generated_data = ray.get(handle.pop_data.remote())
        all_data.extend(generated_data)
    
    # Do something compute-intensive with all_data

This code is much simpler, doesn't require any shared queue or object ref management, and I can have an actor generating data non-stop with the driver still being able to access it.

Note that without max_concurrency=2, the line generated_data = ray.get(handle.pop_data.remote()) would block forever, as the actor is stuck inside the generate() method.
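The buffer-and-pop pattern this answer relies on can also be sketched with plain threads (no Ray): one thread keeps appending to a buffer while the "driver" repeatedly drains it. A lock here makes explicit the mutual exclusion that the threaded actor's list swap depends on; the producer below stops after a fixed count only so the sketch terminates:

```python
import threading

class BufferedGenerator:
    # Plain-threads analogue of the threaded Ray actor: one thread
    # appends to a buffer while another periodically drains it.
    def __init__(self):
        self.data_buffer = []
        self._lock = threading.Lock()

    def generate(self, n_items):
        for i in range(n_items):
            with self._lock:
                self.data_buffer.append(i)

    def pop_data(self):
        # Swap the buffer out atomically, like the actor's pop_data
        with self._lock:
            data = self.data_buffer
            self.data_buffer = []
        return data

gen = BufferedGenerator()
producer = threading.Thread(target=gen.generate, args=(100,))
producer.start()

# Drain repeatedly while the producer is still appending
all_data = []
while producer.is_alive():
    all_data.extend(gen.pop_data())
producer.join()
all_data.extend(gen.pop_data())  # catch anything left after the loop
```

Every item is appended once and drained once, so nothing is lost or duplicated across the repeated pop_data calls — the same guarantee the Ray version gets from the actor serializing access to its own state.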