I use the AsyncHttpClient library for asynchronous, non-blocking requests. My use case: write data to a file as it is received over the network.
To download a file from a remote host and save it to disk, I used the default ResponseBodyPartFactory.EAGER together with AsynchronousFileChannel, so as not to block the Netty thread as data arrives. But my measurements showed that, compared with LAZY, Java heap memory consumption is many times higher.
So I decided to switch to LAZY, but did not consider the consequences for the files.
The following code reproduces the problem:
public static class AsyncChannelWriter {
    private final CompletableFuture<Integer> startPosition;
    private final AsynchronousFileChannel channel;

    public AsyncChannelWriter(AsynchronousFileChannel channel) throws IOException {
        this.channel = channel;
        this.startPosition = CompletableFuture.completedFuture((int) channel.size());
    }

    public CompletableFuture<Integer> getStartPosition() {
        return startPosition;
    }

    public CompletableFuture<Integer> write(ByteBuffer byteBuffer, CompletableFuture<Integer> currentPosition) {
        return currentPosition.thenCompose(position -> {
            CompletableFuture<Integer> writenBytes = new CompletableFuture<>();
            channel.write(byteBuffer, position, null, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    writenBytes.complete(result);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    writenBytes.completeExceptionally(exc);
                }
            });
            return writenBytes.thenApply(writenBytesLength -> writenBytesLength + position);
        });
    }

    public void close(CompletableFuture<Integer> currentPosition) {
        currentPosition.whenComplete((position, throwable) -> IOUtils.closeQuietly(channel));
    }
}
public static void main(String[] args) throws IOException {
    final String filepath = "/media/veracrypt4/files/1.jpg";
    final String downloadUrl = "https://m0.cl/t/butterfly-3000.jpg";
    final AsyncHttpClient client = Dsl.asyncHttpClient(Dsl.config().setFollowRedirect(true)
            .setResponseBodyPartFactory(AsyncHttpClientConfig.ResponseBodyPartFactory.LAZY));
    final AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get(filepath),
            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
    final AsyncChannelWriter asyncChannelWriter = new AsyncChannelWriter(channel);
    final AtomicReference<CompletableFuture<Integer>> atomicReferencePosition =
            new AtomicReference<>(asyncChannelWriter.getStartPosition());
    client.prepareGet(downloadUrl)
            .execute(new AsyncCompletionHandler<Response>() {
                @Override
                public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
                    // With EAGER, content.getBodyByteBuffer() returns a HeapByteBuffer;
                    // with LAZY, it returns a DirectByteBuffer.
                    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
                    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
                    final CompletableFuture<Integer> newPosition = asyncChannelWriter.write(bodyByteBuffer, currentPosition);
                    atomicReferencePosition.set(newPosition);
                    return State.CONTINUE;
                }

                @Override
                public Response onCompleted(Response response) {
                    asyncChannelWriter.close(atomicReferencePosition.get());
                    return response;
                }
            });
}
In this case, the picture is broken. But if I use FileChannel instead of AsynchronousFileChannel, the files come out fine in both cases. Are there any nuances when working with a DirectByteBuffer (as returned by LazyResponseBodyPart.getBodyByteBuffer()) and AsynchronousFileChannel?
What could be wrong with my code, given that everything works fine with EAGER?
UPDATE
I noticed that if I use LAZY and add, for example, the line Thread.sleep(10) to the onBodyPartReceived method, like this:
@Override
public State onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
    final ByteBuffer bodyByteBuffer = content.getBodyByteBuffer();
    final CompletableFuture<Integer> currentPosition = atomicReferencePosition.get();
    final CompletableFuture<Integer> newPosition = finalAsyncChannelWriter.write(bodyByteBuffer, currentPosition);
    atomicReferencePosition.set(newPosition);
    Thread.sleep(10);
    return State.CONTINUE;
}
then the file is saved to disk intact.
As I understand it, the reason is that during these 10 milliseconds the asynchronous thread in AsynchronousFileChannel manages to write the data from this DirectByteBuffer to disk.
It turns out that the file is corrupted because this asynchronous thread uses the buffer for writing at the same time as the Netty thread.
If we look at the source code of EagerResponseBodyPart, we see the following:
private final byte[] bytes;

public EagerResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    bytes = byteBuf2Bytes(buf);
}

@Override
public ByteBuffer getBodyByteBuffer() {
    return ByteBuffer.wrap(bytes);
}
Thus, when a chunk of data arrives, it is immediately copied into a byte array. We can then safely wrap that array in a HeapByteBuffer and hand it to the asynchronous thread of the file channel.
But if you look at the code of LazyResponseBodyPart:
private final ByteBuf buf;

public LazyResponseBodyPart(ByteBuf buf, boolean last) {
    super(last);
    this.buf = buf;
}

@Override
public ByteBuffer getBodyByteBuffer() {
    return buf.nioBuffer();
}
As you can see, the asynchronous file channel thread actually works on the Netty ByteBuf itself (in this case always a PooledSlicedByteBuf), which it reaches through the nioBuffer() call.
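The hazard described above can be imitated with plain JDK buffers. This is only an analogy, not Netty code: the direct buffer named pooled stands in for pooled Netty memory, duplicate() stands in for the zero-copy view that nioBuffer() hands out, and the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SharedViewSketch {

    /** Reads the remaining bytes of a buffer as text without moving its position. */
    static String text(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    /** Returns { content seen through the view, content of the heap copy } after reuse. */
    static String[] demo() {
        // Stand-in for a chunk of pooled memory that currently holds body part 1.
        ByteBuffer pooled = ByteBuffer.allocateDirect(4);
        pooled.put("AAAA".getBytes(StandardCharsets.US_ASCII)).flip();

        // LAZY-like: a view that shares the same memory, nothing is copied.
        ByteBuffer view = pooled.duplicate();

        // EAGER-like: the bytes are copied to the heap right away.
        byte[] copy = new byte[4];
        pooled.duplicate().get(copy);

        // The "pool" reuses the memory for body part 2 before the view was written out.
        pooled.clear();
        pooled.put("BBBB".getBytes(StandardCharsets.US_ASCII)).flip();

        return new String[] { text(view), new String(copy, StandardCharsets.US_ASCII) };
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("view: " + result[0] + ", copy: " + result[1]);
    }
}
```

The view reflects whatever the shared memory holds at the moment it is read, while the heap copy keeps the original bytes. That matches the symptom: a write that starts late sees data from a later body part.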
What can I do in this situation? How can I safely pass a DirectByteBuffer to an asynchronous thread without copying the buffer to the Java heap?
I talked to the maintainer of AsyncHttpClient. The main problem was that I didn't use the Netty ByteBuf methods retain and release. In the end, I came to two solutions.
First: write the bytes to the file in sequence, tracking the position with a CompletableFuture. This solution defines a wrapper class for AsynchronousFileChannel and uses it from the body part handler.
In fact, an AtomicReference is probably not needed here if the writing happens in one thread; if it happens from several threads, synchronization has to be approached seriously.
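A minimal, JDK-only sketch of the first idea follows. It is not the original answer's code: ChainedWriter, writeFully, and demo are hypothetical names, and the onDone callback is a stand-in for the point where ByteBuf.release() would be called. Each write starts only after the previous one has completed, so the position is always known.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

public class SequentialWriterSketch {

    /** Hypothetical wrapper: chunks are written strictly one after another. */
    static final class ChainedWriter {
        private final AsynchronousFileChannel channel;
        // Resolves to the file offset at which the next chunk has to start.
        private CompletableFuture<Long> position = CompletableFuture.completedFuture(0L);

        ChainedWriter(AsynchronousFileChannel channel) {
            this.channel = channel;
        }

        /** onDone stands in for ByteBuf.release(); it runs on success and on failure. */
        synchronized CompletableFuture<Long> write(ByteBuffer data, Runnable onDone) {
            position = position
                    .thenCompose(pos -> writeFully(data, pos))
                    .whenComplete((pos, error) -> onDone.run());
            return position;
        }

        /** Writes the whole buffer, re-issuing the write if only part of it was written. */
        private CompletableFuture<Long> writeFully(ByteBuffer data, long start) {
            CompletableFuture<Long> done = new CompletableFuture<>();
            channel.write(data, start, start, new CompletionHandler<Integer, Long>() {
                @Override
                public void completed(Integer written, Long offset) {
                    long next = offset + written;
                    if (data.hasRemaining()) {
                        channel.write(data, next, next, this);
                    } else {
                        done.complete(next);
                    }
                }

                @Override
                public void failed(Throwable error, Long offset) {
                    done.completeExceptionally(error);
                }
            });
            return done;
        }
    }

    /** Writes the chunks to a temporary file and returns what ended up in the file. */
    static String demo(String... chunks) {
        try {
            Path file = Files.createTempFile("chained-writer", ".bin");
            try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.WRITE)) {
                ChainedWriter writer = new ChainedWriter(channel);
                CompletableFuture<Long> last = CompletableFuture.completedFuture(0L);
                for (String chunk : chunks) {
                    ByteBuffer data = ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8));
                    last = writer.write(data, () -> { });
                }
                last.join(); // wait for the final write before closing the channel
            }
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            return content;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the Netty setting, the handler would presumably call retain() on the ByteBuf before passing buf.nioBuffer() to write, and pass buf::release as onDone. This assumes the lazy body part gives access to its underlying ByteBuf, for example through a getBuf() accessor, which should be checked against the AsyncHttpClient version in use.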
The second solution: track the position based on the number of bytes received.
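As a rough illustration of this variant, again JDK-only and with hypothetical names (OffsetWriter, demo), the sketch below reserves the file range for each chunk from its size as soon as it arrives and issues the write immediately, without waiting for earlier writes. The onDone callback once more marks where ByteBuf.release() would go.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class OffsetTrackingSketch {

    /** Hypothetical wrapper: the offset of a chunk is derived from the sizes received so far. */
    static final class OffsetWriter {
        private final AsynchronousFileChannel channel;
        private final AtomicLong nextOffset = new AtomicLong();

        OffsetWriter(AsynchronousFileChannel channel) {
            this.channel = channel;
        }

        /** onDone stands in for ByteBuf.release(); it runs on success and on failure. */
        CompletableFuture<Void> write(ByteBuffer data, Runnable onDone) {
            // Reserve [start, start + size) for this chunk before any byte is written.
            long start = nextOffset.getAndAdd(data.remaining());
            CompletableFuture<Void> done = new CompletableFuture<>();
            channel.write(data, start, start, new CompletionHandler<Integer, Long>() {
                @Override
                public void completed(Integer written, Long offset) {
                    long next = offset + written;
                    if (data.hasRemaining()) {
                        channel.write(data, next, next, this); // partial write: continue
                    } else {
                        done.complete(null);
                    }
                }

                @Override
                public void failed(Throwable error, Long offset) {
                    done.completeExceptionally(error);
                }
            });
            return done.whenComplete((ignored, error) -> onDone.run());
        }
    }

    /** Issues all writes at once, waits for them, and returns the file content. */
    static String demo(String... chunks) {
        try {
            Path file = Files.createTempFile("offset-writer", ".bin");
            try (AsynchronousFileChannel channel =
                         AsynchronousFileChannel.open(file, StandardOpenOption.WRITE)) {
                OffsetWriter writer = new OffsetWriter(channel);
                List<CompletableFuture<Void>> pending = new ArrayList<>();
                for (String chunk : chunks) {
                    ByteBuffer data = ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8));
                    pending.add(writer.write(data, () -> { }));
                }
                CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            }
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            return content;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the offsets are fixed up front, the writes may complete in any order and the file still ends up contiguous. The price is that many writes, and therefore many retained buffers, can be in flight at once.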
In the second solution, because we don't wait until some bytes are written to the file, AsynchronousFileChannel can create a lot of threads (if you use Linux, because Linux does not implement non-blocking asynchronous file I/O; on Windows the situation is much better).
As my measurements showed, when writing to a slow USB flash drive the number of threads can reach tens of thousands, so you need to limit the number of threads by creating your own ExecutorService and passing it to AsynchronousFileChannel.
Are there obvious advantages and disadvantages of the first and second solutions? It's hard for me to say. Maybe someone can tell which is more efficient.
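On the thread-count point above, here is a small sketch of opening the channel with a caller-supplied pool, using the AsynchronousFileChannel.open overload that takes an ExecutorService. The pool size of 2 and the names openBounded and demo are arbitrary choices for illustration, not values from the original measurements.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedChannelSketch {

    /** Opens the file for writing; the channel's I/O work runs on the given pool. */
    static AsynchronousFileChannel openBounded(Path file, ExecutorService pool) throws IOException {
        Set<StandardOpenOption> options = EnumSet.of(
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING);
        return AsynchronousFileChannel.open(file, options, pool);
    }

    /** Writes one chunk through a channel backed by a two-thread pool; returns the file size. */
    static long demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // arbitrary upper bound
        try {
            Path file = Files.createTempFile("bounded-channel", ".bin");
            try (AsynchronousFileChannel channel = openBounded(file, pool)) {
                ByteBuffer data = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
                channel.write(data, 0).get(); // Future-based write, block until it is done
            }
            long size = Files.size(file);
            Files.delete(file);
            return size;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes written: " + demo());
    }
}
```

A fixed pool caps the number of threads, but writes that cannot run yet wait in the pool's queue together with their buffers, so memory use still grows if data arrives faster than the disk can take it.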