Python paho mqtt: How to get all messages and disconnect?


Using the python paho mqtt client I want to get all messages in a given topic and disconnect from the broker.

I can easily do this using simple() as shown here, but only if I specify the message count using msg_count. This would be fine if the message count were predictable or if there were a means to count the messages in a given topic.

import paho.mqtt.subscribe as mqtt_subscribe
messages = mqtt_subscribe.simple('proxies/#', msg_count=4, hostname='mqtt.somewhere.io')
for m in messages:
    print(m.payload.decode())

I'm also able to get all the messages in my topic using the example shown in Getting Started. However, this uses a loop_forever() and I simply want to get all messages and disconnect. I've tried other loop*() functions, yet none are offering the results I'm seeking.

The Callback Example also returns all messages yet it isn't clear to me how to disconnect from this.

And lastly, I can also use the mosquitto_sub client with a timeout to get all messages:

mosquitto_sub -h mqtt.somewhere.io -t "proxies/#" -W 1

But I'd prefer to be consistent with the paho client and better understand it.


Answer by hardillb:

A given topic doesn't have a "count" of messages. There is potentially one retained message; after that, anywhere from zero to an unbounded number of messages could be sent over an unbounded time.

If the problem is that you don't know how many topics there are with retained messages attached, because you have used a wildcard, then you can wait for the first message that doesn't have the retained bit set in the header, because retained messages will be delivered before any other messages on connection.
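As a rough illustration of that idea, here is a minimal sketch of an on_message handler that keeps messages while the retain flag is set and signals completion on the first message without it. The class name RetainedCollector and the SimpleNamespace stand-in messages are my own, for illustration only; the one thing taken from paho is that each message passed to on_message carries a retain attribute.

```python
import threading
from types import SimpleNamespace


class RetainedCollector:
    """Hypothetical helper: keeps retained messages, stops at the first live one."""

    def __init__(self):
        self.messages = []
        self.done = threading.Event()

    def on_message(self, client, userdata, msg):
        # Same signature as a paho on_message callback.
        if self.done.is_set():
            return  # already finished; ignore anything that arrives later
        if msg.retain:
            self.messages.append(msg)
        else:
            # First message without the retain flag: assume the retained
            # messages have all been delivered.
            self.done.set()


# Stand-in messages so the logic can be tried without a broker.
collector = RetainedCollector()
for topic, retain in [("proxies/a", True), ("proxies/b", True), ("proxies/a", False)]:
    collector.on_message(None, None, SimpleNamespace(topic=topic, payload=b"x", retain=retain))

print([m.topic for m in collector.messages])
```

With a real client you would assign collector.on_message to client.on_message and wait on collector.done with a timeout, since the event is never set if no live message happens to arrive.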

If the client has an existing persistent session then the broker may have queued past messages (missed by the client while it was not connected) for that specific client, but there is no way to get the count of those messages. But this depends on the client having been previously connected and subscribed at a high QoS level.

You may need to adjust your thinking here.

Answer by Brits:

As per the related discussion on Reddit, your aim was to connect, "retrieve all retained messages" matching a topic filter, and then disconnect.

The MQTT protocol does not provide a way to get a count of the retained messages that match a topic filter, nor an option to retrieve only retained messages. When you subscribe, the server will send any retained messages and then start a stream of new messages. You would need to determine when all retained messages have been received and disconnect at that point.

A basic approach might be as follows (copying the example app as OP did in the Reddit conversation):

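import time

import paho.mqtt.client as mqtt

# Callback signatures below follow the paho-mqtt 1.x API.
def on_connect(client, userdata, flags, rc):
    # Subscribe once connected; the broker then sends matching retained messages.
    client.subscribe("proxies/#")

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

# paho-mqtt 2.x requires a callback API version here,
# e.g. mqtt.Client(mqtt.CallbackAPIVersion.VERSION1).
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("mqtt.eclipseprojects.io", 1883, 60)

client.loop_start()   # run the network loop in a background thread
time.sleep(2)         # allow time to connect and receive retained messages
client.disconnect()
client.loop_stop()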

This will allow 2 seconds for the connection to be established and any retained messages to be received. It's important to note that you may not get all retained messages within this period (say there are a lot of child topics with retained messages and/or a slow link), and you might also get other messages (new messages may arrive after the retained ones).

For a basic use-case the limitations may not be an issue. However, you could improve upon this by detecting the first non-retained message, or a suitable delay between messages, and disconnecting at that point.
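A minimal sketch of that improvement might look like the following. It stops at whichever comes first: a message without the retain flag, or a quiet gap between messages. The function names drain_retained and fetch_retained, and the timeout defaults, are assumptions of mine rather than anything from paho. The getattr check is there only so the same Client construction works on both paho-mqtt 1.x and 2.x.

```python
import queue
from types import SimpleNamespace


def drain_retained(inbox, first_timeout=5.0, idle_timeout=1.0):
    """Read from inbox until a non-retained message or a quiet gap."""
    collected = []
    timeout = first_timeout  # the first wait also covers connect + subscribe
    while True:
        try:
            msg = inbox.get(timeout=timeout)
        except queue.Empty:
            break  # nothing arrived in time: assume the retained burst is over
        if not msg.retain:
            break  # first live message: stop collecting
        collected.append(msg)
        timeout = idle_timeout
    return collected


def fetch_retained(hostname, topic_filter, port=1883):
    # Imported here so drain_retained can be tried without paho installed.
    import paho.mqtt.client as mqtt

    inbox = queue.Queue()

    def on_connect(client, userdata, flags, rc):
        client.subscribe(topic_filter)

    def on_message(client, userdata, msg):
        inbox.put(msg)  # hand over to the main thread

    # paho-mqtt 2.x needs an explicit callback API version; 1.x has none.
    api = getattr(mqtt, "CallbackAPIVersion", None)
    client = mqtt.Client(api.VERSION1) if api else mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(hostname, port, 60)
    client.loop_start()
    try:
        return drain_retained(inbox)
    finally:
        client.disconnect()
        client.loop_stop()


# Offline demonstration with stand-in messages (no broker involved).
demo_inbox = queue.Queue()
for topic, retain in [("proxies/a", True), ("proxies/b", True), ("proxies/a", False)]:
    demo_inbox.put(SimpleNamespace(topic=topic, payload=b"x", retain=retain))
demo = drain_retained(demo_inbox, first_timeout=0.1, idle_timeout=0.1)
print([m.topic for m in demo])
```

Against a broker the call would be something like fetch_retained("mqtt.somewhere.io", "proxies/#"). The same caveats apply as before: a slow link can exceed the idle timeout, so this remains a heuristic.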

You may note that the above solution is a bit of a hack; this is because what you are attempting is not really something the protocol was designed to support. It may be worth reconsidering your approach, and doing something like dumping all messages to a database and retrieving the most recent status from there.
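To make the database suggestion a little more concrete, here is a small sketch that keeps only the latest payload per topic. SQLite is purely my assumption (the suggestion above does not name a database), and the table name latest and the helpers open_store, record, and latest_under are hypothetical.

```python
import sqlite3
import time


def open_store(path=":memory:"):
    """Open (or create) a store holding one row per topic."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS latest ("
        "topic TEXT PRIMARY KEY, payload BLOB, received_at REAL)"
    )
    return db


def record(db, topic, payload):
    # INSERT OR REPLACE keeps a single row per topic: the newest payload.
    db.execute(
        "INSERT OR REPLACE INTO latest (topic, payload, received_at) VALUES (?, ?, ?)",
        (topic, payload, time.time()),
    )
    db.commit()


def latest_under(db, prefix):
    """Return {topic: payload} for every topic starting with prefix."""
    rows = db.execute(
        "SELECT topic, payload FROM latest "
        "WHERE substr(topic, 1, length(?)) = ? ORDER BY topic",
        (prefix, prefix),
    )
    return dict(rows.fetchall())


# Simulated message stream; a long-running subscriber would call
# record(db, msg.topic, msg.payload) from its on_message callback.
db = open_store()
record(db, "proxies/a", b"1")
record(db, "proxies/a", b"2")
record(db, "proxies/b", b"3")
record(db, "other/c", b"4")
status = latest_under(db, "proxies/")
print(status)
```

A short-lived script can then read the current status from the store without touching the broker at all. One thing to watch: a sqlite3 connection is by default tied to the thread that created it, so this sketch assumes the subscriber runs loop_forever() in that same thread rather than loop_start().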