I'm totally new at Spring integration DSL stuff, what I'm trying to do is fetch all the RSS feeds from mongo, register them all into the flow context and have that process all the articles fetched from the feeds. Currently I have a for-each loop that calls a function containing the following:
IntegrationFlow newFlow = IntegrationFlows
.from(Feed.inboundAdapter(new URL(url), source), e -> e.id(org + "-" + source + "-feed").poller(Pollers.fixedDelay(poll).maxMessagesPerPoll(maxMessages).errorChannel("feedErrors")))
.enrichHeaders(h -> h.header("sourceId", sourceId))
.enrichHeaders(h -> h.header("sourceName", sourceName))
.enrichHeaders(h -> h.header("source", source))
.enrichHeaders(h -> h.header("categories", categories))
.enrichHeaders(h -> h.header("org", org))
.channel(MessageChannels.executor(taskExecutor))
.handle("enricher", "enhance")
.channel(news())
.get();
this.flowContext.registration(newFlow).id(org + "-" + source + "-flow").register();
It does so splendidly with the exception of exceptions, funnily enough. When a feed is currently unavailable, has been renamed or the article from the feed itself is malformed an exception is thrown and it goes to the "feedErrors" channel as indicated in the flow definition. That part looks as following:
@Component
public class RssFeedErrorHandler {
private final Logger logger = LoggerFactory.getLogger(RssFeedErrorHandler.class);
@Bean
public IntegrationFlow errorSender() {
return IntegrationFlows.from("feedErrors")
.handle("rssFeedErrorHandler", "handleError")
.get();
}
public void handleError(Message<MessagingException> message) {
logger.error(message.toString());
}
}
Exception examples:
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, feedResource=null, metadataKey='google-news-politics.https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen', lastTime=1613665200000}'; nested exception is java.io.FileNotFoundException: https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen was successful, headers={id=101f3da9-42de-eb03-0da2-015952fc0a23, timestamp=1613667428995}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, feedResource=null, metadataKey='google-news-politics.https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen', lastTime=1613665200000}'; nested exception is java.io.FileNotFoundException: https://news.google.com/rss/topics/CAAqIQgKIhtDQkFTRGdvSUwyMHZNRFZ4ZERBU0FtVnVLQUFQAQ?hl=en-GB&gl=GB&ceid=GB%3Aen, headers={id=8a470a5f-4027-371c-44fd-1250703077b0, timestamp=1613667428995}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.startupbootcamp.org/feed/, feedResource=null, metadataKey='startupbootcamp-startups.https://www.startupbootcamp.org/feed/', lastTime=1613015630000}'; nested exception is java.net.ConnectException: Operation timed out (Connection timed out) was successful, headers={id=6b5a1a00-905a-68b6-128b-4cdd8eb06737, timestamp=1613664837150}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.startupbootcamp.org/feed/, feedResource=null, metadataKey='startupbootcamp-startups.https://www.startupbootcamp.org/feed/', lastTime=1613015630000}'; nested exception is java.net.ConnectException: Operation timed out (Connection timed out), headers={id=70ef8ae7-1b75-9e38-e631-c88777741609, timestamp=1613664837150}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=1613715922000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "�" is an invalid XML character. was successful, headers={id=fb936878-6450-d79a-851a-ca1ceeef8182, timestamp=1613705658735}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=1613715922000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "�" is an invalid XML character., headers={id=24f2a11a-80a6-6c13-dbe1-9548747174aa, timestamp=1613705658734}]]
AdviceMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.jeffbullas.com/feed/, feedResource=null, metadataKey='jeff-bullas-marketing.https://www.jeffbullas.com/feed/', lastTime=1613574000000}'; nested exception is java.io.IOException: Server returned HTTP response code: 500 for URL: https://www.jeffbullas.com/feed/ was successful, headers={id=e6998e35-afdc-b476-c2d9-2f4b0cc46c6b, timestamp=1613710313323}, inputMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://www.jeffbullas.com/feed/, feedResource=null, metadataKey='jeff-bullas-marketing.https://www.jeffbullas.com/feed/', lastTime=1613574000000}'; nested exception is java.io.IOException: Server returned HTTP response code: 500 for URL: https://www.jeffbullas.com/feed/, headers={id=420f9b5d-1c58-cd0a-c4e2-5d640578f1c9, timestamp=1613710313322}]]
Here's two full stacks:
org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms, feedResource=null, metadataKey='india-times-entertainment.https://timesofindia.indiatimes.com/rssfeeds/1081479906.cms', lastTime=-1}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "�" is an invalid XML character.
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:239)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.populateEntryList(FeedEntryMessageSource.java:202)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:177)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:58)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: Character reference "�" is an invalid XML character.
at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:236)
at com.rometools.rome.io.SyndFeedInput.build(SyndFeedInput.java:150)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:226)
... 20 more
Caused by: org.jdom2.input.JDOMParseException: Error on line 1: Character reference "�" is an invalid XML character.
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:232)
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:303)
at org.jdom2.input.SAXBuilder.build(SAXBuilder.java:1196)
at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:233)
... 22 more
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 15681; Character reference "�" is an invalid XML character.
at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
at org.apache.xerces.impl.XMLScanner.scanCharReferenceValue(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanCharReference(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
at org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser.parse(Unknown Source)
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:217)
... 25 more
org.springframework.messaging.MessagingException: Failed to retrieve feed for 'FeedEntryMessageSource{feedUrl=http://www.independent.co.uk/news/uk/politics/rss, feedResource=null, metadataKey='independent-politics.http://www.independent.co.uk/news/uk/politics/rss', lastTime=1613726122000}'; nested exception is com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:239)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.populateEntryList(FeedEntryMessageSource.java:202)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:177)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.doReceive(FeedEntryMessageSource.java:58)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.rometools.rome.io.ParsingFeedException: Invalid XML: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:236)
at com.rometools.rome.io.SyndFeedInput.build(SyndFeedInput.java:150)
at org.springframework.integration.feed.inbound.FeedEntryMessageSource.getFeed(FeedEntryMessageSource.java:226)
... 20 more
Caused by: org.jdom2.input.JDOMParseException: Error on line 1: DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:232)
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:303)
at org.jdom2.input.SAXBuilder.build(SAXBuilder.java:1196)
at com.rometools.rome.io.WireFeedInput.build(WireFeedInput.java:233)
... 22 more
Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 10; DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
at org.apache.xerces.jaxp.SAXParserImpl$JAXPSAXParser.parse(Unknown Source)
at org.jdom2.input.sax.SAXBuilderEngine.build(SAXBuilderEngine.java:217)
... 25 more
So the error is thrown, it's logged and life goes on, except for some reason that poll fetch seems to be saved for a retry. This wouldn't be an issue if it retried it once, but it just keeps retrying over and over, and when enough of errors pile up it just keeps retrying those errors and nothing new is ever fetched.
For example, 500 feeds are polled, 10 of those throw an exception where the feed has been either moved so it no longer exists, it has bad xml or something else might have happened. Now the poller is stuck polling those 10 feeds over and over again until it no longer fails (which never happens). I remember reading somewhere that if exception is thrown in handleError function then it would keep retrying, but if no exception is thrown there it should move on. That doesn't seem to be the case here. I've spend 3 days now mulling over this, trying different solutions but It always just chokes up after about 4-10 minutes, depending on how long it takes to fetch enough feeds with errors.
I would say that you try to talk about many different exceptions: not available, malformed, processing error etc. I think the best practice is to distinguish those exceptions and made an appropriate business decision for each of them: we definitely can't treat "not available" as processing error and must retry until it is available back.
For the processing one I would suggest to look into a
ExpressionEvaluatingRequestHandlerAdviceon the service where your process entries of the feed: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chainFor malformed source I'd suggest to take a look into a flow removal form the context. So, you won't try to poll a wrong source any more. You definitely can do that from the error flow on that
feedErrorschannel.And so on for other errors you face with this scenario.
We probably may revise your
10 are of those throw an exception where the feeds have been removed for whatever reasonin more details since the logic in theFeedEntryMessageSourceis based on theupdatedDateof the entry. So, if we couldn't convert it properly and produce, it is really going to be pulled again next time since thelastTimestate has not been updated appropriately. But let's do that in the separate SO thread with much more details to investigate!UPDATE
Some thoughts regarding your stack traces.
The exception like
org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 15681; Character referis not good and must be treated as fatal. You definitely can't parse that RSS source any more or at least until it comes back to normal. This kind of exception could be handled like astop()for thatFeed.inboundAdapter(). So, you catch an exception in thefeedErrorschannel flow and call thestop()of caused endpoint. Or even better just remove the whole dynamic flow for this RSS source.The exception
org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 10; DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.is also not good and probably could be treated the same fatal way with subsequentstop()for the endpoint. Or you can investigate thexerceslibrary how to allow that DTD declaration. Probably the option likesyndFeedInput(SyndFeedInput)could also give you some hooks for the XML parsing. For example I see it has this option: