Spring Integration MQTT: how do I reply on a different topic after receiving a message?

I use Spring Integration MQTT. I have subscribed to a topic and can receive its messages.

However, the protocol I am implementing on top of MQTT requires a reply to each message after it is received.

For example:

@Configuration
public class MqttIntegrationConfig {
    @Autowired
    MqttService mqttService;

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient", "wdzn/#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
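        // Use a client ID different from the inbound adapter's "testClient":
        // a broker allows only one connection per client ID and drops the older one.
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClientOut", mqttClientFactory());
        messageHandler.setAsync(true);
        // No default topic: MQTT forbids publishing to a wildcard topic such as "wdzn/#".
        // MqttGateway always supplies the topic through the MqttHeaders.TOPIC header.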
        return messageHandler;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> {
            mqttService.sendToMqtt(message);
        };
    }
}


@Service
public class MqttService {

    @Autowired
    private MqttGateway mqttGateway;

    public void sendToMqtt(Message<?> message) {
        try {
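            // Topic on which the inbound adapter received the message
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            MqttBase mqttBase = new MqttBase(topic);
            // Reply topic: prefix/code/business/messageId/1
            String sendTopic = mqttBase.getPrefix() + "/" + mqttBase.getCode() + "/" + mqttBase.getBusiness() + "/" + mqttBase.getMessageId() + "/" + "1";
            // Publish an empty payload to the reply topic
            mqttGateway.sendToMqtt("", sendTopic);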
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}


@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

When I receive a message on the topic 'wdzn/Online/50001000/Register/0', the topic I should reply to is 'wdzn/50001000/Register/0/1'.
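To make the mapping concrete, here is a minimal sketch in plain Java, independent of Spring. It assumes every request topic has exactly five segments of the form prefix/Online/code/business/messageId, which is inferred from the single example above and may not cover the whole protocol. The class name `ReplyTopics` and the method `replyTopicFor` are hypothetical and stand in for whatever `MqttBase` does. Because the subscription is `wdzn/#`, the replies themselves would also be delivered to the inbound adapter, so the sketch returns an empty result for any topic that is not a request.

```java
import java.util.Optional;

public class ReplyTopics {

    /**
     * Derives the reply topic for a request topic of the assumed form
     * prefix/Online/code/business/messageId. Returns empty for any other
     * topic, including our own replies, so that a "wdzn/#" subscription
     * does not end up answering itself.
     */
    public static Optional<String> replyTopicFor(String receivedTopic) {
        if (receivedTopic == null) {
            return Optional.empty();
        }
        String[] parts = receivedTopic.split("/");
        // Assumption: requests have five segments and "Online" in second position.
        if (parts.length != 5 || !"Online".equals(parts[1])) {
            return Optional.empty();
        }
        // Drop the "Online" segment and append "1".
        return Optional.of(String.join("/", parts[0], parts[2], parts[3], parts[4], "1"));
    }

    public static void main(String[] args) {
        // prints "wdzn/50001000/Register/0/1"
        System.out.println(replyTopicFor("wdzn/Online/50001000/Register/0").orElse("(no reply)"));
        // prints "(no reply)"
        System.out.println(replyTopicFor("wdzn/50001000/Register/0/1").orElse("(no reply)"));
    }
}
```

In the configuration above, a check like this could sit at the start of `MqttService.sendToMqtt`, so that the gateway is called only when a reply topic is present.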

But so far I can't find a way to send this reply.

How should I write my code to handle this logic correctly?
