I have a DataGenerator class:
```python
import random
import time

import ray


@ray.remote
class DataGenerator:
    def generate_continuously(self):
        while True:
            time.sleep(5)
            data = random.random()
            # I need data to be put into a queue common to all instances of DataGenerator
```
From the main script, I instantiate many of them:
```python
queue = ...  # Some shared queue (this is the part I'm missing)

# Create generator handles and start continuous collection for all.
# generate_continuously never returns, so the refs are kept but not waited on.
handles = [DataGenerator.remote() for _ in range(10)]
generation_refs = [handle.generate_continuously.remote() for handle in handles]

# Continuously pop the queue and store the results locally
all_data = []
while True:
    data = queue.pop_all()
    all_data.extend(data)
    # do something compute-intensive with all_data
```
I need each handle to put its result into a common queue that I can repeatedly access from this main script.
What I've tried

This is the closest I got to the desired result:
```python
import random
import time

import ray


@ray.remote
class DataGenerator:
    def generate(self):
        time.sleep(5)
        data = random.random()
        return data


N_HANDLES = 10
generator_handles = [DataGenerator.remote() for _ in range(N_HANDLES)]
# Map generator handle index to the ref of the object being produced remotely
handle_idx_to_ref = {idx: generator_handles[idx].generate.remote() for idx in range(N_HANDLES)}

all_data = []
while True:
    for idx, ref in handle_idx_to_ref.items():
        # timeout=0 makes ray.wait a non-blocking readiness check
        ready_ids, _ = ray.wait([ref], timeout=0)
        if ready_ids:
            all_data.extend(ray.get(ready_ids))
            # Start generation again for this worker
            handle_idx_to_ref[idx] = generator_handles[idx].generate.remote()
    # Do something compute-intensive with all_data
```
This is almost good, but if the compute-intensive operation takes too long, some DataGenerator actors may finish and sit idle until the next iteration of the loop restarts them. How can I improve on this code?
This is the best I could think of. Use a SharedQueue actor to manage a list as a queue: the DataGenerator actors push data into it, and the main script pops all data from it. Remember to call ray.get on the object ref returned by a remote method call to retrieve the actual result.