Here are three scenarios that I tried while ingesting data into an MQTT broker.
Scenario 1
100 records per second with QoS=1 for a duration of 30 seconds, sent in parallel from 3 machines (each machine is a Docker container) to an MQTT broker that also runs in a Docker container. Each machine therefore generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.
Here is the receiver-side data: the count of records received per machine.
JSON_DATE:MACHINE_NAME COUNT(*)
"09166423-892a-4c84-9717-83b91ecbf2d5" 1,956
"b05eb890-ed0a-4272-aa95-7dce4f066cdb" 2,037
"e1ede903-f8f5-4bc8-95bb-ee286e6c00c3" 2,419
Inference: There is data loss for every machine; each one received well under the 3,000 messages it sent.
Scenario 2
100 records per 5 seconds with QoS=1 for a duration of 30 seconds, sent in parallel from 5 machines (each machine is a Docker container) to an MQTT broker that also runs in a Docker container. Each machine generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.
Here is the receiver-side data: the count of records received per machine.
JSON_DATE:MACHINE_NAME COUNT(*)
"043fc770-104c-4d0f-99e8-aae4a22bea60" 3,721
"06d39ec0-d799-43a9-9c58-16bbb67c0217" 3,085
"4374073b-6c75-4788-b1f7-bcd65aa755aa" 3,587
"675467d5-4361-4a61-becf-0a20fc5ad84f" 3,500
"825d3880-99db-425b-b867-3254fef394f3" 3,495
Inference: There is no apparent data loss for any machine. Every machine shows more than 3,000 records, so there are duplicate deliveries for all of them.
Scenario 3
100 records per 5 seconds with QoS=1 for a duration of 30 seconds, sent in parallel from 10 machines (each machine is a Docker container) to an MQTT broker that also runs in a Docker container. Each machine generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.
Here is the receiver-side data: the count of records received per machine.
JSON_DATE:MACHINE_NAME COUNT(*)
"06e08aeb-f258-40eb-adcd-efe1b2e3725e" 3,162
"0be5975c-b364-450e-b550-62834d32a18f" 3,181
"30f6cea9-efde-42b5-bcb2-b81e360d27b4" 2,959
"500465c1-7ff0-4097-a0d7-d22b55b73079" 3,142
"ac61be0c-ac40-44fd-930c-10b01d98aa4a" 3,235
"b259f4cd-cd69-4582-b428-53192ac5d13e" 3,027
"b8567bf2-cdc8-4907-8c03-7bbe78d36cb8" 3,212
"ca3bda96-4b60-4d97-a88a-22b10d500650" 2,932
"d6bb5650-1fdb-45cf-9bfc-a979594aba0f" 3,287
"dff45255-b48e-45a9-9e71-a7cfd9559139" 3,069
Inference: There is minor data loss for some machines (two of the ten received fewer than 3,000 records), while the others show duplicate deliveries.
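To put the three tables side by side, here is a small sketch that compares each received count with the 3,000 messages expected per machine. The counts are copied from the tables above; the helper name `summarize` is made up for illustration. Note that a count above 3,000 only shows that duplicates arrived. It does not rule out that some individual messages were also lost.

```python
# Expected messages per machine, as stated in each scenario description.
EXPECTED = 3000

# Received counts copied from the three tables above.
received = {
    "Scenario 1": [1956, 2037, 2419],
    "Scenario 2": [3721, 3085, 3587, 3500, 3495],
    "Scenario 3": [3162, 3181, 2959, 3142, 3235, 3027, 3212, 2932, 3287, 3069],
}


def summarize(counts, expected=EXPECTED):
    """Return (machines_below, machines_above, net_percent) for one scenario."""
    below = sum(1 for c in counts if c < expected)
    above = sum(1 for c in counts if c > expected)
    total_expected = expected * len(counts)
    # Net surplus (+) or shortfall (-) across all machines, in percent.
    net_percent = 100.0 * (sum(counts) - total_expected) / total_expected
    return below, above, net_percent


if __name__ == "__main__":
    for name, counts in received.items():
        below, above, net = summarize(counts)
        print(f"{name}: {below} below, {above} above expected, net {net:+.1f}%")
```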
Below is the screenshot of the docker stats while scenario 2 was running.
## Publisher code. This runs inside a Docker container.
import paho.mqtt.client as paho
from paho import mqtt
from random import randrange, uniform
import time
from time import sleep
from multiprocessing import Process
from os import getpid
import os
import random
import datetime
import uuid

def task():
    pid = getpid()
    # Each container generates its own machine name and publishes to a topic of the same name.
    machine_name = str(uuid.uuid4())
    topic_name = machine_name
    print("{0} : {1}".format(machine_name, topic_name))
    # 30 batches of 100 messages, with a 5-second pause after each batch.
    for i in range(30):
        mqttBroker = "localhost"
        # A new client is created and connected for every batch, always with the same client ID.
        client = paho.Client("Temperature_Inside")
        client.connect(mqttBroker, 1883)
        client.loop_start()
        for _ in range(100):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            client.publish(topic_name, sensor_payload, qos=1)
        sleep(5)

# entry point
if __name__ == '__main__':
    sleep(1)
    task()
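A possible factor worth ruling out: every publisher container connects with the same client ID, "Temperature_Inside", and the MQTT specification requires a broker to disconnect an existing client when another one connects with the same ID. The sketch below shows one way to generate a distinct ID per publisher and to serialize the payload as valid JSON. Both `make_client_id` and `build_payload` are hypothetical helpers, not part of paho, and whether this accounts for the lost messages here is an assumption that would need testing.

```python
import datetime
import json
import random
import uuid


def make_client_id(prefix="pub"):
    """Return a client ID that differs on every call (hypothetical helper).

    MQTT 3.1.1 only guarantees broker support for IDs of 1 to 23 alphanumeric
    characters, so the result is kept within that limit.
    """
    return (prefix + uuid.uuid4().hex)[:23]


def build_payload(topic_name, machine_name, batch, seq):
    """Serialize one reading as JSON, with the same fields as the original payload."""
    return json.dumps({
        "Topic_Name": topic_name,
        "Msg_Id": "{0}-{1}".format(batch, seq),
        "Ingestion_Time": str(datetime.datetime.now()),
        "Machine_Name": machine_name,
        "Reading": str(random.uniform(10, 12)),
    })


if __name__ == "__main__":
    machine = str(uuid.uuid4())
    print(make_client_id())
    print(build_payload(machine, machine, 0, 0))
```

In the publisher, the generated ID would take the place of the fixed "Temperature_Inside" string, and the client could be created and connected once before the batch loop rather than once per batch.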
## Code to trigger the Docker containers
from multiprocessing import Process
import os
import subprocess
import docker
import sys

def task():
    # Start one 'datagen' publisher container on the host network.
    client = docker.from_env()
    container = client.containers.run('datagen', detach=True, network="host", auto_remove=True)
    print(container.logs())

# entry point
if __name__ == '__main__':
    print("Number of threads to start : {0}".format(sys.argv[1]))
    n = int(sys.argv[1])
    # Launch n containers in parallel, one per process.
    for i in range(n):
        process = Process(target=task)
        process.start()
## Finally, here is the subscriber code
import paho.mqtt.client as paho
from paho import mqtt
import time

def on_message(client, userdata, message):
    print("received message: ", str(message.payload.decode("utf-8")))
    # Append every received payload to a file so it can be counted later.
    with open('files_sub/data_recv.txt', 'a+') as f:
        f.write("Message received: " + str(message.payload) + "\n")

mqttBroker = "localhost"
client = paho.Client("Smartphone")
client.connect(mqttBroker, 1883)
client.loop_start()
# Subscribe to every topic with QoS 1.
client.subscribe("#", qos=1)
client.on_message = on_message
# Keep receiving for 5 minutes, then stop the network loop.
time.sleep(300)
client.loop_stop()
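Since QoS 1 allows duplicate deliveries, a raw record count cannot separate duplicates from losses. Below is a minimal sketch that reads lines in the format written by `on_message` above and reports, per machine, the total lines received and the number of distinct `Msg_Id` values. It assumes each line looks like `Message received: b"{ ... }"` with the single-quoted payload produced by the publisher; `parse_line` and `count_per_machine` are illustrative names.

```python
import ast
from collections import defaultdict

PREFIX = "Message received: "


def parse_line(line):
    """Turn one line written by on_message back into a dict, or None if it does not parse."""
    if not line.startswith(PREFIX):
        return None
    try:
        # The file holds the repr of a bytes object, e.g. b"{ 'Topic_Name': ... }".
        raw = ast.literal_eval(line[len(PREFIX):].strip())
        text = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        # The payload itself is a single-quoted dict literal, not strict JSON.
        record = ast.literal_eval(text)
    except (ValueError, SyntaxError):
        return None
    return record if isinstance(record, dict) else None


def count_per_machine(lines):
    """Return {machine_name: (total_received, unique_msg_ids)}."""
    totals = defaultdict(int)
    unique = defaultdict(set)
    for line in lines:
        record = parse_line(line)
        if not record or "Machine_Name" not in record or "Msg_Id" not in record:
            continue
        machine = record["Machine_Name"]
        totals[machine] += 1
        unique[machine].add(record["Msg_Id"])
    return {m: (totals[m], len(unique[m])) for m in totals}
```

Passing the open file `files_sub/data_recv.txt` to `count_per_machine` gives both numbers per machine: a unique count below 3,000 would indicate real loss even when the total is above 3,000.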
Here is what I am looking for.
Is there a throughput threshold that we can use as a benchmark for an MQTT broker running in Docker? Is there a way to improve the performance?
