I am currently processing .csv files from a remote SFTP server, transforming them, and placing them on a Kafka topic. I know we are able to delete the files after they are processed using an Sftp.inboundAdapter. I was curious whether there is a way to get this functionality with Sftp.inboundStreamingAdapter. Is there another flow I would need to add, or is there some way I can delete using the provided properties?
I can provide more code for the entire flow if needed. Thanks.
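For reference, here is a minimal sketch of the delete option on the non-streaming adapter mentioned above, assuming the same sftpSessionFactory and properties beans used in the flows below (the local directory name is illustrative):

```java
// Sketch only: the non-streaming inbound adapter copies files to a local
// directory and can delete the remote originals after a successful transfer.
Sftp.inboundAdapter(sftpSessionFactory)
        .filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
        .remoteDirectory(properties.getRemoteDirectory())
        .localDirectory(new File("sftp-local")) // "sftp-local" is illustrative
        .deleteRemoteFiles(true)                // remove the remote file after the local copy
```

This is exactly the behavior I am trying to reproduce with the streaming adapter.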
@Bean
public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
        IntegrationFlowProperties properties,
        MessageChannel inboundFilesMessageChannel) {
    return IntegrationFlow
            .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sftpSessionFactory))
                            .filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
                            .remoteDirectory(properties.getRemoteDirectory()),
                    e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
            .log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
                    "headers['file_remoteDirectory'] + T(java.io.File).separator + headers['file_remoteFile']")
            .channel(inboundFilesMessageChannel)
            .get();
}
EDIT:
@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
        QueueChannel kafkaPojoMessageChannel) {
    return IntegrationFlow.from(inboundFilesMessageChannel)
            .split(Files.splitter()
                    .markers(true)
                    .charset(StandardCharsets.UTF_8)
                    .firstLineAsHeader("myHeaders")
                    .applySequence(true))
            // .transform(new StreamToEmailInteractionConverter()) // TODO: Figure this out so it doesn't get put on topic as .csv
            .transform(new ObjectToJsonTransformer())
            .log(LoggingHandler.Level.DEBUG,
                    "DataSftpToKafkaIntegrationFlow",
                    m -> "Payload: " + m.getPayload())
            .channel(kafkaPojoMessageChannel)
            .get();
}
@Bean
public IntegrationFlow publishToKafkaFlow(KafkaTemplate<String, String> kafkaTemplate,
        MessageChannel kafkaProducerErrorRecordChannel,
        QueueChannel kafkaPojoMessageChannel) {
    return IntegrationFlow.from(kafkaPojoMessageChannel)
            .log(LoggingHandler.Level.DEBUG,
                    "DataSftpToKafkaIntegrationFlow", e -> "Payload: " + e.getPayload())
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                            .topic(KAFKA_TOPIC),
                    e -> e.id("KafkaProducer"))
            .routeByException(r -> r
                    .channelMapping(KafkaProducerException.class, kafkaProducerErrorRecordChannel)
                    .defaultOutputChannel("errorChannel"))
            .get();
}
After this entire flow completes, I would like to delete the file we just processed from the remote SFTP server.
The AbstractInboundFileSynchronizingMessageSource works in two phases: copy remote files into a local directory, then emit messages for those local files. The deleteRemoteFiles option applies to that remote-to-local copy phase; at that point the remote files are no longer needed and everything downstream deals only with local copies.

The streaming source opens an InputStream for a remote file and keeps it open until you close it manually. That's why there is no explicit option to delete remote files.

You can perform remote file removal using an SftpOutboundGateway with an rm command. This gateway could be a second subscriber to that inboundFilesMessageChannel if you make it a PublishSubscribeChannel, and after you have already closed the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE from the message headers.

However, I think we could come up with an option like removeFileOnClose on the AbstractRemoteFileStreamingMessageSource to perform rm when the IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE is closed.
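A rough sketch of that second-subscriber approach, reusing the bean names from the question. The close-then-rm ordering and the remote-path expression are illustrative (the expression assumes Unix-style `/` separators), not a definitive recipe:

```java
@Bean
public MessageChannel inboundFilesMessageChannel() {
    // Publish-subscribe: subscriber 1 is the CSV-processing flow, subscriber 2
    // (below) removes the remote file. With no executor configured, subscribers
    // are invoked sequentially on the same thread, in subscription order.
    return new PublishSubscribeChannel();
}

@Bean
public IntegrationFlow removeRemoteFileFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
        MessageChannel inboundFilesMessageChannel) {
    return IntegrationFlow.from(inboundFilesMessageChannel)
            // Defensively close the streaming InputStream before removing the file;
            // the header holds the remote session/stream as a Closeable.
            .handle(Message.class, (message, headers) -> {
                Closeable resource = StaticMessageHeaderAccessor.getCloseableResource(message);
                if (resource != null) {
                    try {
                        resource.close();
                    }
                    catch (IOException ex) {
                        throw new UncheckedIOException(ex);
                    }
                }
                return message;
            })
            // rm the remote file; the gateway replies with Boolean true on success.
            .handle(Sftp.outboundGateway(sftpSessionFactory,
                    AbstractRemoteFileOutboundGateway.Command.RM,
                    "headers['file_remoteDirectory'] + '/' + headers['file_remoteFile']"))
            .channel("nullChannel")
            .get();
}
```

Make sure this flow is subscribed after the processing flow, so the remote file is only removed once the CSV flow has consumed the stream.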