We are writing a Batch Job which takes a file as input from an FTP, generates some new files and writes them to an S3 bucket, and for this we are using Spring Integration.
The file in the FTP is an extraction from a DB and is updated each night.
The problem is that, when we start the app the first time, it connects well to the FTP, downloads the file, and uploads the generation result S3. Then we delete the downloaded file locally and wait to the next generation of the file in the FTP to restart the process. But it never downloads the file again.
Any idea?
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(ftpReader(),
spec -> spec.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(period)))
.enrichHeaders(Map.of("CORRELATION_ID", "rcm"))
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message.getHeaders().get("CORRELATION_ID"))
.releaseStrategy(group -> group.getMessages().size() == 2))
.transform(stockUnmarshaller)
.transform(stockTransformer)
.transform(stockMarshaller)
.transform(picturesDownloader)
.transform(picturesZipper)
.transform(stockIndexer)
.handle(directoryCleaner)
.nullChannel();
}
@Bean
public FtpInboundChannelAdapterSpec ftpReader() {
return Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.remoteDirectory(rootFolder)
.autoCreateLocalDirectory(true)
.localDirectory(new File(localDirectory));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sessionFactory = new DefaultFtpSessionFactory();
sessionFactory.setHost(host);
sessionFactory.setUsername(userName);
sessionFactory.setPassword(password);
sessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
return sessionFactory;
}
Thanks in advance.
EDIT:
I use enrichHeaders to ensure that the pipeline is triggered if we have exactly 2 files. Maybe the headers are not removed and the condition will be always greater than 2? Maybe it's the wrong manner to proceed?
Thanks again.
The inbound channel adapter has two filters
.filterand.localFilter.The first filters the remote files before downloading, the second filters files on the file system.
By default the
filteris aFtpPersistentAcceptOnceFileListFilterwhich will only fetch new or changed files.By default, the
localFilteris anFileSystemPersistentAcceptOnceFileListFilterwhich, again, will only pass a file a second time if it's timestamp has changed.So the file will only be reprocessed if its timestamp changes.
I suggest you run in a debugger to see why it is not passing the filter.