main.py
import asyncio
import signal
from nats.aio.client import Client as NATS
from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
if __name__ == "__main__":
"""Using a, low-level asyncio API, to run the CustomClient coroutines"""
client = NATSClient(nc=NATS())
event_loop = asyncio.get_event_loop()
event_loop.create_task(client.start())
try:
event_loop.run_forever()
except KeyboardInterrupt: # CTRL-C was pressed by the user
log.info("CTRL-C was pressed!")
except asyncio.CancelledError:
log.info("cancelled")
finally:
event_loop.close()
client.py
from factory.logger import LoggerFactory
from helpers.environment import (
get_log_level,
get_nats_connection,
get_nats_queue,
get_nats_subscription_subject,
)
from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler
log = LoggerFactory.get_logger(__name__, log_level=get_log_level())
class NATSClient:
def __init__(self, nc):
"""_summary_
Args:
nc (_type_): _description_
"""
self.nc = nc
async def start(self):
"""_summary_"""
try:
servers = get_nats_connection()
log.info(f"NATS client connection attempt to: {servers} ...")
# Setting explicit callbacks and list of servers.
await self.nc.connect(
servers=servers,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
error_cb=error_cb,
closed_cb=closed_cb,
)
except Exception as e:
log.error(f"Encountered an error connecting client to server, {e}")
client = self.nc
if client.is_connected:
log.info("NATS client successfully connected to server!")
try:
# Subscription using a queue so that only a single subscriber
await client.subscribe(
subject=get_nats_subscription_subject(),
queue=get_nats_queue(),
cb=message_handler,
)
json.loads(on_purpose) # throw exception on purpose
except Exception as e:
log.info("Encountered error during message handling!", e)
await client.flush(1)
The issue I am having is being able to, gracefully, exit out of the program if I manually press ctrl+c, and re-enter the loop for handling messages if something happens during the polling process on the NATS Queue messages are not processed, and I have to manually exit the code with ctrl+c.
I have also encountered other errors such as:
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
line 116, in __del__
self.close()
line 108, in close
self._loop.call_soon(self._call_connection_lost, None)
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
line 1873, in _flusher
future: asyncio.Future = await self._flush_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
line 271, in _wait_for_msgs
msg = await self._pending_queue.get()
line 168, in get
getter.cancel() # Just in case getter is not done yet.
line 746, in call_soon
self._check_closed()
line 510, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>
Any help is greatly appreciated in understanding this async concept and error handling.