Access message payload after transaction commit

I'm trying to get a better understanding of transaction management with Spring Integration and a JMS inbound.

I read the docs and found other posts about post-commit actions for file polling, but so far I haven't been able to do something similar with JMS.

If I define the following rudimentary "read in, write out" flow:

@Bean
public IntegrationFlow inOutFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue")))
            .handle(Jms.outboundAdapter(connectionFactory)
                    .destination("outQueue"))
            .get();
}

How can I operate on the SI message after its contents have been sent to the outbound queue?

I added a transactionManager to the flow:

@Bean
public IntegrationFlow inOutFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue"))
                    .configureListenerContainer(c -> c.sessionTransacted(true)
                            .transactionManager(jmsTransactionManager)))
            .handle(Jms.outboundAdapter(connectionFactory).destination("outQueue"))
            .get();
}

@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
    var manager = new JmsTransactionManager(connectionFactory);
    manager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return manager;
}

The file-polling example uses a TransactionSynchronizationFactory with an ExpressionEvaluatingTransactionSynchronizationProcessor under the hood, but I see no similar option for the configureListenerContainer method.

EDIT

I introduced a pollable channel in between, as suggested in this post:

@Bean
public IntegrationFlow inFlow() {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(new ActiveMQQueue("inQueue")))
            .channel("queueChannel")
            .get();
}

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public IntegrationFlow outFlow() {
    return IntegrationFlows.from("queueChannel")
                    .handle(Jms.outboundAdapter(connectionFactory)
                               .destination("outQueue"))
                    .get();
}

When the message is sent to the intermediary channel, a new thread is started and the ActiveMQ message is fully consumed and removed from its queue, but its contents are still available in the integration message for further processing.

Answer by Artem Bilan:

There is no transaction synchronization support on a MessageProducerSupport endpoint. The file example is based on a polling channel adapter, and its logic is slightly different.

One approach is to have a custom AbstractRequestHandlerAdvice on that Jms.outboundAdapter() to do:

if (this.transactionSynchronizationFactory != null && resource != null &&
            TransactionSynchronizationManager.isActualTransactionActive()) {

        TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(resource);
        if (synchronization != null) {
            TransactionSynchronizationManager.registerSynchronization(synchronization);
        }
}

before calling doInvoke(). This way you will be able to attach your synchronization to the existing JMS transaction.
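The ordering this relies on can be modeled in plain Java without Spring on the classpath. The sketch below is only an illustration: `ToyTransaction` and `PostCommitAdvice` are hypothetical stand-ins, not Spring or Spring Integration types. It assumes the advice registers a callback that captures the payload before delegating to the handler, and that the callback runs only if the surrounding transaction commits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a transaction that collects after-commit callbacks. */
final class ToyTransaction {
    private final List<Runnable> afterCommit = new ArrayList<>();

    /** Plays the role of registering a synchronization on the active transaction. */
    void registerAfterCommit(Runnable callback) {
        afterCommit.add(callback);
    }

    /** Runs the registered callbacks, in registration order, once the work is committed. */
    void commit() {
        List<Runnable> pending = new ArrayList<>(afterCommit);
        afterCommit.clear();
        pending.forEach(Runnable::run);
    }

    /** Discards the callbacks: nothing registered runs after a rollback. */
    void rollback() {
        afterCommit.clear();
    }
}

/** Hypothetical stand-in for an advice wrapped around the outbound handler. */
final class PostCommitAdvice<P> {
    private final Consumer<P> postCommitAction;

    PostCommitAdvice(Consumer<P> postCommitAction) {
        this.postCommitAction = postCommitAction;
    }

    /** Registers the post-commit action first, then delegates to the real handler. */
    void invoke(ToyTransaction tx, P payload, Consumer<P> handler) {
        // The callback captures the payload, so it is still reachable after commit.
        tx.registerAfterCommit(() -> postCommitAction.accept(payload));
        // Corresponds to the point where the real advice would proceed with the send.
        handler.accept(payload);
    }
}
```

In this model, calling `invoke` runs the handler immediately, while the post-commit action stays pending until `commit()` is called on the same `ToyTransaction`; after `rollback()` it never runs. In the real flow the listener container's transaction manager would drive commit and rollback, and the callback would be a TransactionSynchronization.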

Another way is to try to use a @TransactionalEventListener: https://docs.spring.io/spring-framework/reference/data-access/transaction/event.html