I have a Python Flask app that publishes messages to an MQTT topic and also subscribes to an MQTT topic. It is deployed on Kubernetes and runs on Gunicorn with the gevent worker class.
The app creates two Paho MQTTv5 clients, one for publishing and one for subscribing to messages, and each client uses the default TLS context.
My custom Paho client code:
@attr.s
class PahoClient:
    """
    Wrapper around a Paho MQTT v5 client that connects over TLS.

    The wrapper owns one ``Client`` instance, wires the client's callbacks to
    the methods below, remembers every subscription it was asked to register
    and registers them again after an unexpected disconnect.
    """

    KEEPALIVE_IN_SECONDS = 60

    _message_bus_host = attr.ib(type=str, default=None)
    _message_bus_port = attr.ib(type=int, default=None)
    client_id = attr.ib(type=str, default=None)
    username = attr.ib(type=str, default=None)
    password = attr.ib(type=str, default=None)
    connected = attr.ib(type=bool, default=False)
    _unexpected_disconnect = attr.ib(type=bool, default=False)
    # Read by loop(), connect(), disconnect() and _on_disconnect(). Declared
    # here so the attribute always exists: when True, connect() starts Paho's
    # background network thread; when False, the owner must call loop() itself.
    loop_in_own_thread = attr.ib(type=bool, default=True)
    # Subscriptions handed to subscribe() so far; replayed by _on_connect()
    # after an unexpected disconnect. One fresh list per instance.
    _subscriptions = attr.ib(type=list, factory=list, init=False)

    def __attrs_post_init__(self):
        """
        Create the underlying Paho client and set up credentials, TLS and callbacks.
        """
        # NOTE(review): ``userdata=id`` passes the *builtin* ``id`` function as
        # user data. No callback in this class reads ``userdata`` -- confirm
        # whether ``self.client_id`` (or nothing at all) was intended.
        self._client = Client(client_id=self.client_id, userdata=id, protocol=MQTTv5)
        if self.username and self.password:
            self._client.username_pw_set(username=self.username, password=self.password)
        self._client.tls_set_context(context=ssl.create_default_context())
        self._client.on_connect = self._on_connect
        self._client.on_publish = self._on_publish
        self._client.on_message = self._handle_message
        self._client.on_disconnect = self._on_disconnect
        # The log callback has to be set on the Paho client itself; storing it
        # on this wrapper means Paho never calls it.
        self._client.on_log = self.on_log

    def on_log(self, client, userdata, level, buf):
        """
        Forward a Paho log line to this module's logger at debug level.
        """
        logger.debug("log %s", buf)

    def loop(self):
        """
        Run one iteration of the Paho network loop.

        Raises:
            ValueError: if the client is configured to run its own loop thread.
        """
        if self.loop_in_own_thread:
            raise ValueError("Can't call loop if connector is running in its own thread")
        self._client.loop()

    def connect(self):
        """
        Connect to the configured broker and, in threaded mode, start the loop thread.
        """
        self._client.connect(host=self._message_bus_host, port=self._message_bus_port,
                             keepalive=self.KEEPALIVE_IN_SECONDS)
        # runs a thread in the background to call loop() automatically
        if self.loop_in_own_thread:
            self._client.loop_start()

    def disconnect(self):
        """
        Disconnect from the broker and, in threaded mode, stop the loop thread.
        """
        # disconnect client from broker
        self._client.disconnect()
        # stop thread
        if self.loop_in_own_thread:
            self._client.loop_stop()

    def _register_callback(self, topic: str, callback: Callable):
        """
        Attach ``callback`` to ``topic`` and subscribe to that topic on the broker.
        """
        self._client.message_callback_add(topic, callback)
        self._client.subscribe(topic=topic)

    def subscribe(self, subscriptions: List[Type['Subscription']]) -> None:
        """
        Register callbacks for all subscriptions.

        Subscriptions not seen before are remembered so that they can be
        registered again after an unexpected disconnect.
        """
        for subscription in subscriptions:
            if subscription not in self._subscriptions:
                self._subscriptions.append(subscription)
        # (re-)register the callback of every subscription that was passed in
        for subscription in subscriptions:
            self._register_callback(subscription.target, subscription.callback.callback)

    def send_message(self, message: MqttMessage) -> MQTTMessageInfo:
        """
        Publish ``message`` on its topic and return Paho's publish result.
        """
        # NOTE(review): ``payload`` is not defined anywhere in this method or
        # class; unless a module-level ``payload`` exists this raises
        # NameError. Presumably it should be derived from ``message`` --
        # confirm against the MqttMessage definition.
        return self._client.publish(topic=message.resource,
                                    payload=payload,
                                    qos=message.quality_of_service,
                                    retain=message.retain)

    def _handle_message(self, client, userdata, message):
        """
        Message handler for all topics which have no explicit handlers subscribed to.
        """
        logger.debug("Received unsubscribed message %s", message.payload)

    def _on_publish(self, client, userdata, mid):
        """
        Log the message id reported to the client's publish callback.
        """
        logger.debug("on_publish for client %s, mid %s", self.client_id, mid)

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """
        The callback for when the client receives a CONNACK response from the server.

        Marks the wrapper as connected only when ``rc`` signals success, and
        re-registers the remembered subscriptions if the previous connection
        was lost unexpectedly.
        """
        peer_host, peer_port = self._client.socket().getpeername()[:2]
        logger.info("%s connected with result code %s to broker %s:%d",
                    self.client_id, rc, peer_host, peer_port)
        if rc != 0:
            # A non-zero result code means the broker refused the connection;
            # do not report the client as connected or try to subscribe.
            self.connected = False
            logger.warning("%s was refused by the broker, result code %s", self.client_id, rc)
            return
        self.connected = True
        if self._unexpected_disconnect:
            logger.info(f"re-subscribing to {self._subscriptions} topics")
            # Re-register subscriptions after disconnect
            self.subscribe(self._subscriptions)
            self._unexpected_disconnect = False

    def _on_disconnect(self, client, userdata, rc, properties=None):
        """
        The callback for when the client disconnects from the broker.

        A result code of 0 is treated as an expected disconnect. Any other
        code flags the disconnect as unexpected so that subscriptions are
        restored on the next successful connect.
        """
        self.connected = False
        if rc == 0:
            return
        logger.warning(f"Unexpected message bus disconnection, code {rc}, {self._client._client_id}")
        if rc == 1:
            logger.warning("Possibly there is already another client connected with the same identifier")
        self._unexpected_disconnect = True
        logger.info(f"Trying to reconnect {self._client._client_id}")
        # In threaded mode the loop started by loop_start() reconnects on its
        # own; calling connect() from inside this callback would open a second
        # connection, and an exception raised here could end the network
        # thread. Only reconnect by hand when the owner drives loop() itself.
        if not self.loop_in_own_thread:
            self.connect()
The issue is that the Paho MQTT client throws the following error when the app publishes messages larger than 100 KB. The app keeps publishing messages without any error, but no callback method gets called. The paho-mqtt client throws this error only when the app runs on Gunicorn with the gevent worker class.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/threading.py", line 973, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.9/threading.py", line 910, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3591, in _thread_main
self.loop_forever(retry_first_connection=True)
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1756, in loop_forever
rc = self._loop(timeout)
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1181, in _loop
rc = self.loop_write()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1577, in loop_write
rc = self._packet_write()
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2464, in _packet_write
write_length = self._sock_send(
File "/usr/local/lib/python3.9/site-packages/paho/mqtt/client.py", line 649, in _sock_send
return self._sock.send(buf)
File "/usr/local/lib/python3.9/site-packages/gevent/_ssl3.py", line 501, in send
return self._sslobj.write(data)
ssl.SSLError: [SSL: BAD_LENGTH] bad length (_ssl.c:2483)
The Gunicorn command in the Kubernetes deployment looks like this:
# Pod spec fragment: a single container that launches Gunicorn with the gevent worker class.
spec:
  containers:
    - name: mqtt-flask-app
      image: <docker image>
      command:
        - bash
        - '-c'
        - gunicorn wsgi -w 1 -b :5000 -t 600 --worker-class gevent
      ports:
        - containerPort: 5000
          protocol: TCP
It works fine when I deploy the app on Kubernetes after removing the gevent package from requirements.txt and the --worker-class option from the Gunicorn command in the Kubernetes deployment, as shown below:
# Pod spec fragment: the same container, launching Gunicorn without the --worker-class option.
spec:
  containers:
    - name: mqtt-flask-app
      image: <docker image>
      command:
        - bash
        - '-c'
        - gunicorn wsgi -w 1 -b :5000 -t 600
      ports:
        - containerPort: 5000
          protocol: TCP
I have to run the Flask app on Gunicorn with the gevent worker class. Could you please suggest a solution for this issue?