Spring ActiveMQ - expecting jmsTemplate.receive() to be transactional

In our current Spring Boot environment with ActiveMQ, we have to read the messages from the queue via 'pulling'. When I put jmsTemplate.receive() within a transaction, I expected that any exception would put the message back on the queue. Alas, the next call to jmsTemplate.receive() simply reads the next message.

How can I have the message read by jmsTemplate.receive() put back on the queue in case of specific exceptions? In the ActiveMQ desktop I can see that the message is not put on the dead-letter queue either.

My test configuration is first set via this class:

@Configuration
@EnableJms
public class JmsReceiverConfig {

  @Value("${spring.activemq.broker-url}")
  private String brokerUrl;

  @Bean
  public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
    jmsTemplate.setSessionTransacted(true);
    return jmsTemplate;
  }

  @Bean
  public CachingConnectionFactory cachingConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = activeMQConnectionFactory();
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory( activeMQConnectionFactory);
    return connectionFactory;
  }

  @Bean
  public PlatformTransactionManager transactionManager() {
    JmsTransactionManager transactionManager = new JmsTransactionManager();
    transactionManager.setConnectionFactory(activeMQConnectionFactory());
    return transactionManager;
  }

  @Bean
  public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(brokerUrl);
    activeMQConnectionFactory.setTrustAllPackages(true);
    return activeMQConnectionFactory;
  }

  @Bean
  public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
  }
}

The Spring Boot application has transactions enabled.

@SpringBootApplication
@EnableTransactionManagement
public class CamelMicroserviceAApplication implements CommandLineRunner {

  Logger logger = LoggerFactory.getLogger(CamelMicroserviceAApplication.class);

  @Autowired
  private ProcessingBean processingBean;

  public static void main(String[] args) {
    SpringApplication.run(CamelMicroserviceAApplication.class, args);
  }

  @Override
  public void run(String... args) {
    processingBean.startProcessingMessages();
  }
}

To test transaction management on the receiving side, the ProcessingBean looks like this:

@Component
public class ProcessingBean {
  Logger logger = LoggerFactory.getLogger(ProcessingBean.class);
  private int maxRunningCounter = 30;
  private ProcessOneMessage processOneMessage;

  @Autowired
  public ProcessingBean(ProcessOneMessage processOneMessage) {
    this.processOneMessage = processOneMessage;
  }

  public void startProcessingMessages() {
    while (maxRunningCounter-- > 0) {
      logger.info("Going to process a message");
      try {
        Thread.sleep(1000);
        processOneMessage.processingMessage();
      } catch (FailingMessageProcessingException e) {
        logger.info("Failed message processing exception. Message should stay on the queue.. ");
      } catch (InterruptedException e) {
        logger.error("Interrupted!");
        Thread.currentThread().interrupt();
      }
    }

  }
}

ProcessOneMessage is transactional and uses jmsTemplate.receive() to read the message from the queue:

@Component
public class ProcessOneMessage {
  Logger logger = LoggerFactory.getLogger(ProcessOneMessage.class);

  public static final String ENTRY_QUEUE = "entry-queue";
  public static final String SUCCESS_QUEUE = "success-queue";

  private JmsTemplate jmsTemplate;
  private int invocationCounter = 0;

  @Autowired
  public ProcessOneMessage(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }

  @Transactional
  public void processingMessage() throws FailingMessageProcessingException {
    invocationCounter++;
    Message message = jmsTemplate.receive( ENTRY_QUEUE);
    logger.info( "Incoming message {} with invocation {}", message, invocationCounter);
    switch( invocationCounter % 3) {
      case 0:
        throw new FailingMessageProcessingException( "Simulating an error with invocation " + invocationCounter);
      case 1,2:
        jmsTemplate.convertAndSend(SUCCESS_QUEUE, "test success queue");
    }
  }
}

Via Apache Camel messages are generated and sent to the ENTRY_QUEUE.

Looking at the processed messages, I can see that the message whose processing threw the exception is not put back on the queue. I expected it to be put back, similar to what happens with onMessage().
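Two things in the setup above may be worth checking; what follows is a sketch under assumptions, not a confirmed fix. First, the JmsTemplate is built on the CachingConnectionFactory while the JmsTransactionManager is built on the raw ActiveMQConnectionFactory. As far as I understand, Spring binds the transactional JMS session to the ConnectionFactory instance, so a template using a different factory would receive on its own locally transacted session and commit right after the receive. Second, FailingMessageProcessingException appears to be a checked exception (its definition is not shown, so this is an assumption), and @Transactional by default rolls back only for RuntimeException and Error. The class name SharedFactoryJmsConfig is hypothetical.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;

// Hypothetical variant of JmsReceiverConfig: the template and the
// transaction manager share ONE ConnectionFactory instance.
@Configuration
public class SharedFactoryJmsConfig {

  @Value("${spring.activemq.broker-url}")
  private String brokerUrl;

  @Bean
  public CachingConnectionFactory cachingConnectionFactory() {
    ActiveMQConnectionFactory target = new ActiveMQConnectionFactory();
    target.setBrokerURL(brokerUrl);
    return new CachingConnectionFactory(target);
  }

  @Bean
  public JmsTemplate jmsTemplate(CachingConnectionFactory connectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
    jmsTemplate.setSessionTransacted(true);
    return jmsTemplate;
  }

  @Bean
  public PlatformTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
    // Same factory instance as the JmsTemplate, so both should see the same session.
    return new JmsTransactionManager(connectionFactory);
  }
}

// In ProcessOneMessage, assuming FailingMessageProcessingException is checked,
// the rollback rule would have to be declared explicitly:
//
//   @Transactional(rollbackFor = FailingMessageProcessingException.class)
//   public void processingMessage() throws FailingMessageProcessingException { ... }
```

If the rollback then takes effect, the broker should redeliver the message according to its redelivery policy before moving it to a dead-letter queue, so the same message may be seen again on the next jmsTemplate.receive().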

My testing environment is set up via Apache Camel:

@Component
public class PullMessageInfra extends RouteBuilder {

  @Override
  public void configure() throws Exception {
    AtomicInteger messageNumber = new AtomicInteger(0);
    errorHandler( deadLetterChannel( "activemq:dead-letter-queue"));
    from("timer:active-mq-timer?period=3000")
            .transform(new Expression() {
              public <T> T evaluate(Exchange exchange, Class<T> type) {
                return (T) ("Message_" + messageNumber.incrementAndGet());
              }
            })
            .to("activemq:entry-queue");

    from( "activemq:success-queue")
            .to( "log:success-queue");
    from( "activemq:dead-letter-queue")
            .to( "log:dead-letter-queue");
  }

}