I am using Spring Integration MQTT. I have subscribed to a topic and can receive its messages.
However, the protocol I am implementing on top of MQTT requires me to send a reply after receiving each message.
Here is my current code:
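import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttIntegrationConfig {

    @Autowired
    MqttService mqttService;

    // Inbound adapter: subscribes to every topic under "wdzn/".
    // Note that "wdzn/#" also matches the reply topics published by mqttOutbound().
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        // The client ID must differ from the outbound one: a broker closes the
        // older connection when a second client connects with the same ID.
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClientInbound", "wdzn/#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Channel that carries messages received from the broker.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Channel that carries messages to be published to the broker.
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});
        factory.setConnectionOptions(options);
        return factory;
    }

    // Outbound handler: publishes whatever arrives on mqttOutputChannel.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClientOutbound", mqttClientFactory());
        messageHandler.setAsync(true);
        // No default topic is set: MQTT does not allow wildcards such as "wdzn/#"
        // in a publish topic, and MqttGateway always supplies the topic header.
        return messageHandler;
    }

    // Passes every received message to the service that sends the reply.
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return message -> mqttService.sendToMqtt(message);
    }
}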
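import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
public class MqttService {

    @Autowired
    private MqttGateway mqttGateway;

    public void sendToMqtt(Message<?> message) {
        try {
            // Topic the incoming message arrived on, e.g. "wdzn/Online/50001000/Register/0".
            String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
            // MqttBase (not shown) parses the topic into prefix, code, business and message ID.
            MqttBase mqttBase = new MqttBase(topic);
            // Reply topic: prefix/code/business/messageId/1.
            String sendTopic = mqttBase.getPrefix() + "/" + mqttBase.getCode() + "/"
                    + mqttBase.getBusiness() + "/" + mqttBase.getMessageId() + "/1";
            // Publish an empty payload to the reply topic.
            mqttGateway.sendToMqtt("", sendTopic);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}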
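import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}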
When I receive a message on the topic 'wdzn/Online/50001000/Register/0', I should reply on the topic 'wdzn/50001000/Register/0/1'.
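To make the mapping concrete, here is a minimal plain-Java sketch of it. The class TopicMapper and the method replyTopic are hypothetical names used only for illustration and are not part of my project. The sketch assumes the received topic always has exactly five levels, that the second level ("Online" in my example) is simply dropped, and that the trailing "1" is a fixed suffix; I only have the one example above to go on.

```java
import java.util.Objects;

// Hypothetical helper for illustration only; not part of the project above.
final class TopicMapper {

    private TopicMapper() {
    }

    // Maps prefix/<dropped level>/code/business/messageId
    // to   prefix/code/business/messageId/1.
    // Assumption: the received topic has exactly five levels.
    static String replyTopic(String receivedTopic) {
        Objects.requireNonNull(receivedTopic, "receivedTopic");
        // A limit of -1 keeps trailing empty levels, so "a/b/" is seen as three levels.
        String[] parts = receivedTopic.split("/", -1);
        if (parts.length != 5) {
            throw new IllegalArgumentException("Expected 5 topic levels: " + receivedTopic);
        }
        return String.join("/", parts[0], parts[2], parts[3], parts[4], "1");
    }

    public static void main(String[] args) {
        // prints "wdzn/50001000/Register/0/1"
        System.out.println(replyTopic("wdzn/Online/50001000/Register/0"));
    }
}
```

One thing this makes visible: the reply topic also has five levels and starts with "wdzn/", so a subscription to 'wdzn/#' would deliver the reply back to the subscriber as well.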
However, I can't find a way to send that reply.
How should I write my code to handle this logic correctly?