Passing a Queue to Dynamically connecting Processes

43 Views Asked by At

I'm looking for a way to effectively communicate between a daemon (Main program) and other processes (bots). The difficulty here is that, I need to be able to have bots drop in and out of this path of communication at any time.

Thus far, I used Unix domain sockets to communicate between processes. This worked well enough while I had one-on-one communication. In my new set up, I'd like to not rely on them - It could be any number n of bots sending data, and they are not syncing their transmissions.

Logically, I went to queue.Queue to fix that. Having a single Queue object which main can steadily pull from is exactly what I believe I need.

The problem with that is, that I need the bots to be able to drop in and out of my communication network, so I way to pass my Queue to a newly registered process is needed.

I tried using a UDS to distribute my Queue to any process that hooks into main, but that's not an option since Queue isn't pickable. Additionally, accept() doesn't appear to work with a UDS either (which makes sense, it being a file-like and all).

Code:

class QueueDistributor(threading.Thread):

    def __init__(self, q, addr, name='Queue Distributor'):
        """ constructor, setting initial variables """
        self._stopevent = threading.Event()
        self.q = q
        self.addr = addr
        threading.Thread.__init__(self, name=name)

    def run(self):
        """ main control loop """

        if os.path.exists(self.addr):
            os.remove(self.addr)
        sock = socket(AF_UNIX, SOCK_STREAM)
        sock.settimeout(1)
        sock.bind(self.addr)
        sock.listen(1)

        while not self._stopevent.isSet():
            try:
                client, addr = sock.accept()
                try:
                    client.send(self.q)
                except timeout:
                    continue
                except Exception:
                    traceback.print_exc(file=sys.stdout)

            except timeout:
                continue
            except Exception:
                traceback.print_exc(file=sys.stdout)
        sock.close()

    def join(self, timeout=None):
        """ Stop the thread. """
        self._stopevent.set()
        threading.Thread.join(self, timeout)


class Homer:

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        self.pidfile = pidfile
        self.redirects = stdin, stdout, stderr
        self.address = '/tmp/homer.uds'
        self.running = False
        self.q = Queue()
        self.q_distributor = QueueDistributor(self.q, self.address)

        # Open up file descriptors with zero buffering
        self.trades = open('/tmp/trades.csv', 'ab', buffering=0)
        self.tickers = open('/tmp/tickers.csv', 'ab', buffering=0)
        self.books = open('/tmp/books.csv', 'ab', buffering=0)

    #  ...

    def handle_data(self, data):
        ...

    def run(self):
        self.running = True

        # Launch queue distribution Thread
        self.q_distributor.start()

        while self.running:
            try:
                data = self.q.get(timeout=0.1)
                self.handle_data(data)
            except Empty:
                continue
            except NotImplementedError as e:
                log.error(e)
                continue

        # Shutdown Queue Distributor
        self.q_distributor.join()

The obvious alternative would be to receive data via TCP socket, drop it in a Queue and work from there, but is that the only possibility here?

I have seen multiprocessing.connection.Listener, but I assume that's just about the same as the homebrew TCP solution, no?

0

There are 0 best solutions below