Help me figure this out. The payload arrives via MQTT. In the EmCallback class I parse it and get the command and its value (for example 0 or 1). In the CommandEmulators class I have to execute the command sent by the user. I tried using a list while looking for a better and more correct way to implement this. I also tried to write logic for processing commands, for example to guard against duplicated commands, but so far the only idea I have is a queue.
I understand that many people here will not want to dig into the code, but maybe you can give me some pointers so that I know where to look for a solution to this issue; please be as specific as possible.
import json
from Interface.interface import InterfaceCallback
class EmCallback(InterfaceCallback):
    """Receive operator messages over MQTT and hand the parsed payload on.

    A message handler is registered on the MQTT client for a topic; each
    incoming payload is decoded as JSON and, once the data flag is set,
    passed to ``test``.
    """

    def __init__(self, mqttc, em, emulators_command_one=None):
        """Store the collaborators used by the callbacks.

        Args:
            mqttc: MQTT client; this class only calls its
                ``message_callback_add`` method.
            em: Emulator object; stored as-is, not used by the methods
                visible in this class.
            emulators_command_one: Optional command object. Accepted so the
                class can be constructed as
                ``EmCallback(mqttc, em, command_object)``; defaults to
                ``None`` so existing two-argument calls keep working.
        """
        # None until the first message has passed validate_data; nothing in
        # this class resets it afterwards.
        self.flag_get_data = None
        self.em = em
        self.mqttc = mqttc
        self.emulators_command_one = emulators_command_one

    async def callback_data(self, topic="mpei/Operator/Command"):
        """Register ``get_data`` as the message handler for ``topic``.

        This is a coroutine function: the registration only happens once the
        returned coroutine is awaited or scheduled.
        """
        self.mqttc.message_callback_add(topic, self.get_data)

    def get_data(self, client, userdata, data):
        """Handle one incoming MQTT message.

        Args:
            client: Unused here.
            userdata: Unused here.
            data: Message object whose ``payload`` holds UTF-8 encoded JSON.

        Any exception raised while decoding or dispatching is caught and
        printed, so a bad message never propagates out of the callback.
        """
        try:
            # Undecodable bytes are dropped ("ignore") rather than raising.
            parsed_data = json.loads(data.payload.decode("utf-8", "ignore"))
            self.validate_data(data)
            if self.flag_get_data:
                self.test(parsed_data)
        except Exception as e:
            print(f"Error {e}")

    def validate_data(self, data):
        """Set ``flag_get_data`` to True when ``data`` is truthy."""
        if data:
            self.flag_get_data = True

    def test(self, msg):
        """Take the first key of ``msg`` as the command name.

        Raises IndexError when ``msg`` has no keys; ``get_data`` catches it.
        The rest of the handler is elided in the source.
        """
        key = list(msg.keys())[0]
        ...
from Emulators.emulators_logics_command import EmulatorsLogicsCommand
class CommandEmulators:
    """Turn incoming command messages into commands sent to the emulator.

    ``on_off``, ``set_voltage`` and ``set_current`` take the
    ``(client, userdata, msg)`` arguments of an MQTT message callback and
    read the command value from ``msg.payload``.
    """

    def __init__(self, em):
        """Keep the emulator object and create the duplicate-filter state.

        Args:
            em: Emulator object providing ``send_command``,
                ``set_prog_source_i`` and ``set_prog_source_v``.
        """
        self.em = em
        self.flags = EmulatorsLogicsCommand()

    def on_off(self, client, userdata, msg):
        """Switch the output on or off according to the payload.

        ``self.flags.logics_on_off`` returns the payload when the requested
        state differs from the last accepted one and ``None`` for a repeated
        request.
        """
        state = self.flags.logics_on_off(msg.payload.decode())
        if state is None:
            # Repeated request: the output is already in that state, so no
            # command is sent (in particular a repeated "on" must not turn
            # the output off).
            return
        # The payload is a string, so "0" is truthy; compare numerically.
        command = "OUTPUT 1" if int(state) else "OUTPUT 0"
        self.em.send_command(command, self.em)

    def set_voltage(self, client, userdata, msg):
        """Send ``SOUR:VOLT <payload>``.

        Returns:
            0 when the command was sent, -1 when the payload was empty.
        """
        return self._send_setpoint("SOUR:VOLT", msg)

    def set_current(self, client, userdata, msg):
        """Send ``SOUR:CUR <payload>``.

        Returns:
            0 when the command was sent, -1 when the payload was empty.
        """
        return self._send_setpoint("SOUR:CUR", msg)

    def _send_setpoint(self, prefix, msg):
        """Select the 'eth' programming source and send ``prefix`` + value."""
        value = msg.payload.decode()
        # Both programming sources are switched before the payload is
        # checked, exactly as the two public setters always did.
        # NOTE(review): 'eth' presumably means Ethernet control - confirm.
        self.em.set_prog_source_i('eth')
        self.em.set_prog_source_v('eth')
        if not value:
            return -1
        # NOTE(review): the payload is forwarded verbatim into the command
        # string; validate it upstream if the topic is not trusted.
        self.em.send_command(f"{prefix} {value}", self.em)
        return 0

    def test2(self, msg=None):
        """Print ``OK``.

        ``msg`` is optional and ignored; it lets the method be used as a
        dispatch target that is called with the received message.
        """
        print("OK")
class EmulatorsLogicsCommand:
    """State used to filter repeated on/off requests, plus a command queue."""

    def __init__(self):
        """Start with both transitions allowed and an empty queue."""
        self.command_queue = []
        # flag_on: the next "on" request is accepted.
        # flag_off: the next "off" request is accepted.
        self.flag_on = self.flag_off = True

    def logics_on_off(self, msg):
        """Accept an on/off request only when it changes the state.

        Args:
            msg: Value convertible with ``int()``; non-zero means "on".

        Returns:
            ``msg`` unchanged when the request is accepted, ``None`` when it
            repeats the previously accepted request. Because ``msg`` is
            returned as given, a string such as ``"0"`` is truthy - callers
            should compare the result against ``None``.

        Raises:
            ValueError: If ``msg`` cannot be converted to an int.
        """
        wants_on = bool(int(msg))

        if wants_on:
            # An "on" request re-arms the "off" transition.
            self.flag_off = True
            if not self.flag_on:
                return None
            self.flag_on = False
            return msg

        # An "off" request re-arms the "on" transition.
        self.flag_on = True
        if not self.flag_off:
            return None
        print("Off")
        self.flag_off = False
        return msg

    def add_command(self, command):
        """Append ``command`` to the end of the queue."""
        self.command_queue.append(command)

    def process_commands(self):
        """Drain the queue front to back, running each command that
        ``is_execute_command`` does not report as true."""
        while self.command_queue:
            current = self.command_queue.pop(0)
            if self.is_execute_command(current):
                continue
            self.execute_command(current)

    @staticmethod
    def is_execute_command(command):
        """Always return False, so every queued command is run."""
        return False

    @staticmethod
    def execute_command(command):
        """Print the ``id`` and ``action`` entries of ``command``."""
        print(f"Run: {command['id']} - {command['action']}")
UPD. Okay, this is the implementation I have so far: an object of the command class is passed in during initialization, and from it I build a key-to-command dictionary:
# Wiring of the emulator, its command object and the MQTT callback object.
# `emulators_contact_one`, `data_path` and `mqttc` are created outside this
# fragment.

# Pass the path resolved for "setting.ini" to the emulator's connection_sim.
emulators_contact_one.connection_sim(data_path.get_data_path("setting.ini"))
# The command object wraps the same emulator object.
emulators_command_one = CommandEmulators(emulators_contact_one)
# NOTE(review): three positional arguments are passed here, so
# EmCallback.__init__ has to accept the command object as its third parameter.
emulators_callback_one = EmCallback(mqttc, emulators_contact_one, emulators_command_one)
def test(self, msg):
key = list(msg.keys())[0]
dict_command = {
'test': self.emulators_command_one.test2
}
func = dict_command[key]
func(msg)