I'm using RethinkDB's pub/sub system in a Python app (rethinkdb==2.4.9).
The problem is that I cannot maintain a reliable connection: if the connection between the subscriber and the server is lost, there is no way to detect it.
Checking conn.is_open() is useless, since it always returns True even if I drop the TCP connections on my firewall.
So my main problem is keeping the connection alive.
This is my subscriber class:
class MyExchanger:
    """Subscriber for a single topic on an ``Exchange``.

    Each received message is acknowledged through ``api_client.msg_ack`` and
    its JSON-decoded ``"msg"`` payload is pushed onto ``q``.  Two threads are
    created (but not started here): ``listener`` runs ``_handle_messages`` and
    ``watcher`` runs ``_conn_watcher``.

    Liveness is tracked with ``conn.open``, a plain flag that this class sets
    itself: ``connect()`` sets it to True and the listener sets it to False
    when the subscription feed ends or raises.  The watcher reconnects when
    the flag is False.

    NOTE(review): a connection that stalls without raising (e.g. packets
    silently dropped by a firewall) never ends the feed, so the flag stays
    True.  Detecting that presumably needs a heartbeat on a separate
    connection or TCP keepalive tuning -- confirm against the driver.
    """

    def __init__(
        self,
        topic: str,
        connection_args: dict,
        q: queue.Queue,
        self_destruct: threading.Event,
        **kwargs,
    ):
        """Create the exchange connection and the (unstarted) worker threads.

        Args:
            topic: Pattern passed to ``topic.match`` when subscribing.
            connection_args: Keyword arguments for ``Exchange``; an optional
                ``"name"`` entry is taken out and passed as ``name=``.
            q: Queue that receives the decoded message payloads.
            self_destruct: Event that stops the watcher loop once set.
            **kwargs: ``watcher_period`` (seconds between watcher checks,
                default 30) is the only key read.
        """
        # Pop from a copy so the caller's dict is left untouched.
        connection_args = dict(connection_args)
        self.name = connection_args.pop("name", None)
        self.connection_args = connection_args
        self.destruct = self_destruct
        self.watcher_period = kwargs.get("watcher_period", 30)
        # Sets self.exchanger, self.conn, self.r and marks the conn open.
        self.connect()
        self.logger = exchanger_logger
        self.q = q
        self.topic = topic
        self.msg_ack = api_client.msg_ack
        self.watcher = threading.Thread(target=self._conn_watcher)
        self.listener = threading.Thread(target=self._handle_messages)

    def subscribe(self):
        """Return the exchange queue filtered on ``self.topic``."""

        def matches_topic(topic):
            return topic.match(self.topic)

        return self.exchanger.queue(matches_topic)

    def _conn_watcher(self):
        """Check the connection flag every ``watcher_period`` seconds.

        Runs until ``self.destruct`` is set; reconnects whenever the flag on
        the current connection has been cleared.
        """
        while not self.destruct.is_set():
            if self.conn.open:
                print(f"{self.name} for {self.topic} connection is open")
                self.logger.info(f"{self.name} for {self.topic} is open")
            else:
                self._reconnect()
            # wait() instead of sleep() so setting the event ends the pause.
            self.destruct.wait(self.watcher_period)

    def _reconnect(self):
        """Rebuild the exchange and restart the exchanger, logging the outcome."""
        closed_text = f"{self.name} for {self.topic} is closed. Trying to reconnect"
        print(closed_text)
        self.logger.info(closed_text)
        try:
            self.connect()
            # NOTE(review): start_exchanger is not defined in the visible part
            # of this class -- confirm it exists and (re)starts the listener.
            self.start_exchanger()
        except Exception as e:
            failed_text = f"{self.name} for {self.topic} error reconnecting: {repr(e)}"
            print(failed_text)
            self.logger.error(failed_text)
        else:
            done_text = f"{self.name} for {self.topic} recconnected successfully"
            print(done_text)
            self.logger.info(done_text)

    def connect(self):
        """Create a fresh ``Exchange`` and refresh every handle derived from it."""
        self.exchanger = Exchange(name=self.name, **self.connection_args)
        self.conn = self.exchanger.conn
        # Keep self.r in step with the new exchanger, not the replaced one.
        self.r = self.exchanger.r
        self.conn.open = True

    def _handle_messages(self):
        """Consume the subscription until it ends, then flag the conn closed.

        Any exception raised by the feed is logged instead of killing the
        thread silently; in every case the connection this listener started
        with is marked closed so the watcher reconnects.
        """
        # Pin the connection in use now: connect() may replace self.conn
        # before this thread finishes, and the new one must stay marked open.
        conn = self.conn
        try:
            subscription = self.subscribe()
            print(f"{self.name} subscribed to topic: {self.topic}")
            for _, msg in subscription.subscription():
                self._process_message(msg)
        except Exception as err:
            lost_text = (
                f"{self.name} subscription to {self.topic} failed: {repr(err)}"
            )
            print(lost_text)
            self.logger.error(lost_text)
        finally:
            conn.open = False

    def _process_message(self, msg):
        """Acknowledge one message and enqueue its decoded payload."""
        received_text = f"message from {self.name}: {msg}"
        print(received_text)
        self.logger.info(received_text)
        try:
            self.msg_ack(msg["id"])
            try:
                self.q.put_nowait(json.loads(msg["msg"]))
            except queue.Full:
                # Still best-effort, but leave a trace: the message was
                # already acknowledged, so it will not be delivered again.
                self.logger.warning(
                    f"queue full, dropping message from {self.name}: {msg['id']}"
                )
        except Exception as err:
            error_text = f"error processing message from {self.name}: {repr(err)}"
            print(error_text)
            self.logger.error(error_text)