How will I be able to process received CAN message in Python using can-isotp 2.0.3 module?

54 Views Asked by At

I have a simple code as per below that configures the rxfn as well. However, the rxfn does not get called. It does not print on my console. Please help.

Send seems to be working fine since I am able to control my Controller with a known command. That is why it is commented out below.

from typing import Optional

import isotp
import keyboard
from loguru import logger
import can
import time


def cantp_error_handler(error):
    # Called from a different thread, needs to be thread safe
    logger.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))


def cantp_txfn(isotp_msg: isotp.CanMessage):
    # all set_something functions and my_hardware_something are fictive.
    message = can.Message(arbitration_id=isotp_msg.arbitration_id, data=isotp_msg.data, is_extended_id=False)
    logger.debug(f"Send: {message}")
    pcan_bus.send(message)


def cantp_rxfn(timeout: float) -> Optional[isotp.CanMessage]:
    # All my_hardware_something and get_something() function are fictive of course.
    msg = pcan_bus.recv(timeout)  # Blocking read are encouraged for better timing.
    if msg:
        print(f"Received CAN Message: {msg}")
        return isotp.CanMessage(arbitration_id=msg.arbitration_id, data=msg.data)
    else:
        return None


logger.info("Initialize CAN Interface")
config = {
    'interface': 'pcan',
    'channel': 'PCAN_USBBUS1',
    'bitrate': 1_000_000,
    'f_clock': 20_000_000,
    'data_bitrate': None,
    'fd': False,
}
try:
    pcan_bus = can.ThreadSafeBus(**config)  # Use thread safe version (blocks during Rx and Tx)
except Exception as can_thread_error:
    logger.error(f"Could not connect to CAN Bus: {can_thread_error}")
else:
    logger.info("CAN connection successful.")

addr = isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x301, txid=0x300)
can_tp_params = {
    'stmin': 0xF8,  # 800us between consecutive frames
    'blocking_send': True,
}
stack = isotp.CanStack(pcan_bus,
                       rxfn=cantp_rxfn,
                       txfn=cantp_txfn,
                       address=addr,
                       error_handler=cantp_error_handler,
                       params=can_tp_params)

try:
    stack.start()
    print("Press 's' to stop the loop.")

    # stack.send([0x51, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x34, 0x05],
    #            send_timeout=2)  # Non-blocking send, does not raise exception.

    while True:
        # Check if the 's' key is pressed
        if keyboard.is_pressed('s'):
            print("Stopping the loop.")
            break

        # Your main loop logic goes here
        if stack.available():
            print("CAN stack received a message.")
            stack.recv()

    logger.info("Payload transmission done.")  # May have failed, use the error_handler to know
finally:
    stack.stop()
    pcan_bus.shutdown()

I've tried looking at the documentation https://can-isotp.readthedocs.io/en/latest/isotp/implementation.html#transport-layer, but no luck. I also looked at the examples but no luck receiving... or at least printing from the callback. I am able to receive with the .available() function from the transport library.

P.S. The results... results of code above

0

There are 0 best solutions below