Starting a Twisted WebSocket client reactor from within an H2O.ai Wave app


I am currently working on a Wave dashboard app that shows stats and insights in real time. The process is roughly as follows:

  1. Authenticate using a login form.
  2. On successful login, connect to a WebSocket URL to receive data.
  3. On receiving data, update the dashboard.

I am using Autobahn WebSockets with Twisted for the WebSocket client that connects to the server URL. I am facing two issues:

  1. Starting the WebSocket client asynchronously with Twisted's threads.deferToThread() seems to block every method that should run after it. Note that the same approach works outside of a Wave application.
  2. I also tried calling asyncio.ensure_future() from the WebSocketClientProtocol callback so that the dashboard is updated as soon as data is received, but I could not get that to work either. As a workaround, I decided to put the received data in a Queue to be picked up by an independent polling_function() loop, which is started after the WebSocket client and updates the data on the dashboard. But since the socket reactor is blocking, this does not work either.
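The queue hand-off described in the second point could be sketched roughly as below, using only the standard library. This is a minimal sketch under assumptions, not the Wave or Autobahn API: `fake_socket_thread` is a hypothetical stand-in for the thread running the reactor, and `consume` is a hypothetical stand-in for polling_function(). The point it illustrates is that an asyncio.Queue is not thread-safe, so a non-asyncio thread should schedule its writes with `loop.call_soon_threadsafe`.

```python
import asyncio
import threading


def fake_socket_thread(loop, out_queue, messages):
    """Hypothetical stand-in for the reactor thread: forwards each message."""
    for msg in messages:
        # asyncio.Queue is not thread-safe, so schedule the put on the loop's thread.
        loop.call_soon_threadsafe(out_queue.put_nowait, msg)
    # Sentinel telling the consumer that no more data will arrive.
    loop.call_soon_threadsafe(out_queue.put_nowait, None)


async def consume(out_queue):
    """Hypothetical stand-in for polling_function: awaits items instead of busy-polling."""
    received = []
    while True:
        item = await out_queue.get()
        if item is None:
            break
        received.append(item)  # a Wave app would update its cards and save the page here
    return received


async def main(messages):
    loop = asyncio.get_running_loop()
    out_queue = asyncio.Queue()
    worker = threading.Thread(
        target=fake_socket_thread, args=(loop, out_queue, messages), daemon=True
    )
    worker.start()
    return await consume(out_queue)


def run_demo(messages):
    return asyncio.run(main(messages))
```

Because the consumer awaits `out_queue.get()`, the event loop stays free for other handlers while no data is arriving.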

I have attached the code for reference. Any input would be appreciated!

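import asyncio
import json
import logging
import sys
import traceback
from queue import Queue

from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
from h2o_wave import Q
from twisted.internet import reactor, threads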

_EngineData=Queue()
    

class EngineClientProtocol(WebSocketClientProtocol):
    _factory=list()
    _q=Q

    @classmethod
    def relay_message_to_engine(cls, data):
        for c in set(cls._factory):
            reactor.callFromThread(cls.sendMessage, c, data)
        logging.info("Relaying Message To Engine")

    @classmethod
    def set_query_context(cls,q:Q):
        cls._q=q

    def onConnect(self, response):
        self._factory.append(self)
        print("Engine Server connected: {0}".format(response.peer))
        logging.info("Engine Server connected: {0}".format(response.peer))
 
    def onConnecting(self, transport_details):
        logging.info("Connecting transport details: {}".format(transport_details))
    
    def onOpen(self):
        logging.info("WebSocket connection open.")
        print("WebSocket connection open.")
        self.sendMessage("CONNECT|admin".encode('utf8'))

    def onMessage(self, payload, isBinary):
        try:
            _message=payload.decode('utf8')
            fields=_message.split('|')
            flag=fields[0]
            logging.info(f"Received {flag} from ManagerServer")
            print(f"Received {flag} from ManagerServer")
            match flag:
                case "USERINFO":
                    try:
                        info=json.loads(fields[1])
                        _EngineData.put_nowait(info)
                        print(_EngineData.qsize())
                    except Exception as e:
                        print('Error Occurred')
                        type_, value_, traceback_ = sys.exc_info()
                        stack_trace = traceback.format_exception(type_, value_, traceback_)
                        logging.error(stack_trace)
        except Exception as e:
            type_, value_, traceback_ = sys.exc_info()
            stack_trace = traceback.format_exception(type_, value_, traceback_)
            logging.error(stack_trace)
    
    def onClose(self, wasClean, code, reason):
        self._factory.remove(self)
        logging.info("WebSocket connection closed: {0}  WasClean {1} Code {2}".format(reason,wasClean,code))


def StartWebSocketClient(q:Q):
    try:
        logging.info("Starting web socket client")
        reactor.connectTCP(q.client._host, q.client._port, q.client.engine_client) #05-10-23
        reactor.run(installSignalHandlers=False)
    except Exception as e:
        type_, value_, traceback_ = sys.exc_info()
        stack_trace = traceback.format_exception(type_, value_, traceback_)
        logging.error(stack_trace)
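The parsing step inside onMessage can also be tested separately from the socket. The sketch below assumes every frame has the form `FLAG|body`, as in the "CONNECT|admin" and "USERINFO|..." messages above, with at most one body per frame. The helper names `parse_engine_message` and `handle_userinfo` are hypothetical. Splitting only on the first `|` keeps any pipe character inside the JSON body intact.

```python
import json


def parse_engine_message(payload):
    """Split a 'FLAG|body' frame into (flag, body); body is None when absent."""
    text = payload.decode("utf8")
    # partition splits only on the first '|', so a '|' inside the body survives.
    flag, _, body = text.partition("|")
    return flag, (body if body else None)


def handle_userinfo(payload):
    """Return the decoded JSON body of a USERINFO frame, or None for other frames."""
    flag, body = parse_engine_message(payload)
    if flag != "USERINFO" or body is None:
        return None
    return json.loads(body)
```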

Code within the serve function, i.e. what runs when the login button of the login form is clicked:

elif q.args.btn_login:
    #Authenticate via an API and on successful login,
    #Initializing Client Web Socket
    q.client.engine_client = WebSocketClientFactory(ServerURL)
    q.client.engine_client.protocol = EngineClientProtocol
    EngineClientProtocol.set_query_context(q)
    q.client._host=host
    q.client._port=int(port) 
    status=threads.deferToThread(StartWebSocketClient(q))
    if status:
        q.client.future=asyncio.ensure_future(polling_function(q))
        await q.client.future
        print('Done')
    await q.page.save()

Here, polling_function(q) is the function that updates the cards in the Wave dashboard, and ServerURL contains the WebSocket URL, for example "ws://localhost:15440".
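One detail that may be worth checking in the snippet above: `threads.deferToThread` takes the callable and its arguments separately, as `deferToThread(f, *args, **kwargs)`. Writing `deferToThread(StartWebSocketClient(q))` calls the function in the current thread first, so `reactor.run()` blocks before anything is deferred. The sketch below illustrates the same distinction with the standard library only. `blocking_client` is a hypothetical stand-in for StartWebSocketClient, and a plain daemon thread is swapped in for Twisted's thread pool, which, as far as I know, only starts once the reactor itself is running.

```python
import threading
import time


def blocking_client(stop_event, started):
    """Hypothetical stand-in for StartWebSocketClient: blocks until told to stop."""
    started.set()
    stop_event.wait()  # plays the role of reactor.run()


def start_in_background(func, *args):
    """Pass the callable and its arguments separately; func is not called here."""
    worker = threading.Thread(target=func, args=args, daemon=True)
    worker.start()
    return worker


def demo():
    stop_event = threading.Event()
    started = threading.Event()
    t0 = time.monotonic()
    worker = start_in_background(blocking_client, stop_event, started)
    # The caller gets control back immediately even though the worker blocks.
    returned_quickly = (time.monotonic() - t0) < 1.0
    started.wait(timeout=2.0)
    was_running = worker.is_alive()
    stop_event.set()  # ask the stand-in "reactor" to stop
    worker.join(timeout=2.0)
    return returned_quickly, was_running, worker.is_alive()
```

By contrast, `start_in_background(blocking_client(stop_event, started))` would never return, because the argument is evaluated, and blocks, before the thread is created.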
