run_in_executor causes a TimeoutError that was never retrieved

I've written a program that processes a stream of data coming from a device and writes it to a websocket for consumption by a web app. The library I wrote to read from the device and yield its computed values is synchronous (i.e. blocking). I don't want to rewrite it as async, so I use asyncio's run_in_executor to run it in a thread (to be precise, I call it through Tornado's IOLoop, because the program also serves web requests).

While the code works, I frequently get the error "Future exception was never retrieved". The exception is a TimeoutError, and the traceback points at the code that runs the blocking function in the executor (full error below the code block).
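The "never retrieved" message comes from the future that run_in_executor returns: if that future finishes with an exception and nothing awaits it or calls its exception() method, asyncio logs the error when the future is garbage-collected. Below is a minimal sketch in plain asyncio of keeping a fire-and-forget executor job while still retrieving its error through a done callback. failing_job and the errors list are hypothetical stand-ins for the device client and a real logger.

```python
import asyncio

errors: list[BaseException] = []  # hypothetical sink; a real app would log instead


def failing_job() -> None:
    # Hypothetical stand-in for the blocking device client timing out.
    raise TimeoutError("timed out")


def report(fut: asyncio.Future) -> None:
    # Calling exception() marks the exception as retrieved, so asyncio
    # no longer logs "Future exception was never retrieved" for this future.
    if fut.cancelled():
        return
    exc = fut.exception()
    if exc is not None:
        errors.append(exc)


async def main() -> None:
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(None, failing_job)
    fut.add_done_callback(report)  # fire-and-forget, but the error is still handled
    await asyncio.wait([fut])      # only so this demo exits after the job finishes


if __name__ == "__main__":
    asyncio.run(main())
    print(errors)
```

Awaiting the future directly (inside a try/except) would retrieve the exception just as well; the callback form fits code like start_client that does not want to wait for the job.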

Note that I could not run the function in the executor without setting the asyncio event loop policy to tornado.platform.asyncio.AnyThreadEventLoopPolicy(). Without it, I constantly get the error "there is no current event loop in ThreadExecutor", and I haven't found a way around it.
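If that error comes from code running in the worker thread that looks up the "current" loop (for example asyncio.get_event_loop() or Tornado's IOLoop.current() called from inside the blocking client), a common alternative to changing the global policy is to capture the loop on the event-loop thread, pass it into the worker, and hand results back with call_soon_threadsafe (Tornado's IOLoop.add_callback is likewise documented as safe to call from other threads). This is a sketch under that assumption, not the poster's code; blocking_source and pump are hypothetical names.

```python
import asyncio
import time
from typing import Callable, Iterator


def blocking_source(n: int) -> Iterator[int]:
    # Hypothetical stand-in for the blocking device library.
    for i in range(n):
        time.sleep(0.01)  # simulate waiting on the device
        yield i * i


def pump(loop: asyncio.AbstractEventLoop,
         on_value: Callable[[int], None], n: int) -> None:
    # Runs in the executor thread. It never asks asyncio for a "current"
    # loop; it only uses the loop object it was given.
    for value in blocking_source(n):
        # Schedule the callback on the event-loop thread (e.g. to write to websockets).
        loop.call_soon_threadsafe(on_value, value)


async def main() -> list[int]:
    loop = asyncio.get_running_loop()  # looked up on the loop's own thread
    received: list[int] = []
    # Awaiting the future also retrieves any exception raised in pump().
    await loop.run_in_executor(None, pump, loop, received.append, 5)
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```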

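import asyncio
from functools import partial

import tornado.ioloop
import tornado.platform.asyncio
import tornado.websocket

# run_client_forever is the blocking function from my device library (import omitted).


class WebClientStreamHandler(tornado.websocket.WebSocketHandler):
    clients = set()  # class attribute: shared by all connections

    def __init__(self, *args, **kwargs):
        self.config = kwargs.pop("config")
        super().__init__(*args, **kwargs)
        self.started = False

    def on_message(self, message):
        if message == "data":
            WebClientStreamHandler.clients.add(self)
            self.start_client()

    def on_close(self):
        # Stop broadcasting to sockets that have gone away.
        WebClientStreamHandler.clients.discard(self)

    def write_data(self, message: str):
        # Broadcast one computed value to every subscribed client.
        for client in WebClientStreamHandler.clients:
            client.write_message(message)

    def start_client(self):
        if self.started:
            return

        # Without this policy I get "there is no current event loop" in the executor thread.
        asyncio.set_event_loop_policy(tornado.platform.asyncio.AnyThreadEventLoopPolicy())
        sync_func = partial(run_client_forever, self.config)
        loop = tornado.ioloop.IOLoop.current()
        # The returned future is discarded, so an exception raised by
        # sync_func is never retrieved and asyncio logs it instead.
        loop.run_in_executor(None, sync_func)
        self.started = True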

ERROR (abridged):

ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=TimeoutError('timed out') created at /usr/lib/python3.11/asyncio/base_events.py:427>
Traceback:
  ...
  File "/path/src/cps_demo/web_stream_handler.py", line 118, in start_client
    loop.run_in_executor(None, sync_func)
  File "/path/venv/lib/python3.11/site-packages/tornado/platform/asyncio.py", line 266, in run_in_executor
    return self.asyncio_loop.run_in_executor(executor, func, *args)
  File "/usr/lib/python3.11/asyncio/base_events.py", line 828, in run_in_executor
    return futures.wrap_future(
  File "/usr/lib/python3.11/asyncio/futures.py", line 417, in wrap_future
    new_future = loop.create_future()
  File "/usr/lib/python3.11/asyncio/base_events.py", line 427, in create_future
    return futures.Future(loop=self)
TimeoutError: timed out

Answer by bluppfisk:

I figured it out. The exception was only raised when a client disconnected and reconnected.

Basically, started is an instance variable, but Tornado creates a new handler instance for every connection, so started was False on the new instance and the device client was started a second time.

Since the device only supports a single TCP connection at a time, this second client eventually timed out, causing the exception.

Making started a class variable, like clients, solved the problem.
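A minimal sketch of the class-variable fix, with a plain class standing in for the Tornado handler (StreamHandler and start_device are hypothetical). The subtle part is that the flag must be written on the class: reading self.started falls back to the class attribute, but assigning self.started = True creates an instance attribute that the next connection's handler never sees. It assumes, as in Tornado, that all handler methods run on the single event-loop thread, so no lock is needed.

```python
from typing import Callable


class StreamHandler:
    """Hypothetical stand-in for the websocket handler; one instance per connection."""

    clients: set = set()   # shared by all instances
    started = False        # shared flag: has the device client been started?

    def __init__(self, start_device: Callable[[], None]) -> None:
        self._start_device = start_device

    def start_client(self) -> None:
        cls = type(self)
        if cls.started:
            return
        # Assign on the class: `self.started = True` would only shadow the
        # class attribute, and the next connection would still see False.
        cls.started = True
        self._start_device()


starts: list[int] = []
first = StreamHandler(lambda: starts.append(1))
first.start_client()
second = StreamHandler(lambda: starts.append(2))  # a reconnecting client
second.start_client()
print(starts)  # [1]
```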

I should probably wrap this function in a singleton.
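One way to read "wrap it in a singleton" is a single process-wide owner that keeps the executor future: a reconnecting client reuses the running job instead of opening a second TCP connection, a new job is started only after the previous one has finished (for example, after a timeout), and the stored future stays reachable so its exception can be awaited or inspected. This is a sketch assuming a plain asyncio loop; DeviceRunner, ensure_started and fake_device are hypothetical names.

```python
import asyncio
import time
from typing import Any, Callable, Optional


class DeviceRunner:
    """Hypothetical process-wide owner of the one blocking device client."""

    _future: Optional[asyncio.Future] = None

    @classmethod
    def ensure_started(cls, loop: asyncio.AbstractEventLoop,
                       blocking_fn: Callable[..., Any], *args: Any) -> asyncio.Future:
        # Start a job only if none exists yet or the previous one has finished;
        # otherwise hand back the job that is already running.
        if cls._future is None or cls._future.done():
            cls._future = loop.run_in_executor(None, blocking_fn, *args)
        return cls._future


def fake_device(tag: str) -> str:
    # Stand-in for run_client_forever: blocks briefly, then returns.
    time.sleep(0.05)
    return tag


async def demo() -> tuple[bool, str]:
    loop = asyncio.get_running_loop()
    first = DeviceRunner.ensure_started(loop, fake_device, "first")
    second = DeviceRunner.ensure_started(loop, fake_device, "second")  # reconnect
    return first is second, await first


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, 'first')
```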