I'm trying to get a better understanding of transaction management with Spring Integration and a JMS inbound.
I read the docs and found other posts pertaining post-commit action for file polling, but so far I haven't been able to do something similar with JMS.
If I define the rudimentary "read in - write out" following flow:
@Bean
public IntegrationFlow inOutFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue")))
.handle(Jms.outboundAdapter(connectionFactory)
.destination("outQueue"))
.get();
}
How do I get to operate on the SI message after its contents are sent to the outbound queue?
I added a transactionManager to the flow:
@Bean
public IntegrationFlow inOutFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue"))
.configureListenerContainer(c -> c.sessionTransacted(true)
.transactionManager(jmsTransactionManager)))
.handle(Jms.outboundAdapter(connectionFactory).destination("outQueue"))
.get();
@Bean
public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
var manager = new JmsTransactionManager(connectionFactory);
manager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return manager;
}
The file-polling example uses a TransactionSynchronizationFactory with a ExpressionEvaluatingTransactionSynchronizationProcessor under the hood, but I see no similar option for the configureListenerContainer method.
EDIT
I introduced a pollable channel in between, as suggested in this post:
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
.destination(new ActiveMQQueue("inQueue")))
.channel("queueChannel")
.get();
}
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
@Bean
public IntegrationFlow outFlow() {
return IntegrationFlows.from("queueChannel")
.handle(Jms.outboundAdapter(connectionFactory)
.destination("outQueue"))
}
When the message is sent to the intermediary channel, a new thread is started, the ActiveMQ message is fully consumed and removed from its queue, but its contents are still available in the integration message for further processings.
There is no transaction synchronization support on
MessageProducerSupportendpoint. The file example is based on a polling channel adapter and its logic is slightly different.On of the approach is to have a custom
AbstractRequestHandlerAdviceon thatJms.outboundAdapter()to do:before calling
doInvoke(). So, this way you will be able to attach your synchronization to existing JMS transaction.Another way is to try to use a
@TransactionalEventListener: https://docs.spring.io/spring-framework/reference/data-access/transaction/event.html