I am trying to put and read file from the remote file system using winSCP through an SFTP connection. The leaf node of the file system is s3 object store which contain the files (for eg: xyz.txt). Below is the overridden method of File Channel class.
XYZFileSystemProvider
public class XYZFileSystemProvider extends FileSystemProvider {
@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs)
throws IOException {
// TODO Auto-generated method stub
Collection<XYZOptions.OpenMode> modes = XYZOptions.OpenMode.fromOpenOptions(options);
if (modes.isEmpty()) {
modes = EnumSet.of(XYZOptions.OpenMode.Read, MFEOptions.OpenMode.Write);
}
// TODO: process file attributes
return new XYZFileSystemChannel(path, modes);
}
}
XYZFileSystemChannel
public class XYZFileSystemChannel extends XYZRemotePathChannel{
public XYZFileSystemChannel(XYZPath p, Collection<XYZOptions.OpenMode> modes) throws IOException {
this(Objects.requireNonNull(p, "No target path").toString(), p.getFileSystem(), modes);
}
public XYZFileSystemChannel(String remotePath, XYZFileSystem fs, Collection<XYZOptions.OpenMode> modes) throws IOException {
super(remotePath, fs, true, modes);
}
}
XYZRemotePathChannel
public class XYZRemotePathChannel extends FileChannel {
private AmazonS3Component getAmazonS3Instance() {
return SpringContext.getBean(AmazonS3Component.class);
}
private final String path;
private final Collection<XYZOptions.OpenMode> modes;
private final boolean closeOnExit;
private XYZFileSystem fileSystem;
private final AtomicLong posTracker = new AtomicLong(0L);
public static final Set<XYZOptions.OpenMode> READ_MODES =
Collections.unmodifiableSet(EnumSet.of(XYZOptions.OpenMode.Read));
private final Object lock = new Object();
private final AtomicReference<Thread> blockingThreadHolder = new AtomicReference<>(null);
public XYZRemotePathChannel(String path, XYZFileSystem fileSystem, boolean closeOnExit,
Collection<XYZOptions.OpenMode> modes) throws IOException {
this.path = ValidateUtils.checkNotNullAndNotEmpty(path, "No remote file path specified");
this.modes = Objects.requireNonNull(modes, "No channel modes specified");
this.closeOnExit = closeOnExit;
this.fileSystem = fileSystem;
}
@Override
public int read(ByteBuffer dst) throws IOException {
// TODO Auto-generated method stub
log.debug("Position of dst is : {}",dst.position());
log.debug("Reading the bytes of the file : {}", dst);
//Some code to be done here in order to read dst and send bytes of the file recieved from s3 store
return (int) doRead(Collections.singletonList(dst), -1);
}
protected long doRead(List<ByteBuffer> buffers, long position) throws IOException {
log.debug("Do Reading the bytes of the file of list of buffer : {} and position :{}", buffers , position);
ensureOpen(READ_MODES);
synchronized (lock) {
boolean completed = false;
boolean eof = false;
long curPos = (position >= 0L) ? position : posTracker.get();
byte[] bytes = new byte[(int) curPos];
try {
long totalRead = 0;
beginBlocking();
String [] parts = this.path.toString().replaceFirst("^/", "").split("/");
String bucket = parts[parts.length-2];
String fileName = parts[parts.length-1];
InputStream fileContent = getAmazonS3Instance().getFileFromBucket(bucket, fileName);
log.debug("Contens of the file: {} from bucket: {} are : {}", fileName , bucket, fileContent);
//Some code to be done here to return the content byte length??
int fileLenght = fileContent.read(bytes, 1, (int) curPos);
log.debug("After reading the file content the file length is : {}" , fileLenght );
return fileLenght;
} finally {
if (position < 0L) {
posTracker.set(curPos);
}
endBlocking(completed);
}
}
}
private void endBlocking(boolean completed) throws AsynchronousCloseException {
blockingThreadHolder.set(null);
end(completed);
}
private void beginBlocking() {
begin();
blockingThreadHolder.set(Thread.currentThread());
}
@Override
public FileChannel position(long newPosition) throws IOException {
// TODO Auto-generated method stub
log.debug("Setting the position of the file : {}", newPosition);
if (newPosition < 0L) {
throw new IllegalArgumentException("position(" + this.path + ") illegal file channel position: " + newPosition);
}
ensureOpen(Collections.emptySet());
posTracker.set(newPosition);
return this;
}
private void ensureOpen(Collection<XYZOptions.OpenMode> reqModes) throws IOException {
if (!isOpen()) {
throw new ClosedChannelException();
}
if (GenericUtils.size(reqModes) > 0) {
for (XYZOptions.OpenMode m : reqModes) {
if (this.modes.contains(m)) {
return;
}
}
throw new IOException("ensureOpen(" + this.path + ") current channel modes (" + this.modes
+ ") do contain any of the required: " + reqModes);
}
}
}
XYZOptions
public class XYZOptions {
enum OpenMode {
Read, Write, Append, Create, Truncate, Exclusive;
public static final Set<OpenOption> SUPPORTED_OPTIONS = Collections
.unmodifiableSet(EnumSet.of(StandardOpenOption.READ, StandardOpenOption.APPEND,
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE));
public static Set<OpenMode> fromOpenOptions(Collection<? extends OpenOption> options) {
if (GenericUtils.isEmpty(options)) {
return Collections.emptySet();
}
Set<OpenMode> modes = EnumSet.noneOf(OpenMode.class);
for (OpenOption option : options) {
if (option == StandardOpenOption.READ) {
modes.add(Read);
} else if (option == StandardOpenOption.APPEND) {
modes.add(Append);
} else if (option == StandardOpenOption.CREATE) {
modes.add(Create);
} else if (option == StandardOpenOption.TRUNCATE_EXISTING) {
modes.add(Truncate);
} else if (option == StandardOpenOption.WRITE) {
modes.add(Write);
} else if (option == StandardOpenOption.CREATE_NEW) {
modes.add(Create);
modes.add(Exclusive);
} else if (option == StandardOpenOption.SPARSE) {
continue;
} else {
throw new IllegalArgumentException("Unsupported open option: " + option);
}
}
return modes;
}
}
}
I am able to fetch the file from the s3 store but nor sure how to read and pass all the contents while someone drag and drop from remote file location to their own system using winSCP. I know i am missing some code at the mentioned place but not sure how to achieve it.