Parsing an MQTT payload sent by a user into a command (function) and a value

75 Views Asked by At

Please help me figure this out. A payload arrives via MQTT. In the EmCallback class I parse it and extract the command and its value (for example 0 or 1). The CommandEmulators class then has to execute the command the user sent. I tried a list-based approach while looking for a better and more correct way to implement this. I also tried to write logic for processing commands, for example to guard against duplicated commands, but so far the only idea I have is a queue. I understand that many people here will not want to dig through the code, but perhaps you can point me to where I should look for a solution, so I would appreciate answers that are as specific as possible.

import json

from Interface.interface import InterfaceCallback


class EmCallback(InterfaceCallback):
    """Receive command payloads from an MQTT topic and hand them to ``test``.

    ``mqttc`` is expected to provide ``message_callback_add(topic, callback)``
    (presumably a paho-mqtt client -- confirm against the caller).
    """

    def __init__(self, mqttc, em):
        # None until a payload has been validated; afterwards it reflects the
        # most recently received payload only.
        self.flag_get_data = None
        self.em = em
        self.mqttc = mqttc

    async def callback_data(self, topic="mpei/Operator/Command"):
        """Register ``get_data`` as the message handler for ``topic``."""
        self.mqttc.message_callback_add(topic, self.get_data)

    def get_data(self, client, userdata, data):
        """Decode, parse and dispatch one incoming message.

        ``data.payload`` is decoded as UTF-8 (undecodable bytes are dropped)
        and parsed as JSON.  The parsed value is validated and, when accepted,
        passed to ``test``.
        """
        try:
            parsed_data = json.loads(data.payload.decode("utf-8", "ignore"))
            # Validate the parsed payload, not the message object: the message
            # object is always truthy, which made the check meaningless.
            self.validate_data(parsed_data)
            if self.flag_get_data:
                self.test(parsed_data)
        except Exception as e:
            # Deliberately broad: this runs as a client callback, so a bad
            # payload is reported instead of being allowed to propagate.
            print(f"Error {e}")

    def validate_data(self, data):
        """Record in ``flag_get_data`` whether ``data`` is non-empty."""
        # Assign both outcomes so one good payload does not leave the gate
        # open for every later (possibly empty) payload.
        self.flag_get_data = bool(data)

    def test(self, msg):
        """Take the first key of ``msg`` as the command name.

        Raises:
            IndexError: ``msg`` has no keys.
        """
        key = list(msg.keys())[0]
        # The remainder of this method is elided in the source.
        ...
from Emulators.emulators_logics_command import EmulatorsLogicsCommand

class CommandEmulators:
    """Turn incoming command messages into commands sent through ``em``."""

    def __init__(self, em):
        self.em = em
        self.flags = EmulatorsLogicsCommand()

    def on_off(self, client, userdata, msg):
        """Switch the output on or off according to the message payload.

        The decoded payload is passed through ``self.flags.logics_on_off``,
        which returns the payload when it is accepted and ``None`` when it is
        suppressed.  A suppressed payload sends nothing.

        Raises:
            ValueError: the payload is not an integer literal.
        """
        msg = msg.payload.decode()
        accepted = self.flags.logics_on_off(msg)
        if accepted is None:
            # Suppressed (e.g. a repeated command): do not touch the output.
            return
        # Decide on the numeric value.  The accepted payload is a non-empty
        # string, so a plain truth test would treat "0" as "on".
        state = 1 if int(accepted) else 0
        self.em.send_command(f"OUTPUT {state}", self.em)

    def set_voltage(self, client, userdata, msg):
        """Send the payload as a ``SOUR:VOLT`` set-point.

        Returns:
            0 when the command was sent, -1 when the payload was empty.
        """
        return self._send_setpoint("SOUR:VOLT", msg.payload.decode())

    def set_current(self, client, userdata, msg):
        """Send the payload as a ``SOUR:CUR`` set-point.

        Returns:
            0 when the command was sent, -1 when the payload was empty.
        """
        # NOTE(review): the standard SCPI short form of CURRent is "CURR";
        # confirm that the target accepts "CUR".
        return self._send_setpoint("SOUR:CUR", msg.payload.decode())

    def _send_setpoint(self, header, value):
        """Select 'eth' as both programming sources, then send ``header value``."""
        # The sources are switched before the payload check, so an empty
        # payload still selects 'eth' (same order as before the refactor).
        self.em.set_prog_source_i('eth')
        self.em.set_prog_source_v('eth')
        if not value:
            return -1
        self.em.send_command(f"{header} {value}", self.em)
        return 0

    def test2(self, msg=None):
        """Print ``OK``; ``msg`` is accepted so dispatchers may pass a payload."""
        print("OK")

class EmulatorsLogicsCommand:
    """Edge-detection flags for on/off commands plus a simple FIFO of commands."""

    def __init__(self):
        # Both flags start armed, so the first command of either kind passes.
        self.flag_on = True
        self.flag_off = True
        self.command_queue = []

    def logics_on_off(self, msg):
        """Return ``msg`` when it changes the on/off state, otherwise ``None``.

        ``int(msg)`` non-zero is treated as "on", zero as "off".  Each call
        re-arms the flag of the opposite kind and consumes its own flag, so a
        repeated command of the same kind yields ``None``.
        """
        wants_on = bool(int(msg))

        if wants_on:
            self.flag_off = True
            if not self.flag_on:
                return None
            self.flag_on = False
            return msg

        self.flag_on = True
        if not self.flag_off:
            return None
        print("Off")
        self.flag_off = False
        return msg

    def add_command(self, command):
        """Append ``command`` to the end of the queue."""
        self.command_queue.append(command)

    def process_commands(self):
        """Drain the queue front to back, running each command not yet executed."""
        while self.command_queue:
            pending = self.command_queue.pop(0)
            if self.is_execute_command(pending):
                continue
            self.execute_command(pending)

    @staticmethod
    def is_execute_command(command):
        """Report whether ``command`` was already executed (currently never)."""
        return False

    @staticmethod
    def execute_command(command):
        """Print the ``id`` and ``action`` entries of ``command``."""
        print(f"Run: {command['id']} - {command['action']}")

UPD: For now I have implemented it as follows: an instance of the command class is passed to EmCallback during initialization, and from it I build a dictionary that maps each key to its command.

# Wire the pieces together: connect the emulator, wrap it in a command
# handler, then give both (plus the MQTT client) to the callback object.
# `emulators_contact_one`, `data_path` and `mqttc` are defined elsewhere.
emulators_contact_one.connection_sim(data_path.get_data_path("setting.ini"))
emulators_command_one = CommandEmulators(emulators_contact_one)
# NOTE(review): three arguments are passed here, so EmCallback.__init__ must
# accept the command object as well -- presumably storing it as
# `self.emulators_command_one`, which the dispatcher below reads; confirm.
emulators_callback_one = EmCallback(mqttc, emulators_contact_one, emulators_command_one)


    def test(self, msg):
        """Dispatch a parsed command payload to its registered handler.

        The first key of ``msg`` is the command name.  The whole payload is
        passed to the handler, so every registered handler must accept it as
        a single positional argument.

        Raises:
            IndexError: ``msg`` has no keys.
            KeyError: no handler is registered for the command name.
        """
        names = list(msg.keys())
        if not names:
            # Same exception type as the bare index lookup, with context.
            raise IndexError("empty command payload")
        key = names[0]
        dict_command = {
            'test': self.emulators_command_one.test2,
        }
        func = dict_command.get(key)
        if func is None:
            known = ", ".join(sorted(dict_command))
            raise KeyError(f"unknown command {key!r}; known commands: {known}")
        func(msg)
0

There are 0 best solutions below