In a customized IntegrationFlow, I use MappingJackson2MessageConverter for subscriptions to generate corresponding POJOs from JSON. This works perfectly so far.
However, some topics hold retained messages, and these can be deleted, in which case the subscriber receives an empty payload. This currently leads to an exception:
ERROR org.springframework.integration.handler.LoggingHandler [MQTT Call: ...]
    ...
Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message, failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ...
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0], failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    ...
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4916)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3866)
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221)
    ...
Is there any way to intercept this case and process the deleted topic from the header? I could easily live with getting null as a result. However, since returning null is not possible in messaging (it would terminate the flow), I am looking for an alternative solution. Would routeByException() be an option, or is there a better solution?
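One way to intercept the empty case is to recognize it before the JSON conversion runs. The following is a minimal sketch in plain Java, so it does not assume any particular Spring Integration API. It only encodes the check for the payload shown in the stack trace (byte[0]) plus a blank String. The names EmptyPayloads and isEmpty are made up for illustration.

```java
/** Hypothetical helper, not part of Spring Integration or Paho. */
final class EmptyPayloads {

    private EmptyPayloads() {
    }

    /**
     * Returns true for payloads that carry no JSON at all:
     * null, a zero-length byte array, or a blank String.
     */
    static boolean isEmpty(final Object payload) {
        if (payload == null) {
            return true;
        }
        if (payload instanceof byte[] bytes) {
            return bytes.length == 0;
        }
        if (payload instanceof String s) {
            return s.isBlank();
        }
        // Any other payload type is assumed to carry content.
        return false;
    }
}
```

Such a predicate could decide, ahead of the transformer, whether a message goes to a separate branch that only looks at the headers or continues to the JSON conversion. Whether a filter, a router, or a check inside the transformer fits best depends on the flow.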
My setup looks more or less like this:
<T> IntegrationFlowRegistration subscriber(
        final MqttConnectionOptions options,
        final String id,
        final String topic,
        final Class<T> type,
        final Class<?> view,
        final GenericHandler<T> handler
) {
    final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
    final IntegrationFlowBuilder builder = IntegrationFlow
            .from(adapter)
            .transform(new PojoTransformer<>(type, view))
            .handle(handler);
    return this.flowContext.registration(builder.get()).register();
}

public class PojoTransformer<T> extends AbstractTransformer {
    private final Class<T> type;
    private final Class<?> view;

    public PojoTransformer(
            final Class<T> type,
            final Class<?> view
    ) {
        this.type = type;
        this.view = view;
    }

    @Override
    protected Object doTransform(final Message<?> message) {
        // final Object o = message.getPayload();
        // if (o instanceof final byte[] bytes && bytes.length == 0 || o instanceof final String s && s.isBlank()) {
        //     return null;
        // }
        return new MappingJackson2MessageConverter().fromMessage(message, this.type, this.view);
    }
}
I finally came up with something like this:
Nevertheless, I am still interested in other and possibly more elegant solutions.
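Since a transformer cannot hand null to the next step, one alternative that may be worth considering is to wrap the result, so that an empty payload becomes Optional.empty() and the handler can still react to the deleted topic via the headers. The sketch below shows only that wrapping step in plain Java. OptionalDecoder, decode, and the parser argument (a stand-in for the Jackson conversion) are illustrative names, not an existing API.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical helper: turns "no content" into Optional.empty() instead of null. */
final class OptionalDecoder {

    private OptionalDecoder() {
    }

    /**
     * @param bytes  raw payload; assumed to be empty when the retained message was deleted
     * @param parser stand-in for the real JSON-to-POJO conversion
     * @return the parsed value, or Optional.empty() if there was nothing to parse
     */
    static <T> Optional<T> decode(final byte[] bytes, final Function<byte[], T> parser) {
        if (bytes == null || bytes.length == 0) {
            return Optional.empty();
        }
        return Optional.ofNullable(parser.apply(bytes));
    }
}
```

A handler typed on Optional<T> would then receive every message, including the empty ones, and could read the topic from the message headers when the Optional is empty. The trade-off is that the handler's payload type changes from T to Optional<T>.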