I implemented the necessary functionality on consumer and it works fine.
@Service
@RequiredArgsConstructor
public class RabbitMQConsumer {
private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);
private final EventHandlerStrategy handlerStrategy;
@RabbitListener(queues = "#{vehicleQueue.name}")
public void consumePayload(@Payload String encodedMessage,
@Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String receivedRoutingKey){
try {
String payload = new String(Base64.getDecoder().decode(encodedMessage)).trim(); // **1. Decoding string message**
JsonObject jsonMessage = JsonParser.parseString(payload).getAsJsonObject(); // **2. Parsing JSON string to json object**
String prefixRoutingKey = receivedRoutingKey.split("\\.")[0];
VehicleType vehicleType = VehicleProfile // **3. Getting vehicle type from inbound service**
.getTopicConverter(prefixRoutingKey)
.map(converter -> converter.convert(prefixRoutingKey))
.orElseThrow(() -> new MessageException(String.format("Not found vehicle type '%s'", prefixRoutingKey)));
jsonMessage.addProperty("vehicleType", vehicleType.name());
int type = jsonMessage.get("fuelTypeCode").getAsInt();
handlerStrategy.getEventStrategy(type).ifPresentOrElse(iEventHandler -> iEventHandler.consumeEvent(jsonMessage), // **4. send event**
() -> logger.error("There are no event handler found '{}'", type));
} catch (IllegalArgumentException e){ // **5. Error handing**
logger.debug("[{}] Failed to decode base64 string", encodedMessage);
} catch (JsonSyntaxException e){
logger.error("an error occurred while parsing event", e);
} catch (MessageException e) {
logger.error("Event exception: ", e);
}
}
}
I'm not thrilled with how the implementation looks. Too many different responsibilities in one place. Its looks messy and it will be hard to tests all of this.
Please see the code comment lines:
- Decoding string message
- Parsing JSON string to json object
- Getting vehicle type from inbound service
- Send event
- Error handing
My question is how to correctly split all of this ? Is there any patterns, practices , books or article to avoid that mess in the future ?
Is there a good idea to move decoding and parsing into Spring AMQP Message Converters or interseptors and get in listener ready JsonObject ? How can i implement this and how to handle error ?
@RabbitListener(queues = "#{vehicleQueue.name}")
public void consumePayload(@Payload JsonObject encodedMessage)
Yes, you can encapsulate that logic (up to
jsonMessage.addProperty("vehicleType", vehicleType.name());) into a customMessageConverter. The@RabbitListenerhas specific attribute on the matter:The error handling also can be done separately. See respective attribute on the
@RabbitListener:In the end your code would indeed look just like that:
More info in docs: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven.html