How do you properly handle exceptions for asynchronous code in Python for NATS?

598 Views Asked by At

main.py

import asyncio
import signal

from nats.aio.client import Client as NATS

from client.nats_client import NATSClient
from factory.logger import LoggerFactory
from helpers.environment import get_log_level

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


if __name__ == "__main__":
    """Using a, low-level asyncio API, to run the CustomClient coroutines"""
    client = NATSClient(nc=NATS())
    event_loop = asyncio.get_event_loop()
    event_loop.create_task(client.start())
    try:
        event_loop.run_forever()
    except KeyboardInterrupt:  # CTRL-C was pressed by the user
        log.info("CTRL-C was pressed!")
    except asyncio.CancelledError:
        log.info("cancelled")
    finally:
        event_loop.close()

client.py

from factory.logger import LoggerFactory
from helpers.environment import (
    get_log_level,
    get_nats_connection,
    get_nats_queue,
    get_nats_subscription_subject,
)

from client.callbacks import closed_cb, disconnected_cb, error_cb, reconnected_cb
from client.handlers import message_handler

log = LoggerFactory.get_logger(__name__, log_level=get_log_level())


class NATSClient:
    def __init__(self, nc):
        """_summary_

        Args:
            nc (_type_): _description_
        """
        self.nc = nc

    async def start(self):
        """_summary_"""
        try:
            servers = get_nats_connection()
            log.info(f"NATS client connection attempt to: {servers} ...")
            # Setting explicit callbacks and list of servers.
            await self.nc.connect(
                servers=servers,
                reconnected_cb=reconnected_cb,
                disconnected_cb=disconnected_cb,
                error_cb=error_cb,
                closed_cb=closed_cb,
            )
        except Exception as e:
            log.error(f"Encountered an error connecting client to server, {e}")

        client = self.nc

        if client.is_connected:
            log.info("NATS client successfully connected to server!")

            try:
                # Subscription using a queue so that only a single subscriber
                await client.subscribe(
                    subject=get_nats_subscription_subject(),
                    queue=get_nats_queue(),
                    cb=message_handler,
                )
                json.loads(on_purpose)  # throw exception on purpose
            except Exception as e:
                log.info("Encountered error during message handling!", e)

            await client.flush(1)

The issue I am having is being able to, gracefully, exit out of the program if I manually press ctrl+c, and re-enter the loop for handling messages if something happens during the polling process on the NATS Queue messages are not processed, and I have to manually exit the code with ctrl+c.

I have also encountered other errors such as:

    Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000259F51E3550>
Traceback (most recent call last):
  line 116, in __del__
    self.close()
  line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.681 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-5' coro=<Client._read_loop() done, defined at \.venv\lib\site-packages\nats\aio\client.py:1908> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F53B0610>()]>>
2022-04-13 09:49:55.682 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-6' coro=<Client._ping_interval() done, defined at .venv\lib\site-packages\nats\aio\client.py:1891> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x00000259F5395E20>()]>>
Exception ignored in: <coroutine object Client._flusher at 0x00000259F53C28C0>
Traceback (most recent call last):
  line 1873, in _flusher
    future: asyncio.Future = await self._flush_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.686 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-7' coro=<Client._flusher() done, defined at .venv\lib\site-packages\nats\aio\client.py:1861> wait_for=<Future cancelled>>
Exception ignored in: <coroutine object Subscription._wait_for_msgs at 0x00000259F53C24C0>
Traceback (most recent call last):
  line 271, in _wait_for_msgs
    msg = await self._pending_queue.get()
  line 168, in get
    getter.cancel()  # Just in case getter is not done yet.
  line 746, in call_soon
    self._check_closed()
  line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2022-04-13 09:49:55.689 1738 default_exception_handler ERROR  Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<Subscription._wait_for_msgs() done, defined at .venv\lib\site-packages\nats\aio\subscription.py:262> wait_for=<Future cancelled>>

Any help is greatly appreciated in understanding this async concept and error handling.

0

There are 0 best solutions below