We have some Spring Integration flows to process messages that arrive through MQTT or STOMP. For this we are using the adapters MqttPahoMessageDrivenChannelAdapter and StompInboundChannelAdapter.
In the case of MQTT we observed that if an exception is thrown at any of the endpoints in the flow, the adapter closes the connection and no more messages are received. Likewise, if we restart the broker, the connection with it is not established again.
To deal with the exception problem, we set the error channel name on the adapters to "errorChannel", the channel that Spring provides by default. Our intention is only to log the exception, not to close the underlying connection. Is this the proper way to handle exceptions throughout the flow?
Regarding the reconnection issue, we take a different approach for each transport protocol.
- For MQTT we set `automaticReconnect` to `true` on the `ConnectionOptions`:
var clientFactory = new DefaultMqttPahoClientFactory();
clientFactory.getConnectionOptions().setAutomaticReconnect(true);
var adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", MqttAsyncClient.generateClientId(), clientFactory, "/topic/myTopic");
adapter.setErrorChannelName("errorChannel");
- For STOMP we set the `TaskScheduler` from the context on the `ReactorNettyTcpStompClient`:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);
var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");
Is this the best approach to deal with this?
Yes, the `errorChannel` option is a good way to keep that exception from being thrown back to the MQTT client. It does not have to be the global `errorChannel`, which might be used in many different places. `setAutomaticReconnect(true)` is indeed recommended for the inbound channel adapter.

The `TaskScheduler` for `ReactorNettyTcpStompClient` is not for reconnects. See its Javadocs. I think the reconnection logic is not in use in the `ReactorNettyTcpStompClient`; a reconnect would be done via another variant.
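
Returning to the error channel advice: a dedicated error channel for the MQTT adapter, instead of the global `errorChannel`, could look roughly like the sketch below. The channel name `mqttErrorChannel`, the class name `MqttErrorHandlingConfig`, and the method names are hypothetical and chosen only for illustration; the sketch assumes annotation-based Spring Integration configuration and logs through Commons Logging.

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

@Configuration
@EnableIntegration
public class MqttErrorHandlingConfig {

    private static final Log logger = LogFactory.getLog(MqttErrorHandlingConfig.class);

    // Hypothetical channel name; any name other than the global "errorChannel" will do.
    public static final String MQTT_ERROR_CHANNEL = "mqttErrorChannel";

    // Dedicated channel that receives only failures from the MQTT flow.
    @Bean(name = MQTT_ERROR_CHANNEL)
    public MessageChannel mqttErrorChannel() {
        return new DirectChannel();
    }

    // Receives the payload of the ErrorMessage (the Throwable) and only logs it,
    // so nothing is rethrown towards the adapter's client callback.
    @ServiceActivator(inputChannel = MQTT_ERROR_CHANNEL)
    public void logMqttFailure(Throwable failure) {
        if (failure instanceof MessagingException
                && ((MessagingException) failure).getFailedMessage() != null) {
            // The failed message tells us which MQTT payload caused the error.
            logger.error("MQTT flow failed for message: "
                    + ((MessagingException) failure).getFailedMessage(), failure);
        }
        else {
            logger.error("MQTT flow failed", failure);
        }
    }
}
```

The adapter from the question would then point at this channel with `adapter.setErrorChannelName(MqttErrorHandlingConfig.MQTT_ERROR_CHANNEL);`. Keeping the MQTT failures on their own channel means the logging handler does not also receive errors published to the global `errorChannel` by unrelated flows.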