Spring integration DSL lastTime not updating on error

159 Views Asked by At

I'm totally new at Spring integration DSL stuff, what I'm trying to do is fetch all the RSS feeds from mongo, register them all into the flow context and have that process all the articles fetched from the feeds. Currently I have a for-each loop that calls a function containing the following:

IntegrationFlow newFlow = IntegrationFlows
            .from(Feed.inboundAdapter(new URL(url), source), e -> e.id(org + "-" + source + "-feed").poller(Pollers.fixedDelay(poll).maxMessagesPerPoll(maxMessages).errorChannel("feedErrors")))
            .enrichHeaders(h -> h.header("sourceId", sourceId))
            .enrichHeaders(h -> h.header("sourceName", sourceName))
            .enrichHeaders(h -> h.header("source", source))
            .enrichHeaders(h -> h.header("categories", categories))
            .enrichHeaders(h -> h.header("org", org))
            .channel(MessageChannels.executor(taskExecutor))
            .handle("enricher", "enhance")
            .channel(news())
            .get();
        
this.flowContext.registration(newFlow).id(org + "-" + source + "-flow").register();

It does so splendidly with the exception of exceptions, funnily enough. When a feed is currently unavailable, has been renamed or the article from the feed itself is malformed an exception is thrown and it goes to the "feedErrors" channel as indicated in the flow definition. That part looks as following:

@Component
public class RssFeedErrorHandler {

    private final Logger logger = LoggerFactory.getLogger(RssFeedErrorHandler.class);

    @Bean
    public IntegrationFlow errorSender() {
        return IntegrationFlows.from("feedErrors")
            .handle("rssFeedErrorHandler", "handleError")
            .get();
    }

    public void handleError(Message<MessagingException> message) {
        logger.error(message.toString());
    }
}

Exception examples:

AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, feedResource=null, metadataKey='google-news-politics.https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen', lastTime=1613665200000}'; nested exception is java.io.FileNotFoundException: https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen was successful, headers={id=101f3da9-42de-eb03-0da2-015952fc0a23, timestamp=1613667428995}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, feedResource=null, metadataKey='google-news-politics.https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen', lastTime=1613665200000}'; nested exception is java.io.FileNotFoundException: https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, headers={id=8a470a5f-4027-371c-44fd-1250703077b0, timestamp=1613667428995}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.startupbootcamp.org/feed/, feedResource=null, metadataKey='startupbootcamp-startups.https://www.startupbootcamp.org/feed/', lastTime=1613015630000}'; nested exception is java.net.ConnectException: Operation timed out (Connection timed out) was successful, headers={id=6b5a1a00-905a-68b6-128b-4cdd8eb06737, timestamp=1613664837150}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.startupbootcamp.org/feed/, feedResource=null, metadataKey='startupbootcamp-startups.https://www.startupbootcamp.org/feed/', lastTime=1613015630000}'; nested exception is java.net.ConnectException: Operation timed out (Connection timed out), headers={id=70ef8ae7-1b75-9e38-e631-c88777741609, timestamp=1613664837150}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=1613715922000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "&#55357" is an invalid XML character. was successful, headers={id=fb936878-6450-d79a-851a-ca1ceeef8182, timestamp=1613705658735}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=1613715922000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "&#55357" is an invalid XML character., headers={id=24f2a11a-80a6-6c13-dbe1-9548747174aa, timestamp=1613705658734}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.jeffbullas.com/feed/, feedResource=null, metadataKey='jeff-bullas-marketing.https://www.jeffbullas.com/feed/', lastTime=1613574000000}'; nested exception is java.io.IOException: Server returned HTTP response code: 500 for URL: https://www.jeffbullas.com/feed/ was successful, headers={id=e6998e35-afdc-b476-c2d9-2f4b0cc46c6b, timestamp=1613710313323}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.jeffbullas.com/feed/, feedResource=null, metadataKey='jeff-bullas-marketing.https://www.jeffbullas.com/feed/', lastTime=1613574000000}'; nested exception is java.io.IOException: Server returned HTTP response code: 500 for URL: https://www.jeffbullas.com/feed/, headers={id=420f9b5d-1c58-cd0a-c4e2-5d640578f1c9, timestamp=1613710313322}]]

Here's two full stacks:

org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=-1}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "&#55357" is an invalid XML character.
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:239)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.populateEntryList(FeedEntryMessageSource.java:202)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:177)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:58)
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "&#55357" is an invalid XML character.
        at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:236)
        at com.rometools.rome.io.SyndFeedInput.build(SyndFeedInput.java:150)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:226)
        ... 20 more
Caused by: org.jdom2.input.JDOMParseException: Error on line 1: Character reference "&#55357" is an invalid XML character.
        at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:232)
        at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:303)
        at org.jdom2.input.SAXBuilder.build(SAXBuilder.java:1196)
        at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:233)
        ... 22 more
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 15681; Character reference "&#55357" is an invalid XML character.
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.scanCharReferenceValue(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanCharReference(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
        at org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser.parse(Unknown Source)
        at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:217)
        ... 25 more


org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=http://www.independent.co.uk/news/uk/politics/rss, feedResource=null, metadataKey='independent-politics.http://www.independent.co.uk/news/uk/politics/rss', lastTime=1613726122000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:239)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.populateEntryList(FeedEntryMessageSource.java:202)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:177)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:58)
        at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
        at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:236)
        at com.rometools.rome.io.SyndFeedInput.build(SyndFeedInput.java:150)
        at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:226)
        ... 20 more
Caused by: org.jdom2.input.JDOMParseException: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
        at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:232)
            at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:303)
        at org.jdom2.input.SAXBuilder.build(SAXBuilder.java:1196)
        at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:233)
            ... 22 more
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 10; DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
        at org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser.parse(Unknown Source)
        at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:217)
        ... 25 more

So the error is thrown, it's logged and life goes on, except for some reason that poll fetch seems to be saved for a retry. This wouldn't be an issue if it retried it once, but it just keeps retrying over and over, and when enough of errors pile up it just keeps retrying those errors and nothing new is ever fetched.

For example, 500 feeds are polled, 10 of those throw an exception where the feed has been either moved so it no longer exists, it has bad xml or something else might have happened. Now the poller is stuck polling those 10 feeds over and over again until it no longer fails (which never happens). I remember reading somewhere that if exception is thrown in handleError function then it would keep retrying, but if no exception is thrown there it should move on. That doesn't seem to be the case here. I've spend 3 days now mulling over this, trying different solutions but It always just chokes up after about 4-10 minutes, depending on how long it takes to fetch enough feeds with errors.

1

There are 1 best solutions below

9
Artem Bilan On

I would say that you try to talk about many different exceptions: not available, malformed, processing error etc. I think the best practice is to distinguish those exceptions and made an appropriate business decision for each of them: we definitely can't treat "not available" as processing error and must retry until it is available back.

For the processing one I would suggest to look into a ExpressionEvaluatingRequestHandlerAdvice on the service where your process entries of the feed: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

For malformed source I'd suggest to take a look into a flow removal form the context. So, you won't try to poll a wrong source any more. You definitely can do that from the error flow on that feedErrors channel.

And so on for other errors you face with this scenario.

We probably may revise your 10 are of those throw an exception where the feeds have been removed for whatever reason in more details since the logic in the FeedEntryMessageSource is based on the updatedDate of the entry. So, if we couldn't convert it properly and produce, it is really going to be pulled again next time since the lastTime state has not been updated appropriately. But let's do that in the separate SO thread with much more details to investigate!

UPDATE

Some thoughts regarding your stack traces.

The exception like org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 15681; Character refer is not good and must be treated as fatal. You definitely can't parse that RSS source any more or at least until it comes back to normal. This kind of exception could be handled like a stop() for that Feed.inboundAdapter(). So, you catch an exception in the feedErrors channel flow and call the stop() of caused endpoint. Or even better just remove the whole dynamic flow for this RSS source.

The exception org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 10; DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true. is also not good and probably could be treated the same fatal way with subsequent stop() for the endpoint. Or you can investigate the xerces library how to allow that DTD declaration. Probably the option like syndFeedInput(SyndFeedInput) could also give you some hooks for the XML parsing. For example I see it has this option:

/**
 * Since ROME 1.5.1 we fixed a security vulnerability by disallowing Doctype declarations by default. 
 * This change breaks the compatibility with at least RSS 0.91N because it requires a Doctype declaration. 
 * You are able to allow Doctype declarations again with this property. You should only activate it 
 * when the feeds that you process are absolutely trustful. 
 *  
 * @param allowDoctypes true when Doctype declarations should be allowed again, false otherwise
 */
public void setAllowDoctypes(boolean allowDoctypes) {