Spring Integration MQTT, send response to gateway

Service:

Message<MQTTResponse> response = mqttGateway.sendToMqtt("topic", jsonData);
log.info("Receive reply{}", String.valueOf(response.getPayload()));

I have this MessagingGateway:

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel", defaultReplyChannel = "mqttResponseChannel")
public interface MqttGateway {
    Message<MQTTResponse> sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data);
}

The constraint: sendToMqtt sends a request to one topic, and the response to it arrives on a different topic, which I capture with mqttInputChannel:

@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<String> message) throws JsonProcessingException {
    if (Objects.equals(message.getHeaders().get("mqtt_receivedTopic"), topicAck)) {
        ObjectMapper objectMapper = new ObjectMapper();
        MQTTResponse payloadMap = objectMapper.readValue(message.getPayload(), MQTTResponse.class);
        log.info("Payload: {}", payloadMap);

        MessageChannel mqttResponseChannel = mqttResponseChannel();
        Message<MQTTResponse> responseMessage = MessageBuilder
           .withPayload(payloadMap)
           .setReplyChannel(mqttResponseChannel)
           .build();
        log.info("Response Message: {}", responseMessage);
        mqttResponseChannel.send(responseMessage);
    }
}

After the line log.info("Response Message: {}", responseMessage);, the connection is lost immediately: Lost connection: MqttException.

And after a while: java.lang.NullPointerException: Cannot invoke "org.springframework.messaging.Message.getPayload()" because "response" is null

I can't send the response received in the handleMessage function back to the service through the reply channel mqttResponseChannel.

There is 1 best solution below

Artem Bilan

It is not going to work in such a simple way. The @MessagingGateway implements a request-reply pattern that correlates the request with the reply via a replyChannel header whose value is a TemporaryReplyChannel.

When you receive a message from another process, in your case through the MQTT Inbound Channel Adapter, that information is definitely not in the headers. You cannot just send a message to mqttResponseChannel and expect it to be correlated with the request, because this message does not carry that replyChannel header.
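To make the mechanism concrete, here is a minimal plain-Java sketch of the idea. It is not Spring Integration code: GatewayReplySketch, Msg, sendAndReceive and reply are hypothetical names, and a BlockingQueue merely stands in for the temporary reply channel. It only illustrates why a message without the per-call header has nowhere to go, and why the blocked caller ends up with null after the timeout.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustration only: none of these types are Spring Integration classes.
class GatewayReplySketch {

    // Hypothetical stand-in for a message: headers plus a payload.
    static final class Msg {
        final Map<String, Object> headers;
        final String payload;

        Msg(Map<String, Object> headers, String payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Mimics a gateway call: create a per-call reply queue, store it in the
    // "replyChannel" header, hand the request downstream, then block on the queue.
    static String sendAndReceive(String payload, Consumer<Msg> downstream, long timeoutMs) {
        BlockingQueue<String> temporaryReply = new ArrayBlockingQueue<>(1);
        Map<String, Object> headers = new HashMap<>();
        headers.put("replyChannel", temporaryReply);
        downstream.accept(new Msg(headers, payload));
        try {
            // Returns null if nothing arrives on this particular queue in time.
            return temporaryReply.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // A reply reaches the caller only through the queue stored in the header.
    @SuppressWarnings("unchecked")
    static boolean reply(Msg request, String replyPayload) {
        Object channel = request.headers.get("replyChannel");
        if (!(channel instanceof BlockingQueue)) {
            return false; // no header, so there is nobody to deliver to
        }
        return ((BlockingQueue<String>) channel).offer(replyPayload);
    }

    public static void main(String[] args) {
        // Reply built from the request headers: the caller gets it.
        System.out.println(sendAndReceive("ping", m -> reply(m, "pong"), 200));
        // Message created elsewhere (like one from an inbound adapter): undeliverable.
        Msg fromInboundAdapter = new Msg(new HashMap<>(), "ack");
        System.out.println(reply(fromInboundAdapter, "ack"));
    }
}
```

In this sketch a downstream that never replies makes sendAndReceive return null once the timeout elapses, which is the same shape as the null response in the question.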

For this kind of asynchronous correlation, where the request-reply is still blocking, I usually suggest a solution with an aggregator, which is really not so simple:

You send your request to MQTT and also to an aggregator, with some unique correlation key that is also present in some way in the MQTT request.

When you receive a message from the reply topic, you send it to that aggregator as well. This reply message must carry the same correlation key as the request. The aggregator then correlates the request and the reply. In the release function you extract the headers from the request message (usually the first in the group) and return a reply message with those headers added. That includes the mentioned replyChannel header, which is what correlates the reply with the originating gateway. Your aggregator must not have an outputChannel, so that the framework uses the replyChannel header to route the message it produces.

The crucial part of this pattern is to preserve the correlation key in the MQTT request and reply.
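The correlation idea can be sketched in plain Java as follows. This is not the Spring Integration aggregator and uses none of its API: CorrelationSketch, request and onReply are hypothetical names, and a map of pending requests stands in for the aggregator's message groups. It assumes the device echoes the correlation key back in its reply.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Illustration only: a simplified stand-in for request/reply aggregation.
class CorrelationSketch {

    // Pending requests keyed by correlation key. The value plays the role of
    // the request's replyChannel header: the way back to the blocked caller.
    private final Map<String, BlockingQueue<String>> pending = new ConcurrentHashMap<>();

    // Request side: register the request under its key, publish it, then block.
    String request(String correlationKey, Runnable publishToBroker, long timeoutMs) {
        BlockingQueue<String> replyQueue = new ArrayBlockingQueue<>(1);
        pending.put(correlationKey, replyQueue);
        try {
            publishToBroker.run();
            // null means no reply with this key arrived in time.
            return replyQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pending.remove(correlationKey); // do not leak abandoned requests
        }
    }

    // Reply side: called by whatever listens on the reply topic. The key must
    // have been carried through the request and echoed back in the reply.
    boolean onReply(String correlationKey, String payload) {
        BlockingQueue<String> replyQueue = pending.get(correlationKey);
        return replyQueue != null && replyQueue.offer(payload);
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch();
        // The reply arrives on another thread, as it would from a broker callback.
        String reply = sketch.request("42",
                () -> new Thread(() -> sketch.onReply("42", "ack-42")).start(), 1000);
        System.out.println(reply);
    }
}
```

A reply whose key matches no pending request is simply rejected here; with a real aggregator the corresponding concern is a group that never completes, so a group timeout is worth considering.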

See more info about the aggregator in the docs: https://docs.spring.io/spring-integration/reference/aggregator.html