Update a pymodbus server context while the server is running

949 Views Asked by At

I'm currently using a tool called Modbus Slave that worked pretty well with a gui. However, I would like to have some automated tests that's why I would like to use pymodbus. The setup I would like to have with pymodbus is the following :

  • On a STM32 I have a serial modbus master running.
  • I would like to have two pymodbus slave running with a "data storage", meaning on my Slave ID N°1 I want to have 3100 registers, and on the Slave ID N°248 I want to have 3100 registers for example.
  • Then, I would like to be able to have my modbus master to write in the data registers of the modbus slave Id N°1 or the Slave ID N°248.
  • Be able to update the context of the Slave ID N°1 for example while it's running.

What's in Bold is working. As I have not that much experience with python/pymodbus, I would like to know what is the best way to proceed ?

EDIT:

Current code of my pymodbus sync server running with a context of 3100 registers with 0 as default value, for Slave ID 1 and 248.

#!/usr/bin/env python3
"""

Server sync

"""

from threading import Thread

from pymodbus.device import ModbusDeviceIdentification
from pymodbus.version import version

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
import logging
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
# --------------------------------------------------------------------------- #
# import the various client implementations
# --------------------------------------------------------------------------- #
from pymodbus.server.sync import (
    StartSerialServer,
)
from pymodbus.transaction import (
    ModbusRtuFramer,
)


def run_sync_server():
    """Run server setup."""
    port = "COM6"
    datablock = ModbusSequentialDataBlock(0, [0x00] * 3100)
    context = {
        0x01: ModbusSlaveContext(
            hr=datablock,
        ),
        0xF8: ModbusSlaveContext(
            hr=datablock,
        ),
    }

    # Build data storage
    store = ModbusServerContext(slaves=context, single=False)

    """Run server."""
    txt = f"### start server, listening on {port} - serial"
    print(txt)
    server = StartSerialServer(
        context=store,  # Data storage
        # identity=identity,  # server identify
        timeout=0.01,  # waiting time for request to complete
        port=port,  # serial port
        # custom_functions=[],  # allow custom handling
        framer=ModbusRtuFramer,  # The framer strategy to use
        # handler=None,  # handler for each session
        stopbits=1,  # The number of stop bits to use
        bytesize=8,  # The bytesize of the serial messages
        parity="N",  # Which kind of parity to use
        baudrate=256000,  # The baud rate to use for the serial device
    )
    return server

if __name__ == "__main__":
    
    server = run_sync_server()
    server.shutdown()

Concerning the update_server I have this for now :

#!/usr/bin/env python3
# pylint: disable=missing-any-param-doc,differing-param-doc
"""Pymodbus Server With Updating Thread.

This is an example of having a background thread updating the
context while the server is operating. This can also be done with
a python thread::

    from threading import Thread
    Thread(target=updating_writer, args=(context,)).start()
"""
from threading import Thread
import time
import logging
import re
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server.asynchronous import StartSerialServer
from pymodbus.version import version


# --------------------------------------------------------------------------- #
# configure the service logging
# --------------------------------------------------------------------------- #
log = logging.getLogger()
log.setLevel(logging.DEBUG)

# --------------------------------------------------------------------------- #
# define your callback process
# --------------------------------------------------------------------------- #


def updating_writer(extra):
    """Run for tests
    :param arguments: The input arguments to the call
    """
    print("Check register")
    slave_id = 0x1
    value = context[slave_id].getValues(0x3, 3001, count=1)

    time.sleep(5)
    while value[0] != 0:
        value = context[slave_id].getValues(0x3, 3001, count=1)
        time.sleep(5)
    context[slave_id].setValues(0x6, 1000, [0x10])

if __name__ == "__main__":

    datablock=ModbusSequentialDataBlock(0x00, [0x00] * 3100)
    context={
        0x01: ModbusSlaveContext(
            hr=datablock,
        ),
        0xF8: ModbusSlaveContext(
            hr=datablock,
        ),
    }
    store = ModbusServerContext(slaves=context, single=False)

    Thread(target=updating_writer, args=(store,)).start()

I had to do lots of fix on pymodbus examples cause they are not up to date with the pymodbus lib (2.5.3).

0

There are 0 best solutions below