In the current Spring Boot environment with ActiveMQ, we have to read the messages from the queue via 'pulling'. When I put the jmsTemplate.receive() within an transaction, I expected that any exception would put the message back on the queue. Alas, when invoking the next jmsTemplate.receive the next message is read.
How to have the message read by jmsTemplate.receive() put back on the queue in case of specific Exceptions? In the ActiveMQ desktop I can see that the message is not put on the Deadletter queue.
My test configuration is first set via this class:
@Configuration
@EnableJms
public class JmsReceiverConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = activeMQConnectionFactory();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory( activeMQConnectionFactory);
return connectionFactory;
}
@Bean
public PlatformTransactionManager transactionManager() {
JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(activeMQConnectionFactory());
return transactionManager;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerUrl);
activeMQConnectionFactory.setTrustAllPackages(true);
return activeMQConnectionFactory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
The Spring Boot application has transactions enabled.
@SpringBootApplication
@EnableTransactionManagement
public class CamelMicroserviceAApplication implements CommandLineRunner {
Logger logger = LoggerFactory.getLogger(CamelMicroserviceAApplication.class);
@Autowired
private ProcessingBean processingBean;
public static void main(String[] args) {
SpringApplication.run(CamelMicroserviceAApplication.class, args);
}
@Override
public void run(String... args) {
processingBean.startProcessingMessages();
}
}
For testing the transaction management for the receiving, this processingBean is:
@Component
public class ProcessingBean {
Logger logger = LoggerFactory.getLogger(ProcessingBean.class);
private int maxRunningCounter = 30;
private ProcessOneMessage processOneMessage;
@Autowired
public ProcessingBean(ProcessOneMessage processOneMessage) {
this.processOneMessage = processOneMessage;
}
public void startProcessingMessages() {
while (maxRunningCounter-- > 0) {
logger.info("Going to process a message");
try {
Thread.sleep(1000);
processOneMessage.processingMessage();
} catch (FailingMessageProcessingException e) {
logger.info("Failed message processing exception. Message should stay on the queue.. ");
} catch (InterruptedException e) {
logger.error("Interrupted!");
Thread.currentThread().interrupt();
}
}
}
}
The ProcessOneMessage is transactional and is using the jmsTemplate.receive to read the message from the queue:
@Component
public class ProcessOneMessage {
Logger logger = LoggerFactory.getLogger(ProcessOneMessage.class);
public static final String ENTRY_QUEUE = "entry-queue";
public static final String SUCCESS_QUEUE = "success-queue";
private JmsTemplate jmsTemplate;
private int invocationCounter = 0;
@Autowired
public ProcessOneMessage(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
@Transactional
public void processingMessage() throws FailingMessageProcessingException {
invocationCounter++;
Message message = jmsTemplate.receive( ENTRY_QUEUE);
logger.info( "Incoming message {} with invocation {}", message, invocationCounter);
switch( invocationCounter % 3) {
case 0:
throw new FailingMessageProcessingException( "Simulating an error with invocation " + invocationCounter);
case 1,2:
jmsTemplate.convertAndSend(SUCCESS_QUEUE, "test success queue");
}
}
}
Via Apache Camel messages are generated and sent to the ENTRY_QUEUE.
When looking at the processed messages: I can see that the message processing with the Exception is not put back on the queue. That's what I expected, similar t onMessage().
My testing environment is setup via Apache Camel:
@Component
public class PullMessageInfra extends RouteBuilder {
@Override
public void configure() throws Exception {
AtomicInteger messageNumber = new AtomicInteger(0);
errorHandler( deadLetterChannel( "activemq:dead-letter-queue"));
from("timer:active-mq-timer?period=3000")
.transform(new Expression() {
public <T> T evaluate(Exchange exchange, Class<T> type) {
return (T) ("Message_" + messageNumber.incrementAndGet());
}
})
.to("activemq:entry-queue");
from( "activemq:success-queue")
.to( "log:success-queue");
from( "activemq:dead-letter-queue")
.to( "log:dead-letter-queue");
}
}