I have a simple code as per below that configures the rxfn as well. However, the rxfn does not get called. It does not print on my console. Please help.
Send seems to be working fine since I am able to control my Controller with a known command. That is why it is commented out below.
from typing import Optional
import isotp
import keyboard
from loguru import logger
import can
import time
def cantp_error_handler(error):
# Called from a different thread, needs to be thread safe
logger.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))
def cantp_txfn(isotp_msg: isotp.CanMessage):
# all set_something functions and my_hardware_something are fictive.
message = can.Message(arbitration_id=isotp_msg.arbitration_id, data=isotp_msg.data, is_extended_id=False)
logger.debug(f"Send: {message}")
pcan_bus.send(message)
def cantp_rxfn(timeout: float) -> Optional[isotp.CanMessage]:
# All my_hardware_something and get_something() function are fictive of course.
msg = pcan_bus.recv(timeout) # Blocking read are encouraged for better timing.
if msg:
print(f"Received CAN Message: {msg}")
return isotp.CanMessage(arbitration_id=msg.arbitration_id, data=msg.data)
else:
return None
logger.info("Initialize CAN Interface")
config = {
'interface': 'pcan',
'channel': 'PCAN_USBBUS1',
'bitrate': 1_000_000,
'f_clock': 20_000_000,
'data_bitrate': None,
'fd': False,
}
try:
pcan_bus = can.ThreadSafeBus(**config) # Use thread safe version (blocks during Rx and Tx)
except Exception as can_thread_error:
logger.error(f"Could not connect to CAN Bus: {can_thread_error}")
else:
logger.info("CAN connection successful.")
addr = isotp.Address(isotp.AddressingMode.Normal_29bits, rxid=0x301, txid=0x300)
can_tp_params = {
'stmin': 0xF8, # 800us between consecutive frames
'blocking_send': True,
}
stack = isotp.CanStack(pcan_bus,
rxfn=cantp_rxfn,
txfn=cantp_txfn,
address=addr,
error_handler=cantp_error_handler,
params=can_tp_params)
try:
stack.start()
print("Press 's' to stop the loop.")
# stack.send([0x51, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x34, 0x05],
# send_timeout=2) # Non-blocking send, does not raise exception.
while True:
# Check if the 's' key is pressed
if keyboard.is_pressed('s'):
print("Stopping the loop.")
break
# Your main loop logic goes here
if stack.available():
print("CAN stack received a message.")
stack.recv()
logger.info("Payload transmission done.") # May have failed, use the error_handler to know
finally:
stack.stop()
pcan_bus.shutdown()
I've tried looking at the documentation https://can-isotp.readthedocs.io/en/latest/isotp/implementation.html#transport-layer, but no luck. I also looked at the examples but no luck receiving... or at least printing from the callback. I am able to receive with the .available() function from the transport library.
P.S. The results... results of code above