Converting an object to JSON for an MQTT payload using Spring Integration


I am trying to send JSON as payload through MQTT messages using Spring Integration. This is my outbound handler:

    @Bean
    @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound(final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
            final MqttClientPersistence persistence,
            final MqttHeaderMapper mqttHeaderMapper,
            final JacksonJsonMessageConverter jacksonJsonMessageConverter) {
        final var messageHandler = new Mqttv5PahoMessageHandler(clientManager);
        messageHandler.setHeaderMapper(mqttHeaderMapper);
        messageHandler.setPersistence(persistence);
        messageHandler.setAsync(true);
        messageHandler.setAsyncEvents(false);
        final var defaultPahoMessageConverter = new DefaultPahoMessageConverter(MqttQoS.AT_LEAST_ONCE.value(), false, StandardCharsets.UTF_8.name());
        final var bytesMessageMapper = new ConvertingBytesMessageMapper(jacksonJsonMessageConverter);
        defaultPahoMessageConverter.setBytesMessageMapper(bytesMessageMapper);
        messageHandler.setConverter(defaultPahoMessageConverter);
        return messageHandler;
    }

However, this fails the following assertion in org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler#onInit:

        else {
            Assert.state(!(getConverter() instanceof MqttMessageConverter),
                    "MessageConverter must not be an MqttMessageConverter");
        }

How else can I do this?

I tried to study the Spring Integration samples source, but I did not glean anything from it.

Answer by Artem Bilan

Yes. We just don't have enough resources on our side to cover all the features we implement for the community with samples. So, indeed there is no MQTT v5 sample over there.

As you can see, Mqttv5PahoMessageHandler cannot be supplied with an MqttMessageConverter. A regular MessageConverter instance must be used instead. In your case, pass the provided jacksonJsonMessageConverter directly:

    messageHandler.setConverter(jacksonJsonMessageConverter);
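
Applied to the bean from the question, that change might look like the sketch below. It is a configuration fragment, not a tested implementation: it reuses the question's bean parameters and constants (MQTT_OUTBOUND_CHANNEL, MqttQoS) unchanged, and it has not been run against a broker. The setDefaultQos and setDefaultRetained calls are an addition that is not part of the answer; they assume you still want the QoS and retained settings that the removed DefaultPahoMessageConverter used to carry.

    @Bean
    @ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
    public MessageHandler mqttOutbound(final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
            final MqttClientPersistence persistence,
            final MqttHeaderMapper mqttHeaderMapper,
            final JacksonJsonMessageConverter jacksonJsonMessageConverter) {
        final var messageHandler = new Mqttv5PahoMessageHandler(clientManager);
        messageHandler.setHeaderMapper(mqttHeaderMapper);
        messageHandler.setPersistence(persistence);
        messageHandler.setAsync(true);
        messageHandler.setAsyncEvents(false);
        // Assumption: keep the defaults previously passed to DefaultPahoMessageConverter.
        messageHandler.setDefaultQos(MqttQoS.AT_LEAST_ONCE.value());
        messageHandler.setDefaultRetained(false);
        // Plain MessageConverter, not an MqttMessageConverter, so the onInit assertion no longer applies.
        messageHandler.setConverter(jacksonJsonMessageConverter);
        return messageHandler;
    }

The DefaultPahoMessageConverter and ConvertingBytesMessageMapper from the original bean are dropped entirely, since DefaultPahoMessageConverter is the MqttMessageConverter that the assertion rejects.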

See more info in docs: https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-v5