Spring Retry Recovery attached to a service activator is not part of the processing chain


I am using a Spring Integration chain of service activators to process an incoming message from a queue. One of the service activators, messagePersister, persists the incoming message. If this service activator fails, a retry advice retries the operation 3 more times. This part works fine, but if all retries fail, we have a recovery method that persists the message in an alternate form (and also triggers some notifications etc.). The recovery method and the original persister method return objects of the same class, which then need to be processed by the preprocessor, the next service activator in the chain. However, it looks like using the recovery option causes the message to leave the chain: the object returned by the recovery service activator does not go down the chain. Similarly, if the recovery method throws an exception, it does not go to redboxExceptionChannel, which is the error channel for the adapter that is listening to the incoming queue.

 <int-jms:message-driven-channel-adapter
    id="inputChannelAdapter"
    connection-factory="jmsConnectionFactory"
    destination-name="REDBOX_IN"
    channel="redboxIncomingChannel"
    max-concurrent-consumers="1"
    auto-startup="true"
    acknowledge="transacted"
    receive-timeout="20000"
    error-channel="redboxExceptionChannel"/>

  <int:chain id="redboxIncomingChannelProcessingChain"
    input-channel="redboxIncomingChannel"
    output-channel="redboxOutgoingMessageChannel">
    <int:service-activator ref="messagePersister"
      method="persistAndAddClientMessageIdToHeader">
      <int:request-handler-advice-chain>
        <int:retry-advice max-attempts="4" recovery-channel="persistenceRetriesExhaustedChannel" >
          <int:exponential-back-off initial="800"
            multiplier="3"
            maximum="25000"/>
        </int:retry-advice>
      </int:request-handler-advice-chain>
    </int:service-activator>
    <int:service-activator ref="redboxPreProcessor" method="validate"/>
    <int:service-activator ref="redboxProcessor" method="evaluateRules"/>
  </int:chain>

  <int:service-activator ref="messagePersister"
    method="retriesExhausted" input-channel="persistenceRetriesExhaustedChannel"  />

I was expecting the recovery method to be part of the chain that triggered the retries.

There are 2 best solutions below

Artem Bilan On BEST ANSWER

The behavior is correct. The ErrorMessageSendingRecoverer has logic like this:

@Override
public Object recover(RetryContext context) {
    publish(context.getLastThrowable(), context);
    return null;
}

So the recoverer only publishes the error to the recovery channel and returns null; nothing is returned to the chain. There is no knowledge at that point that your service activator is reply-producing.

You can fix the problem this way:

Add a <gateway> at that point in the chain and extract your service-activator with the retry advice into an independent component whose input-channel is the request-channel of that gateway.

Then your messagePersister.retriesExhausted must look into MessagingException.failedMessage and copy its headers before returning from the method. This way the replyChannel header will be present and the endpoint will know where to send the result of your method. This replyChannel is where the gateway is waiting for a reply. So you get a normal reply from the original service activator, or a compensating one from the persistenceRetriesExhaustedChannel subscriber.
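To illustrate why copying the headers matters, here is a minimal plain-Java model of the idea. It does not use Spring Integration at all: Msg, sendReply and the two recover methods are hypothetical names invented for this sketch, and a BlockingQueue stands in for the channel on which the gateway waits. Only the header name replyChannel matches the real framework.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ReplyChannelSketch {

    /** Toy stand-in for a message: a payload plus a header map (not the Spring type). */
    public static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        public Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Toy endpoint: delivers the payload to the "replyChannel" header, if one is present. */
    @SuppressWarnings("unchecked")
    public static boolean sendReply(Msg reply) {
        Object channel = reply.headers.get("replyChannel");
        if (channel instanceof BlockingQueue) {
            return ((BlockingQueue<Object>) channel).offer(reply.payload);
        }
        return false; // no reply channel: the result has nowhere to go
    }

    /** Recovery that builds a fresh message and drops the failed message's headers. */
    public static Msg recoverWithoutHeaders(Msg failed) {
        return new Msg("recovered:" + failed.payload, new HashMap<>());
    }

    /** Recovery that copies the failed message's headers, including replyChannel. */
    public static Msg recoverCopyingHeaders(Msg failed) {
        return new Msg("recovered:" + failed.payload, new HashMap<>(failed.headers));
    }

    public static void main(String[] args) {
        // The queue plays the role of the channel the gateway is waiting on.
        BlockingQueue<Object> gatewayReplies = new ArrayBlockingQueue<>(1);
        Map<String, Object> headers = new HashMap<>();
        headers.put("replyChannel", gatewayReplies);
        Msg failed = new Msg("test data", headers);

        System.out.println(sendReply(recoverWithoutHeaders(failed))); // false
        System.out.println(sendReply(recoverCopyingHeaders(failed))); // true
        System.out.println(gatewayReplies.poll());                    // recovered:test data
    }
}
```

In this model the recovery result only reaches the waiting side when the failed message's headers travel with it, which is the role the copied replyChannel header plays for the gateway described above.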

UPDATE

Regarding errors from the recoverer sub-flow. According to my testing it works as expected:

@SpringBootApplication
public class So78089892Application {

    public static void main(String[] args) {
        SpringApplication.run(So78089892Application.class, args);
    }

    @Bean
    ApplicationRunner sendToJms(JmsTemplate jmsTemplate) {
        return args -> jmsTemplate.convertAndSend("REDBOX_IN", "test data");
    }

    @Bean
    JmsMessageDrivenChannelAdapterSpec<?> inputChannelAdapter(ConnectionFactory jmsConnectionFactory) {
        return Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                .destination("REDBOX_IN")
                .outputChannel("redboxIncomingChannel")
                .errorChannel("redboxExceptionChannel");
    }

    @ServiceActivator(inputChannel = "redboxExceptionChannel")
    void handleErrors(Exception exception) {
        System.out.println("Error Received: \n" + exception);
    }

    @Bean
    IntegrationFlow redboxIncomingChannelProcessingChain(RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlow
                .from("redboxIncomingChannel")
                .gateway((subFlow) -> subFlow
                        .handle((p, h) -> {
                            throw new RuntimeException("Persistence failed");
                        }), e -> e.advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(MessageChannel persistenceRetriesExhaustedChannel) {
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRecoveryCallback(
                new ErrorMessageSendingRecoverer(persistenceRetriesExhaustedChannel));
        return requestHandlerRetryAdvice;

    }

    @Bean
    DirectChannel persistenceRetriesExhaustedChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "persistenceRetriesExhaustedChannel")
    void retriesExhausted(Exception exception) {
        throw new RuntimeException("Cannot recover", exception);
    }

}

As you can see, in the last retriesExhausted() method I deliberately throw an exception wrapping the one that came from the failed handler with the retry advice.

In the end I got logs from the handleErrors() method like this:

Error Received: 
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@53b41cc8], failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle, failedMessage=GenericMessage [payload=test data, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=ActiveMQQueue[REDBOX_IN], id=5ff5cbc6-f585-c113-a9c3-40a741d0cc7f, priority=4, jms_timestamp=1709326985025, jms_messageId=ID:18fa9c02-d80f-11ee-8409-00155d933a76, timestamp=1709326985042}], headers={id=9eb9a300-d058-da1c-8315-432c2ae0cb34, timestamp=1709326985055}]

(Sorry for the Java DSL variant: I haven't worked with XML config for a while.)

We might have some difference in the configuration. For example, your persistenceRetriesExhaustedChannel might not be a DirectChannel...

VPN236 On

Accepted Artem's answer. My final implementation looks somewhat like this:

<int-jms:message-driven-channel-adapter
  id="inputChannelAdapter"
  connection-factory="jmsConnectionFactory"
  destination-name="REDBOX_IN"
  channel="redboxIncomingChannel"
  max-concurrent-consumers="1"
  auto-startup="true"
  acknowledge="transacted"
  receive-timeout="20000"
  error-channel="redboxExceptionChannel"/>

<int:chain id="redboxIncomingChannelProcessingChain"
  input-channel="redboxIncomingChannel"
  output-channel="redboxOutgoingMessageChannel">
    <int:gateway request-channel="persisterChannel" />
    <!-- the response goes to the next activator --> 
    <int:service-activator ref="messagePersister"
      method="checkPersistenceSuccessful"/>
    <int:service-activator ref="redboxPreProcessor" method="validate"/>
    <int:service-activator ref="redboxProcessor" method="evaluateRules"/>
</int:chain>

<int:chain input-channel="persisterChannel">
  <int:service-activator ref="messagePersister"
    method="persistAndAddClientMessageIdToHeader">
    <int:request-handler-advice-chain>
      <int:retry-advice max-attempts="4" recovery-channel="persistenceRetriesExhaustedChannel">
        <int:exponential-back-off initial="800"
          multiplier="3"
          maximum="25000"/>
      </int:retry-advice>
    </int:request-handler-advice-chain>
  </int:service-activator>
</int:chain>

<int:service-activator ref="messagePersister"
method="retriesExhausted" input-channel="persistenceRetriesExhaustedChannel"  />

If persistAndAddClientMessageIdToHeader in messagePersister fails 4 times (max-attempts), the error is sent to the recovery channel persistenceRetriesExhaustedChannel. In the corresponding service activator method, retriesExhausted, we either process the message successfully using an alternate mechanism, or give up and pass the exception along in a header (without throwing it yet). Copying the headers in the recovery method ensures that its result is sent to the checkPersistenceSuccessful method, which then throws an exception if needed. We had to do this because if the recovery method itself threw the exception, exception.getFailedMessage() would return the ErrorMessage that was sent to the recovery method and not the original message.

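public Message<Client> retriesExhausted(MessagingException exception) {
  // The failed message is the original one; its headers carry the replyChannel
  Message<?> failedMessage = exception.getFailedMessage();
  Client client = (Client) failedMessage.getPayload();
  try {
    // TRY recovery - if successful add correct header.
    // persistInAlternateForm is a placeholder for the alternate mechanism,
    // assumed here to return the value for the CLIENT_ID header.
    Object clientId = persistInAlternateForm(client);
    return MessageBuilder.withPayload(client)
        .copyHeaders(failedMessage.getHeaders())
        .setHeader(CLIENT_ID, clientId)
        .build();
  }
  catch (RuntimeException recoveryFailure) {
    // else add the exception to the header and return the payload that failed
    return MessageBuilder.withPayload(client)
        .copyHeaders(failedMessage.getHeaders())
        .setHeader(SPRING_RETRIES_EXHAUSTED, exception)
        .build();
  }
}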

// this service activator throws the exception if needed, else sends the
// message down the chain
public Message<Client> checkPersistenceSuccessful(Message<Client> message) {
  MessagingException messagingException =
      message.getHeaders().get(SPRING_RETRIES_EXHAUSTED, MessagingException.class);
  if (messagingException != null) {
    throw new ClientException(messagingException.getCause());
  }
  return message;
}