I'm looking for a way to communicate effectively between a daemon (the main program) and other processes (bots). The difficulty is that bots need to be able to drop in and out of this communication path at any time.
Thus far I have used Unix domain sockets to communicate between processes. This worked well enough while communication was one-on-one. In my new setup I'd like to not rely on them: any number n of bots could be sending data, and they don't synchronize their transmissions.
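For context, the one-on-one exchange was essentially one stream socket per bot, something like this minimal sketch (the path and payload here are made up for illustration):

import os
import socket
import threading

SOCK_PATH = '/tmp/demo.uds'  # illustrative path, not the real one
ready = threading.Event()

def daemon_side():
    # The daemon owns the socket file and serves exactly one bot.
    if os.path.exists(SOCK_PATH):
        os.remove(SOCK_PATH)
    srv = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    srv.bind(SOCK_PATH)
    srv.listen(1)
    ready.set()
    conn, _ = srv.accept()
    print('daemon received:', conn.recv(4096))
    conn.close()
    srv.close()

def bot_side():
    # A bot connects to the known path and pushes its payload.
    cli = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    cli.connect(SOCK_PATH)
    cli.sendall(b'payload from bot')
    cli.close()

t = threading.Thread(target=daemon_side)
t.start()
ready.wait()
bot_side()
t.join()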
Logically, I went to queue.Queue to fix that. Having a single Queue object which main can steadily pull from is exactly what I believe I need.
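In-process, that pattern is trivial; a minimal sketch of what I mean (the names are just for illustration):

import queue
import threading

q = queue.Queue()

def bot(bot_id):
    # Every producer just drops its data onto the one shared queue.
    q.put((bot_id, 'some data'))

def main_loop():
    # main steadily pulls items, no matter which bot produced them.
    for _ in range(3):
        bot_id, data = q.get()
        print('got', repr(data), 'from bot', bot_id)

consumer = threading.Thread(target=main_loop)
consumer.start()
for i in range(3):
    threading.Thread(target=bot, args=(i,)).start()
consumer.join()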
The problem with that is that I need the bots to be able to drop in and out of my communication network, so a way to pass my Queue to a newly registered process is needed.
I tried using a UDS to distribute my Queue to any process that hooks into main, but that's not an option since a Queue isn't picklable. Additionally, accept() doesn't appear to work with a UDS either (which makes sense, it being file-like and all).
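The pickling part fails outright; this is just a minimal reproduction of that error (the exact message may differ between Python versions):

import pickle
import queue

try:
    pickle.dumps(queue.Queue())  # Queue holds locks/conditions, which cannot be pickled
except TypeError as exc:
    print(exc)                   # e.g. "cannot pickle '_thread.lock' object"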
Code:
import logging
import os
import sys
import threading
import traceback
from queue import Empty, Queue
from socket import AF_UNIX, SOCK_STREAM, socket, timeout

log = logging.getLogger(__name__)  # module-level logger used by Homer.run()


class QueueDistributor(threading.Thread):
    def __init__(self, q, addr, name='Queue Distributor'):
        """ constructor, setting initial variables """
        self._stopevent = threading.Event()
        self.q = q
        self.addr = addr
        threading.Thread.__init__(self, name=name)

    def run(self):
        """ main control loop """
        if os.path.exists(self.addr):
            os.remove(self.addr)
        sock = socket(AF_UNIX, SOCK_STREAM)
        sock.settimeout(1)
        sock.bind(self.addr)
        sock.listen(1)
        while not self._stopevent.is_set():
            try:
                client, addr = sock.accept()
                try:
                    # This is where the idea breaks down: send() expects bytes,
                    # and a Queue object can be neither sent nor pickled.
                    client.send(self.q)
                except timeout:
                    continue
                except Exception:
                    traceback.print_exc(file=sys.stdout)
            except timeout:
                continue
            except Exception:
                traceback.print_exc(file=sys.stdout)
        sock.close()

    def join(self, timeout=None):
        """ Stop the thread. """
        self._stopevent.set()
        threading.Thread.join(self, timeout)
class Homer:
    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null',
                 stderr='/dev/null'):
        self.pidfile = pidfile
        self.redirects = stdin, stdout, stderr
        self.address = '/tmp/homer.uds'
        self.running = False
        self.q = Queue()
        self.q_distributor = QueueDistributor(self.q, self.address)
        # Open up file descriptors with zero buffering
        self.trades = open('/tmp/trades.csv', 'ab', buffering=0)
        self.tickers = open('/tmp/tickers.csv', 'ab', buffering=0)
        self.books = open('/tmp/books.csv', 'ab', buffering=0)
        # ...

    def handle_data(self, data):
        ...

    def run(self):
        self.running = True
        # Launch queue distribution Thread
        self.q_distributor.start()
        while self.running:
            try:
                data = self.q.get(timeout=0.1)
                self.handle_data(data)
            except Empty:
                continue
            except NotImplementedError as e:
                log.error(e)
                continue
        # Shutdown Queue Distributor
        self.q_distributor.join()
The obvious alternative would be to receive data via a TCP socket, drop it into a Queue, and work from there, but is that the only possibility here?
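By that alternative I mean something along these lines (a rough sketch with a made-up port and naive newline framing):

import queue
import socket
import threading

q = queue.Queue()

def handle_bot(conn):
    # One thread per connected bot; every received line becomes one queue item.
    with conn, conn.makefile('rb') as stream:
        for line in stream:
            q.put(line.rstrip(b'\n'))

def tcp_receiver(host='127.0.0.1', port=5555):
    # Accept any number of bots and hand each connection off to its own thread.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen()
    while True:
        conn, _ = srv.accept()
        threading.Thread(target=handle_bot, args=(conn,), daemon=True).start()

threading.Thread(target=tcp_receiver, daemon=True).start()
# main would then sit in q.get(), much like Homer.run() above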
I have seen multiprocessing.connection.Listener, but I assume that's just about the same as the homebrew TCP solution, no?
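For reference, this is roughly how I understand Listener would be used (only a sketch; the address and authkey are made up), and structurally it does look a lot like the TCP version to me:

from multiprocessing.connection import Client, Listener

ADDRESS = ('localhost', 6000)  # illustrative address
AUTHKEY = b'secret'            # illustrative key

def daemon_loop():
    # The daemon accepts connections and receives whole Python objects.
    with Listener(ADDRESS, authkey=AUTHKEY) as listener:
        while True:
            with listener.accept() as conn:
                print('received:', conn.recv())  # recv() unpickles for us

def bot_send(payload):
    # A bot connects, sends one picklable object, and disconnects.
    with Client(ADDRESS, authkey=AUTHKEY) as conn:
        conn.send(payload)  # send() pickles the object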