Why is aiomqtt.error.MqttError: Operation timed out raised?

325 Views Asked by At

I have a MQTT publisher module coded in Python 3.11.3 using aiomqtt 1.2.1. It's all based on asyncio tasks.

Whyt my code basically does is publishing a config_io message and after that publishing data_in messages. publishing the config_io works perfectly fine but after that a timeout error is thrown by aiomqtt: "wait_for raise MqttError(msg) from None aiomqtt.error.MqttError: Operation timed out"

So the connection to the broker works and I think the problem is related to the periodic publishing which is included. I use the library asyncio-periodic 2019.2. Can somebody support me in finding the issue?

import aiomqtt
import asyncio
from periodic import Periodic
import datetime
from datetime import timezone
import json
import ssl
import sys
import os

def get_json_config_io():
    current_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    Json_Config = {"last_edited":str(current_time),
                   "value":"test"}
    return json.dumps(Json_Config)

def get_json_pub():
    plc.open()
    utc_timestamp = datetime.datetime.now(timezone.utc).replace(tzinfo=timezone.utc).timestamp()
    data_pub={
        "Timestamp":utc_timestamp,
        "value":"test"
        }
    json_pub = json.dumps(data_pub)
    return json_pub

async def config_publishing(client):
    resource = 'config_io'
    topic = "$resource/" + resource 
    json_pub = get_json_config_io()
    await client.publish(topic,payload=json_pub)
    print("config_io published")

async def periodic_publishing(client):
    resource = 'data_in'
    topic = "$resource/" + resource 
    json_pub = get_json_pub()
    await client.publish(topic,payload=json_pub)
    print("data published")
    await asyncio.sleep(1)

async def main():
    reconnect_interval = 5
    try:
        with open("iot_connector_fqdn.txt", "r") as connector_file:
            print("File 'iot_connector_fqdn.txt' found...")
            connector_fqdn = connector_file.read()
    except FileNotFoundError as exc:
        print("File 'iot_connector_fqdn.txt' not found...")
        connector_fqdn = input("IoT Connector FQDN? ")
        with open("iot_connector_fqdn.txt", "w") as connector_file:
            connector_file.write(connector_fqdn)
    token = open("./token.txt", "r").read()
    print("Using Token: {}\n".format(token))
    cert = "./DigiCertGlobalRootCA.cer"
    tls_params = aiomqtt.TLSParameters(ca_certs=cert,cert_reqs=ssl.CERT_REQUIRED)
    first_connection = True
    while True:
        try:
            async with aiomqtt.Client(hostname=connector_fqdn, port=8883, username='', password=token, client_id='', tls_params=tls_params) as client:
                pub_periodic = Periodic(30, periodic_publishing, client)
                if first_connection == True:
                    await config_publishing(client)
                    first_connection = False
                    await asyncio.sleep(5)
                    await pub_periodic.start(delay=0)
        except aiomqtt.MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
            await asyncio.sleep(reconnect_interval) 

# Change to the "Selector" event loop if platform is Windows
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

asyncio.run(main())

I already verified that the confiG_io message is sent and published to the broker. I tried to find another option for the periodic publishing but didn't find a suitable answer.

0

There are 0 best solutions below