Handling underlying MQTT and STOMP connections with Spring Integration adapters

We have some Spring Integration flows that process messages arriving through MQTT or STOMP. For this we use the MqttPahoMessageDrivenChannelAdapter and StompInboundChannelAdapter adapters. In the case of MQTT we observed that if an exception is thrown at any endpoint in the flow, the adapter closes the connection and no more messages are received. Likewise, if we restart the broker, the connection to it is not re-established.

To deal with the exception problem, we set the adapters' error channel name to "errorChannel", the default error channel that Spring Integration provides. Our intention is only to log the exception, without closing the underlying connection. Is this the proper way to handle exceptions throughout the flow?

Regarding the reconnection issue, we take a different approach for each transport protocol.

  • For MQTT we set automaticReconnect to true on the connection options:
var clientFactory = new DefaultMqttPahoClientFactory();
// Let the Paho client reconnect by itself after the connection is lost
clientFactory.getConnectionOptions().setAutomaticReconnect(true);

var adapter = new MqttPahoMessageDrivenChannelAdapter(
        "tcp://localhost:1883", MqttAsyncClient.generateClientId(), clientFactory, "/topic/myTopic");
// Send exceptions from the downstream flow to this channel
adapter.setErrorChannelName("errorChannel");
  • For STOMP we pass the TaskScheduler from the application context to the ReactorNettyTcpStompClient:
var stompClient = new ReactorNettyTcpStompClient(host, port);
stompClient.setTaskScheduler(taskScheduler);

var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);

var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
adapter.setErrorChannelName("errorChannel");

Is this the best approach to handle reconnection?

Artem Bilan On

Yes, the errorChannel option is a good way to keep that exception from being thrown back to the MQTT client. It does not have to be the global errorChannel, which might be used in many different places. setAutomaticReconnect(true) is indeed recommended for the inbound channel adapter.
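
To illustrate why routing failures elsewhere keeps the connection alive, here is a minimal plain-Java sketch of the idea. It is an analogy, not Spring Integration's implementation: ErrorBoundarySketch, guarded and errorSink are hypothetical names, and the errorSink plays the role that an error channel with a logging subscriber would play in a real flow.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical illustration only; none of these names come from Spring or Paho.
final class ErrorBoundarySketch {

    private ErrorBoundarySketch() {
    }

    /**
     * Wraps a message handler so that a runtime failure is handed to
     * errorSink (for example, a logger) instead of propagating to the
     * caller, which here stands in for the client's receive callback.
     */
    static <T> Consumer<T> guarded(Consumer<T> handler,
                                   BiConsumer<T, RuntimeException> errorSink) {
        return message -> {
            try {
                handler.accept(message);
            } catch (RuntimeException ex) {
                // The failure stops here; the caller never sees it.
                errorSink.accept(message, ex);
            }
        };
    }
}
```

With such a boundary in place, a message that fails is reported to the sink and the next message is still delivered to the handler, which is the behaviour the question asks for.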

The TaskScheduler for ReactorNettyTcpStompClient is not for reconnects; see its Javadocs. I think the reconnection logic is simply not used in ReactorNettyTcpStompClient:

public CompletableFuture<StompSession> connectAsync(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
    ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
    this.tcpClient.connectAsync(session);
    return session.getSession();
}

A reconnect would instead go through another variant, which takes a ReconnectStrategy:

CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
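
As a rough picture of what a strategy-driven reconnect involves, the following plain-Java sketch retries an asynchronous connect at a fixed interval. It is written under stated assumptions and does not use Spring's classes: RetryPolicy is a hypothetical stand-in (not Spring's ReconnectStrategy), and ReconnectSketch, fixedInterval and connectWithRetry are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical policy: delay in ms before the next attempt, or null to give up. */
interface RetryPolicy {
    Long delayBeforeNextAttempt(int failedAttempts);
}

final class ReconnectSketch {

    private ReconnectSketch() {
    }

    /** Waits intervalMillis between attempts and gives up after maxAttempts. */
    static RetryPolicy fixedInterval(long intervalMillis, int maxAttempts) {
        return failedAttempts -> failedAttempts < maxAttempts ? intervalMillis : null;
    }

    /**
     * Invokes connect until it succeeds or the policy gives up.
     * The returned future completes with the number of attempts made,
     * or exceptionally with the last connection error.
     */
    static CompletableFuture<Integer> connectWithRetry(Supplier<CompletableFuture<Void>> connect,
                                                       RetryPolicy policy,
                                                       ScheduledExecutorService scheduler) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        attempt(connect, policy, scheduler, 1, result);
        return result;
    }

    private static void attempt(Supplier<CompletableFuture<Void>> connect,
                                RetryPolicy policy,
                                ScheduledExecutorService scheduler,
                                int attemptNumber,
                                CompletableFuture<Integer> result) {
        CompletableFuture<Void> connection;
        try {
            connection = connect.get();
        } catch (RuntimeException ex) {
            // Treat a synchronous failure like a failed asynchronous connect.
            connection = CompletableFuture.failedFuture(ex);
        }
        connection.whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(attemptNumber);
                return;
            }
            Long delay = policy.delayBeforeNextAttempt(attemptNumber);
            if (delay == null) {
                result.completeExceptionally(error);
                return;
            }
            try {
                scheduler.schedule(
                        () -> attempt(connect, policy, scheduler, attemptNumber + 1, result),
                        delay, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rejected) {
                // The scheduler was shut down, so no further attempt is possible.
                result.completeExceptionally(rejected);
            }
        });
    }
}
```

The point of the sketch is that the retry decision lives in the policy object passed to the connect call. A connect call that is given no such policy, like the one quoted above, has nothing to drive a second attempt.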