Stream bytes from gRPC onNext to the client using StreamingOutput in Java


I'm new to handling files via InputStream/OutputStream.

I have a requirement to stream the contents of a CSV as they are generated so that the endpoint stays memory-efficient.

Here's what I have done so far:

  1. As the request comes in from the client (terminal at this point), the server sends a request to another service (csv service) through a gRPC call.
  2. The csv service processes the gRPC request and starts to generate the CSV file.
  3. For every row, it sends the contents in the form of bytes using the onNext method.
recordObserver.onNext(Response.newBuilder().setData(ByteString.copyFrom(csvRow)).build());
  4. The primary server (the client of the gRPC request) receives the bytes in the data chunk of each onNext call.

The problem is: how do I stream the bytes I receive in the onNext method back to the original client (terminal) so that the entire CSV file is delivered in a streaming fashion?

This is what I have so far. As soon as the request is sent from the terminal, the connection is closed, and outputStream.write(bytes) in the onNext method throws an exception when the first chunk of data from the server is received.

@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path(DOWNLOAD_CSV)
StreamingOutput downloadCsv(@PathParam("Id") UUID Id) {
    return outputStream -> {
        try {
            mainService.downloadCsv(Id, outputStream);
            outputStream.flush();
        } catch (Exception e) {
            throw new WebApplicationException("Error occurred while attempting to download");
        }
    };
}

This is the client handling the gRPC response:

public void downloadCsv(UUID Id, OutputStream outputStream) {
    Request request = Request.newBuilder()
            .setId(Id)
            .build();

    csvServiceStub.downloadCsv(request, new StreamObserver<Response>() {
        @Override
        public void onNext(Response value) {
            try {
                // The csv service populates the "data" field via setData(...)
                byte[] bytes = value.getData().toByteArray();
                outputStream.write(bytes);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void onError(Throwable t) {
        }

        @Override
        public void onCompleted() {
        }
    });
}

Here's the request I'm sending through the terminal:

curl 'https://server-address/52097f0e-5dd8-49bd-98f1-69c31a73b62a/download/csv' \
  -X 'GET' \
  -H 'authority: server-address' \
  -H 'accept: application/octet-stream' \
  -H 'accept-language: en-US,en;q=0.9,ja;q=0.8' \
  -H 'authorization: OBFUSCATED' \
  -H 'content-type: application/json' \
  -H 'cookie: __zlcmid=1BzlEPGJ1UvbeK4' \
  -H 'dnt: 1' \
  -H 'origin: server-address' \
  -H 'referer: server-address' \
  --compressed

I'm not sure what I'm doing wrong here, but as I mentioned, I'm facing:

  1. Connection in the terminal closes as I hit this endpoint. No errors reported here. My expectation is that it should be able to give out the binary output.
  2. In the onNext method, the outputStream.write(bytes); throws an exception.

There is 1 best solution below

underachiever On

Answering my own question: I had to switch to the BlockingStub and consume the received bytes with forEachRemaining.
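A likely reason this works: the async stub returns right away, so the StreamingOutput lambda finishes and the container closes the response stream before any onNext arrives. For a server-streaming call, the blocking stub hands back an Iterator instead, and forEachRemaining keeps the lambda's thread inside the write loop until the last chunk is received. The sketch below is only an illustration of that pattern, not the exact code I ended up with: it uses a plain Iterator<byte[]> as a stand-in for the Iterator<Response> returned by the blocking stub, and the names CsvStreamSketch and copyChunks are made up for the example.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper class; the Iterator<byte[]> stands in for the
// Iterator<Response> that a gRPC blocking stub returns for a streaming call.
class CsvStreamSketch {

    // Writes every chunk to the output stream, blocking until the iterator
    // is exhausted, so the caller does not return before the data is written.
    static void copyChunks(Iterator<byte[]> chunks, OutputStream out) {
        chunks.forEachRemaining(chunk -> {
            try {
                out.write(chunk);
                out.flush(); // push each row to the client as it arrives
            } catch (IOException e) {
                // Lambdas cannot throw checked exceptions, so wrap it
                throw new UncheckedIOException(e);
            }
        });
    }

    public static void main(String[] args) {
        // Two fake CSV rows in place of the chunks sent by the csv service
        List<byte[]> rows = Arrays.asList(
                "id,name\n".getBytes(StandardCharsets.UTF_8),
                "1,alice\n".getBytes(StandardCharsets.UTF_8));
        copyChunks(rows.iterator(), System.out);
    }
}
```

In the real endpoint, the iterator would come from something like csvServiceBlockingStub.downloadCsv(request) (a hypothetical stub name), each element would be mapped with getData().toByteArray(), and the loop would run inside the StreamingOutput lambda so the response stays open until the iterator is drained.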