Mosquitto MQTT Broker performance on Docker


Here are three scenarios that I tried while ingesting data into an MQTT broker.

Scenario 1

100 records per second with QoS=1, for a duration of 30 seconds, from 3 machines (each machine is a Docker container) in parallel to an MQTT broker, which is also a Docker container. So each machine generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.

Here is the receiver-side data: the count of records received per machine.

JSON_DATE:MACHINE_NAME                COUNT(*)
"09166423-892a-4c84-9717-83b91ecbf2d5"  1,956
"b05eb890-ed0a-4272-aa95-7dce4f066cdb"  2,037
"e1ede903-f8f5-4bc8-95bb-ee286e6c00c3"  2,419

Inference: There is data loss for each machine.
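To put numbers on the loss, the counts in the table can be compared against the 3,000 messages each machine is stated to publish. This is only arithmetic on the figures above; the helper name `shortfall` is made up for illustration.

```python
EXPECTED = 3000  # messages each machine is stated to publish

# Received counts copied from the Scenario 1 table.
received = {
    "09166423-892a-4c84-9717-83b91ecbf2d5": 1956,
    "b05eb890-ed0a-4272-aa95-7dce4f066cdb": 2037,
    "e1ede903-f8f5-4bc8-95bb-ee286e6c00c3": 2419,
}


def shortfall(count, expected=EXPECTED):
    """Return (missing_messages, missing_percent) for one machine."""
    missing = expected - count
    return missing, round(100 * missing / expected, 1)


for machine, count in received.items():
    missing, pct = shortfall(count)
    print(f"{machine}: {missing} missing ({pct}%)")
```

Roughly a fifth to a third of each machine's messages are missing in this scenario.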

Scenario 2

100 records per 5 seconds with QoS=1, for a duration of 30 seconds, from 5 machines (each machine is a Docker container) in parallel to an MQTT broker, which is also a Docker container. So each machine generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.

Here is the receiver-side data: the count of records received per machine.

JSON_DATE:MACHINE_NAME                 COUNT(*)
"043fc770-104c-4d0f-99e8-aae4a22bea60"  3,721
"06d39ec0-d799-43a9-9c58-16bbb67c0217"  3,085
"4374073b-6c75-4788-b1f7-bcd65aa755aa"  3,587
"675467d5-4361-4a61-becf-0a20fc5ad84f"  3,500
"825d3880-99db-425b-b867-3254fef394f3"  3,495

Inference: There is no data loss for any machine. All five counts exceed 3,000, so some messages were delivered more than once.
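QoS 1 is at-least-once delivery, so duplicates are allowed by the protocol, and a raw count above 3,000 can hide messages that never arrived. Since the publisher code in this question gives every message a Msg_Id of the form batch-sequence (3,000 distinct values per machine), counting distinct IDs separates duplicates from loss. A minimal sketch, assuming the received records have already been parsed into dicts with `Machine_Name` and `Msg_Id` keys; the function name and the sample data are hypothetical.

```python
from collections import defaultdict


def count_unique(records):
    """Return {machine: (total_received, unique_msg_ids)}."""
    totals = defaultdict(int)
    seen = defaultdict(set)
    for rec in records:
        machine = rec["Machine_Name"]
        totals[machine] += 1
        seen[machine].add(rec["Msg_Id"])
    return {m: (totals[m], len(seen[m])) for m in totals}


# Hypothetical sample: message "0-1" arrived twice.
sample = [
    {"Machine_Name": "m1", "Msg_Id": "0-0"},
    {"Machine_Name": "m1", "Msg_Id": "0-1"},
    {"Machine_Name": "m1", "Msg_Id": "0-1"},
]
print(count_unique(sample))  # {'m1': (3, 2)}
```

A machine with fewer than 3,000 unique IDs has lost messages even if its total count is above 3,000.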

Scenario 3

100 records per 5 seconds with QoS=1, for a duration of 30 seconds, from 10 machines (each machine is a Docker container) in parallel to an MQTT broker, which is also a Docker container. So each machine generates a total of 3,000 messages over 30 seconds. The size of each message is 260 bytes.

Here is the receiver-side data: the count of records received per machine.

JSON_DATE:MACHINE_NAME                 COUNT(*)
"06e08aeb-f258-40eb-adcd-efe1b2e3725e"  3,162
"0be5975c-b364-450e-b550-62834d32a18f"  3,181
"30f6cea9-efde-42b5-bcb2-b81e360d27b4"  2,959
"500465c1-7ff0-4097-a0d7-d22b55b73079"  3,142
"ac61be0c-ac40-44fd-930c-10b01d98aa4a"  3,235
"b259f4cd-cd69-4582-b428-53192ac5d13e"  3,027
"b8567bf2-cdc8-4907-8c03-7bbe78d36cb8"  3,212
"ca3bda96-4b60-4d97-a88a-22b10d500650"  2,932
"d6bb5650-1fdb-45cf-9bfc-a979594aba0f"  3,287
"dff45255-b48e-45a9-9e71-a7cfd9559139"  3,069

Inference: There is minor data loss for some machines (2 of the 10 counts are below 3,000).
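For comparing the three scenarios, it can help to compute the average load offered to the broker from the stated parameters (machines, batch size, interval, and the 260-byte message size). This is a back-of-the-envelope sketch; `offered_load` is a hypothetical helper, and it gives averages only, while the publisher sends each batch of 100 as a burst.

```python
MESSAGE_BYTES = 260  # message size stated in the question

# (machines, messages per batch, seconds between batches), as stated per scenario
scenarios = {
    "Scenario 1": (3, 100, 1),
    "Scenario 2": (5, 100, 5),
    "Scenario 3": (10, 100, 5),
}


def offered_load(machines, batch_size, interval_s, message_bytes=MESSAGE_BYTES):
    """Return (messages per second, payload bytes per second) across all machines."""
    msgs_per_s = machines * batch_size / interval_s
    return msgs_per_s, msgs_per_s * message_bytes


for name, params in scenarios.items():
    rate, bandwidth = offered_load(*params)
    print(f"{name}: {rate:.0f} msg/s, {bandwidth:.0f} payload bytes/s")
```

By this estimate Scenario 1 has the fewest machines but the highest average message rate, which matches it showing the largest loss.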

Below is a screenshot of docker stats while scenario 2 was running.

[screenshot of docker stats output; image not included]

## Publisher code. This is run within a Docker container

import datetime
import random
import uuid
from os import getpid
from time import sleep

import paho.mqtt.client as paho



def task():
    pid = getpid()
    machine_name =  str(uuid.uuid4())
    topic_name =  machine_name
    print("{0} : {1}".format(machine_name, topic_name))

    for i in range(30) :

        mqttBroker ="localhost" 
        client = paho.Client("Temperature_Inside")
        client.connect(mqttBroker,1883)
        client.loop_start()

        for _ in range(100):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            client.publish(topic_name,sensor_payload,qos=1)
            
        sleep(5)



# entry point
if __name__ == '__main__':
    sleep(1)

    task()
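Two details of the publisher above are worth isolating. The payload is assembled with single quotes, so it is not valid JSON, and every container connects with the same client ID, Temperature_Inside. The MQTT specification requires a broker to disconnect an existing client when another one connects with the same client ID, so parallel publishers sharing an ID can knock each other off the broker. Whether that accounts for the counts reported here is not confirmed. The sketch below shows only the two pure-Python pieces, a per-machine client ID and a JSON payload with the same fields; `make_client_id` and `build_payload` are hypothetical helper names, and the "pub-" prefix is an arbitrary choice.

```python
import datetime
import json
import random
import uuid


def make_client_id(machine_name):
    """Derive a distinct MQTT client ID per publisher (hypothetical naming scheme)."""
    return "pub-" + machine_name


def build_payload(topic_name, machine_name, batch, seq):
    """Serialize one reading as valid JSON with the same fields as the original payload."""
    return json.dumps({
        "Topic_Name": topic_name,
        "Msg_Id": f"{batch}-{seq}",
        "Ingestion_Time": str(datetime.datetime.now()),
        "Machine_Name": machine_name,
        "Reading": str(random.uniform(10, 12)),
    })


machine_name = str(uuid.uuid4())
print(make_client_id(machine_name))
print(build_payload(machine_name, machine_name, 0, 7))
```

The ID would be passed where the publisher currently passes "Temperature_Inside", and the payload where it passes sensor_payload.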

## Code to trigger the Docker container

import sys
from multiprocessing import Process

import docker


def task():
    client = docker.from_env()
    container = client.containers.run('datagen', detach=True,network="host",auto_remove=True)
    print(container.logs())



# entry point
if __name__ == '__main__':
    
    print("Number of threads to start : {0}".format(sys.argv[1]))

    n=int(sys.argv[1])
    for i in range(n):
        process = Process(target=task)
        process.start()



## Finally, here is the subscriber code

import time

import paho.mqtt.client as paho


def on_message(client, userdata, message):
    # Runs on paho's network thread once per received message.
    print("received message: ", str(message.payload.decode("utf-8")))
    with open('files_sub/data_recv.txt', 'a+') as f:
        f.write("Message received: " + str(message.payload) + "\n")


mqttBroker = "localhost"
client = paho.Client("Smartphone")
# Register the callback before connecting so no early message is missed.
client.on_message = on_message
client.connect(mqttBroker, 1883)

client.loop_start()

# Subscribe to every topic with QoS 1.
client.subscribe("#", qos=1)

# Collect messages for 5 minutes, then stop the network loop.
time.sleep(300)
client.loop_stop()
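The subscriber's on_message prints and reopens the output file for every message, and paho runs that callback on its network loop thread, so slow work there delays reading further packets. One common pattern is to keep the callback minimal and hand payloads to a separate writer thread. The following is a sketch of that pattern only, not a measured fix for the numbers above; `inbox`, `writer`, and the `stop` event are names introduced here for illustration.

```python
import queue
import threading

inbox = queue.Queue()  # unbounded buffer between the network callback and the writer


def on_message(client, userdata, message):
    """Do the minimum in the callback: hand the raw payload to the writer."""
    inbox.put(message.payload)


def writer(path, stop):
    """Drain the queue into a single file handle that stays open."""
    with open(path, "a") as f:
        # Keep running until a stop is requested and the queue is empty.
        while not (stop.is_set() and inbox.empty()):
            try:
                payload = inbox.get(timeout=0.1)
            except queue.Empty:
                continue
            f.write("Message received: " + payload.decode("utf-8") + "\n")
```

In the subscriber, the writer would be started with threading.Thread(target=writer, args=(path, stop)) before client.loop_start(), and stopped with stop.set() followed by join() after client.loop_stop().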

Here is what I am looking for.

Is there a threshold that we can use as a benchmark for an MQTT broker running in Docker? Is there a way to improve the performance?
