RethinkDB pub/sub Python client connection persistence

34 Views Asked by At

I'm using RethinkDB's pub/sub system in a Python app (rethinkdb==2.4.9).
The problem is that I cannot maintain a reliable connection: if the connection between the subscriber and the server is lost, there is no way to detect it.
Checking conn.is_open() is useless, since it always returns True even if I drop the TCP connections on my firewall.
So my main problem is keeping the connection alive.
This is my subscriber class:

class MyExchanger:
    """Subscribe to one topic on an ``Exchange`` and forward messages to a queue.

    The instance owns two ``threading.Thread`` objects: ``watcher`` (runs
    ``_conn_watcher``) and ``listener`` (runs ``_handle_messages``). Neither
    thread is started by this class; the caller is expected to start them.

    NOTE(review): ``conn.open`` is a plain attribute set to ``True`` by this
    class. Nothing in this class ever sets it to ``False``, so the reconnect
    branch of ``_conn_watcher`` only fires if other code clears the flag.
    """

    def __init__(
        self,
        topic: str,
        connection_args: dict,
        q: queue.Queue,
        self_destruct: threading.Event,
        **kwargs,
    ):
        """Create the exchange connection and prepare (but do not start) threads.

        Args:
            topic: Pattern handed to ``topic.match`` when filtering messages.
            connection_args: Keyword arguments for ``Exchange``; an optional
                ``"name"`` entry is extracted and used as the exchange name.
            q: Queue that receives the JSON-decoded message payloads.
            self_destruct: Event that stops the watcher loop once set.
            **kwargs: ``watcher_period`` (seconds between connection checks,
                default 30) is the only option read.
        """
        # Work on a copy so that popping "name" does not mutate the caller's dict.
        connection_args = dict(connection_args)
        self.name = connection_args.pop("name", None)
        self.connection_args = connection_args
        self.destruct = self_destruct
        self.watcher_period = kwargs.get("watcher_period", 30)
        self.connect()
        self.logger = exchanger_logger
        self.q = q
        self.topic = topic
        self.msg_ack = api_client.msg_ack
        self.watcher = threading.Thread(target=self._conn_watcher)
        self.listener = threading.Thread(target=self._handle_messages)

    def subscribe(self):
        """Return the exchange queue filtered by this instance's topic."""

        def topic_matches(topic):
            return topic.match(self.topic)

        return self.exchanger.queue(topic_matches)

    def _conn_watcher(self) -> None:
        """Periodically check the connection flag and reconnect when it is off.

        Loops until ``self.destruct`` is set, sleeping ``watcher_period``
        seconds between checks via ``Event.wait`` so shutdown is prompt.
        """
        while not self.destruct.is_set():
            if not self.conn.open:
                print(f"{self.name} for {self.topic} is closed. Trying to reconnect")
                self.logger.info(
                    f"{self.name} for {self.topic} is closed. Trying to reconnect"
                )
                try:
                    self.connect()
                    # NOTE(review): start_exchanger() is not defined in this
                    # class as shown; confirm it exists (e.g. in a subclass
                    # or mixin), otherwise every reconnect attempt ends in
                    # the error branch below.
                    self.start_exchanger()
                except Exception as e:
                    print(f"{self.name} for {self.topic} error reconnecting: {repr(e)}")
                    # Logged at error level, like message-processing failures.
                    self.logger.error(
                        f"{self.name} for {self.topic} error reconnecting: {repr(e)}"
                    )
                else:
                    print(f"{self.name} for {self.topic} recconnected successfully")
                    self.logger.info(
                        f"{self.name} for {self.topic} recconnected successfully"
                    )
            else:
                print(f"{self.name} for {self.topic} connection is open")
                self.logger.info(f"{self.name} for {self.topic} is open")

            self.destruct.wait(self.watcher_period)

    def connect(self) -> None:
        """(Re)create the ``Exchange`` and refresh the handles derived from it."""
        self.exchanger = Exchange(name=self.name, **self.connection_args)
        self.conn = self.exchanger.conn
        # Keep ``r`` in step with the current exchanger after a reconnect.
        self.r = self.exchanger.r
        self.conn.open = True

    def _handle_messages(self) -> None:
        """Consume the subscription, acknowledge each message and enqueue it.

        Each message is acknowledged through ``self.msg_ack`` first; its
        ``"msg"`` field is then JSON-decoded and put on ``self.q`` without
        blocking. A full queue or a processing error is reported and the loop
        continues with the next message.
        """
        subscription = self.subscribe()
        print(f"{self.name} subscribed to topic: {self.topic}")
        for _, msg in subscription.subscription():
            print(f"message from {self.name}: {msg}")
            self.logger.info(f"message from {self.name}: {msg}")
            try:
                self.msg_ack(msg["id"])
                try:
                    self.q.put_nowait(json.loads(msg["msg"]))
                except queue.Full:
                    # The message is already acknowledged, so make the loss
                    # visible instead of discarding it silently.
                    print(f"queue full, dropping message from {self.name}: {msg}")
                    self.logger.error(
                        f"queue full, dropping message from {self.name}: {msg}"
                    )
            except Exception as err:
                print(f"error processing message from {self.name}: {repr(err)}")
                self.logger.error(
                    f"error processing message from {self.name}: {repr(err)}"
                )
0

There are 0 best solutions below