Shared Subscription with Spring Integration MQTT v5 Not Receiving Messages

I have a Spring Boot (v3.0.5) project that uses spring-integration-mqtt (v6.0.4) with the Paho MQTT v5 client. I want to set up a shared subscription via the ClientManager and the Integration DSL, but I cannot get it to work.

@Bean
fun clientManager(): ClientManager<IMqttAsyncClient, MqttConnectionOptions> {
    val connectionOptions = MqttConnectionOptions()
    connectionOptions.serverURIs = arrayOf("tcp://example.org:1883")
    
    val clientManager = Mqttv5ClientManager(connectionOptions, "testClient")
    clientManager.setPersistence(MqttDefaultFilePersistence())
    return clientManager
}

@Bean
fun mqttTestInFlow(clientManager: ClientManager<IMqttAsyncClient, MqttConnectionOptions>): IntegrationFlow {
    val messageProducer = Mqttv5PahoMessageDrivenChannelAdapter(
        clientManager,
        "\$share/testGroup/foo/test",
    )

    return IntegrationFlow.from(messageProducer)
        .channel("mqttInputChannel")
        .get()
}

@ServiceActivator(inputChannel = "mqttInputChannel")
fun handler(message: Message<String>) {
    println("Received message: ${message.payload}")
}

The logs from my Mosquitto broker show that the subscription is created and that messages published to foo/test are delivered to testClient (the Spring service). However, my handler never receives these messages. When I remove the $share/testGroup/ prefix from the topic string, everything works fine.
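
For context, an MQTT v5 shared subscription has the form $share/{ShareName}/{filter}, and the broker delivers matching messages under their original topic name (here foo/test), not under the $share/... string. The sketch below only illustrates that split. The names SharedSubscription and parseSharedSubscription are hypothetical and made up for illustration; they are not part of Spring Integration or Paho. Whether this difference between the subscription string and the delivered topic is related to the missing messages is an assumption, not something confirmed here.

```kotlin
// Hypothetical helper types for illustration only; not a Spring Integration or Paho API.
data class SharedSubscription(val group: String, val filter: String)

// Splits "$share/{ShareName}/{filter}" into its share name and underlying topic filter.
// Returns null if the string is not a well-formed shared subscription.
fun parseSharedSubscription(subscription: String): SharedSubscription? {
    val prefix = "\$share/"
    if (!subscription.startsWith(prefix)) return null
    val rest = subscription.removePrefix(prefix)
    val slash = rest.indexOf('/')
    // Both the share name and the topic filter must be non-empty.
    if (slash <= 0 || slash == rest.length - 1) return null
    return SharedSubscription(rest.substring(0, slash), rest.substring(slash + 1))
}

fun main() {
    // The subscription string used in the adapter above.
    val parsed = parseSharedSubscription("\$share/testGroup/foo/test")
    println(parsed) // prints SharedSubscription(group=testGroup, filter=foo/test)
}
```

With the subscription from the question, the filter part is foo/test, which is the topic name the broker log shows for the delivered messages.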
