java.lang.IllegalStateException: Stream 39 sent too many headers EOS: false on running gRPC server with 500 clients


I am working on a bidirectional-streaming gRPC proof of concept. The gRPC client spawns 500 threads to simulate 500 clients, each making the RPC call in parallel once per second. On the server side, when a client first connects, I save its response StreamObserver and associate it with the client's ID, so that the server can use that StreamObserver whenever it needs to push a message to the client.

When I run this setup, I see the following errors:

java.lang.IllegalStateException: Stream 39 sent too many headers EOS: false
and
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place

When I run a single instance/thread in the client, I see no errors, so this looks to me like a thread-synchronization issue. Can anyone please help me resolve it? I'm blocked.
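One way to test the synchronization hypothesis: grpc-java documents StreamObserver as not thread-safe, so if more than one thread can write to the same stored observer, the application has to serialize those writes itself. Below is a minimal sketch of such a wrapper. To keep it runnable without gRPC on the classpath, `Observer` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, and `SynchronizedObserver`, `RecordingObserver` and `runDemo` are names made up for illustration. It may or may not be what is behind the errors above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedObserverSketch {

    // Hypothetical stand-in for io.grpc.stub.StreamObserver (assumption, not the real type).
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Funnels every call through one lock and drops calls made after completion.
    static final class SynchronizedObserver<T> implements Observer<T> {
        private final Observer<T> delegate;
        private boolean closed;

        SynchronizedObserver(Observer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void onNext(T value) {
            if (!closed) {
                delegate.onNext(value);
            }
        }

        @Override
        public synchronized void onCompleted() {
            if (!closed) {
                closed = true;
                delegate.onCompleted();
            }
        }
    }

    // Deliberately non-thread-safe sink, used only to observe what the wrapper lets through.
    static final class RecordingObserver implements Observer<String> {
        final List<String> received = new ArrayList<>();
        int completions;

        @Override
        public void onNext(String value) {
            received.add(value);
        }

        @Override
        public void onCompleted() {
            completions++;
        }
    }

    // Several threads write to one wrapped observer, then the stream is completed twice.
    static RecordingObserver runDemo(int threads, int perThread) {
        RecordingObserver sink = new RecordingObserver();
        Observer<String> safe = new SynchronizedObserver<>(sink);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    safe.onNext("msg");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        safe.onCompleted();
        safe.onCompleted();  // second completion is ignored by the wrapper
        safe.onNext("late"); // write after completion is dropped
        return sink;
    }

    public static void main(String[] args) {
        RecordingObserver sink = runDemo(8, 1000);
        System.out.println(sink.received.size() + " messages, " + sink.completions + " completion(s)");
    }
}
```

In the real server the wrapper would go around `responseObserver` before it is put into the map, so that every thread pushing to a client goes through the same lock.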

Following is my server implementation:

    public StreamObserver<DataRequest> processBidirectionalStream(StreamObserver<DataResponse> responseObserver) {
        return new StreamObserver<DataRequest>() {
            private String clientId;
            private int requestCount = 0;
            private boolean isFirstRequest = true;

            @Override
            public void onNext(DataRequest request) {
                clientId = request.getClientId();
                if (isFirstRequest) {
                    // Store the first StreamObserver for this client
                    clientObservers.put(clientId, responseObserver);
                    isFirstRequest = false;
                }
                // Increment request count for the client
                requestCount = clientRequestCounts.getOrDefault(clientId, 0);
                requestCount++;
                clientRequestCounts.put(clientId, requestCount);
                // Push the message to kafka topic for processing
                processChunk(request.getName());
                // Send a response back to the client after 5 requests
                if (clientRequestCounts.get(clientId) % 5 == 0) {
                    DataResponse response = DataResponse.newBuilder()
                            .setMessage("****Notification sent****")
                            .build();
                    clientObservers.get(clientId).onNext(response);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.err.println("Encountered error in bidirectional stream: " + throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("Bidirectional streaming RPC completed");
                clientObservers.get(clientId).onCompleted();
            }
        };
    }
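A side note on the counting logic in `onNext`: `getOrDefault` followed by `put` is a two-step read-modify-write, which is only safe if nothing else touches the map at the same time. Assuming `clientRequestCounts` is shared across streams (its declaration is not shown), a sketch using `ConcurrentHashMap.merge` performs the increment atomically. The class and method names here are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RequestCounterSketch {

    // Assumed to play the role of clientRequestCounts from the question.
    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    // Atomically increments the counter for clientId and returns the new value.
    int record(String clientId) {
        return counts.merge(clientId, 1, Integer::sum);
    }

    // Same rule as in the question: notify on every fifth request.
    static boolean shouldNotify(int count) {
        return count % 5 == 0;
    }

    // Hammers one key from several threads and returns the final count.
    static int countAfterConcurrentUse(int threads, int perThread) {
        RequestCounterSketch counter = new RequestCounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.record("client-1");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.counts.getOrDefault("client-1", 0);
    }

    public static void main(String[] args) {
        System.out.println(countAfterConcurrentUse(8, 1000));
        System.out.println(shouldNotify(5) + " " + shouldNotify(6));
    }
}
```

Using the value returned by `record` for the `% 5` check also avoids the second lookup with `clientRequestCounts.get(clientId)`.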

What I tried: running 500 clients (as separate threads) that make parallel RPC calls to the gRPC server every second. I expected the server to handle the requests and send a response to each client whenever needed (on an ad hoc basis, not on every request). Instead, the server logs the two exceptions quoted above.

The following is the stack trace from a run with 10 clients for testing purposes:

Encountered error in bidirectional stream: io.grpc.StatusRuntimeException: CANCELLED: client cancelled
2024-02-26 10:56:39.588 ERROR 21736 --- [ault-executor-8] io.grpc.internal.SerializingExecutor     : Exception while executing runnable io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated@78959964

io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    at io.grpc.Status.asRuntimeException(Status.java:524) ~[grpc-api-1.30.0.jar:1.30.0]
    at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366) ~[grpc-stub-1.42.1.jar:1.42.1]
    at com.philips.fp.service.GrpcServer.sendMessageToClient(GrpcServer.java:120) ~[classes/:na]
    at com.philips.fp.service.GrpcServer.processBidirectionalStream(GrpcServer.java:54) ~[classes/:na]
    at com.philips.fp.grpc.DataProcessorGrpc$MethodHandlers.invoke(DataProcessorGrpc.java:204) ~[classes/:na]
    at io.grpc.stub.ServerCalls$StreamingServerCallHandler.startCall(ServerCalls.java:235) ~[grpc-stub-1.42.1.jar:1.42.1]
    at net.devh.boot.grpc.server.metric.MetricCollectingServerInterceptor.interceptCall(MetricCollectingServerInterceptor.java:138) ~[grpc-server-spring-boot-autoconfigure-2.9.0.RELEASE.jar:2.9.0.RELEASE]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.30.0.jar:1.30.0]
    at io.grpc.Contexts.interceptCall(Contexts.java:52) ~[grpc-api-1.30.0.jar:1.30.0]
    at net.devh.boot.grpc.server.scope.GrpcRequestScope.interceptCall(GrpcRequestScope.java:75) ~[grpc-server-spring-boot-autoconfigure-2.9.0.RELEASE.jar:2.9.0.RELEASE]
    at io.grpc.ServerInterceptors$InterceptCallHandler.startCall(ServerInterceptors.java:244) ~[grpc-api-1.30.0.jar:1.30.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.startWrappedCall(ServerImpl.java:651) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.startCall(ServerImpl.java:629) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.access$1900(ServerImpl.java:416) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated.runInternal(ServerImpl.java:556) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl$1StreamCreated.runInContext(ServerImpl.java:531) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[grpc-core-1.30.0.jar:1.30.0]
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[grpc-core-1.30.0.jar:1.30.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
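The CANCELLED message in the trace points at `ServerCallStreamObserver.setOnCancelHandler()`. A minimal sketch of what that could look like for a per-client registry follows: the stored observer is dropped when its client cancels, and pushes to missing or cancelled clients are skipped. `CancellableObserver` is a hypothetical stand-in that mirrors `isCancelled()` and `setOnCancelHandler(Runnable)` from `io.grpc.stub.ServerCallStreamObserver`; `ObserverRegistrySketch`, `FakeObserver` and `demo` are invented for illustration. As far as I understand the real API, the cancel handler has to be set before the service method returns its request observer, which this simplified sketch does not model.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ObserverRegistrySketch {

    // Hypothetical stand-in mirroring part of io.grpc.stub.ServerCallStreamObserver.
    interface CancellableObserver<T> {
        void onNext(T value);
        boolean isCancelled();
        void setOnCancelHandler(Runnable handler);
    }

    // Assumed to play the role of clientObservers from the question.
    private final Map<String, CancellableObserver<String>> observers = new ConcurrentHashMap<>();

    // Stores the observer and arranges for it to be removed when its client cancels.
    void register(String clientId, CancellableObserver<String> observer) {
        observers.put(clientId, observer);
        observer.setOnCancelHandler(() -> observers.remove(clientId, observer));
    }

    // Returns false instead of writing when the client is unknown or already cancelled.
    boolean push(String clientId, String message) {
        CancellableObserver<String> observer = observers.get(clientId);
        if (observer == null || observer.isCancelled()) {
            return false;
        }
        synchronized (observer) { // serialize writes to one stream
            observer.onNext(message);
        }
        return true;
    }

    // Test double that lets the demo simulate a client-side cancel.
    static final class FakeObserver implements CancellableObserver<String> {
        private volatile boolean cancelled;
        private Runnable onCancel = () -> { };
        int delivered;

        @Override
        public void onNext(String value) {
            delivered++;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }

        @Override
        public void setOnCancelHandler(Runnable handler) {
            onCancel = handler;
        }

        void simulateClientCancel() {
            cancelled = true;
            onCancel.run();
        }
    }

    // Returns {push result before cancel, push result after cancel}.
    static boolean[] demo() {
        ObserverRegistrySketch registry = new ObserverRegistrySketch();
        FakeObserver client = new FakeObserver();
        registry.register("client-1", client);
        boolean before = registry.push("client-1", "hello");
        client.simulateClientCancel();
        boolean after = registry.push("client-1", "hello again");
        return new boolean[] { before, after };
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("before cancel: " + result[0] + ", after cancel: " + result[1]);
    }
}
```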