SocketChannel. invalid stream header: 00000000

644 Views Asked by At

I want to serialize a 'Message' object. I can successfully transfer it as a byte array through a SocketChannel. After that, I change the object's properties (so that it may have a larger size), and then there is a problem sending the object back to the client. Once I try to obtain the object on the client side, I get an exception. It occurs when I deserialize the Message object in the getResponse() method:

org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid stream header: 00000000

Somehow, though, this happens only for the first client (after the exception is thrown, the connection with the first client is over). When I start a new client (without restarting the server), I can successfully transfer the object back and forth; furthermore, it works for any new clients.

This is my minimal debuggable version:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client: connects to a server on localhost:5454, sends a serialized
 * {@code Message} for every request typed on standard input, and prints the
 * {@code Message} it reads back. Typing "exit" as the request ends the loop.
 */
public class Client {

    // Channel to the server; opened in start() and closed in stop().
    private SocketChannel server;

    /**
     * Opens the connection and runs the interactive request loop.
     *
     * @throws IOException if closing the channel in stop() fails
     */
    public void start() throws IOException {
        try {
            server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
            // NOTE(review): the channel is put into non-blocking mode, so the read() in
            // getResponse() can return 0 when the server's reply has not arrived yet.
            server.configureBlocking(false);
        } catch (IOException e) {
            System.err.println("Server isn't responding");
            System.exit(0);
        }

        // NOTE(review): two Scanners wrap the same System.in; each buffers input on its own.
        Scanner scRequest = new Scanner(System.in);
        Scanner scState = new Scanner(System.in);


        System.out.println("Enter request:");
        String request = scRequest.nextLine();

        while (!request.equals("exit")) {
            try {
                // In my actual project class Person is a way different (But it's still a POJO)
                // I included it here to make sure I can get it back after sending to the server
                System.out.println("Enter a number:");
                Person person = new Person(scState.nextInt());
                sendRequest(request, person);

                System.out.println("\nEnter request:");
                request = scRequest.nextLine();
            } catch (Exception e) {
                // The loop continues with the same request after the stack trace is printed.
                e.printStackTrace();
            }
        }

        stop();
    }

    /**
     * Serializes a new {@code Message} built from the arguments, writes it to the
     * server and then reads the response. Any exception terminates the process.
     *
     * @param sMessage command text placed in the message
     * @param person   object attached to the message
     */
    public void sendRequest(String sMessage, Person person) {
        Message message = new Message(sMessage, person);
        ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        try {
            // NOTE(review): a single write() on a non-blocking channel may send only part
            // of the buffer; the return value is not checked here.
            server.write(requestBuffer);
            requestBuffer.clear();
            getResponse();
        } catch (Exception e) {
            // Only the exception message is printed; the stack trace is lost.
            System.out.println(e.getMessage());
            System.err.println("Connection lost");
            System.exit(0);
        }
    }

    /**
     * Performs one read from the server, deserializes the bytes into a
     * {@code Message} and prints it.
     *
     * @throws Exception if the read reports end-of-stream (-1) or deserialization fails
     */
    public void getResponse() throws Exception {
        // 64 MiB buffer, allocated on every call.
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);

        int read = server.read(responseBuffer);
        // clear() resets position to 0 and limit to capacity, so from here on the buffer
        // no longer records how many bytes were actually read.
        responseBuffer.clear();
        if(read == -1) {
            throw new Exception();
        }

        // After clear(), limit() equals the capacity: the array spans the whole buffer,
        // not just the bytes read. NOTE(review): when read == 0 the array holds only the
        // buffer's initial zeros, which matches the "invalid stream header: 00000000" error.
        byte[] bytes = new byte[responseBuffer.limit()];
        responseBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        System.out.println(message);
    }

    /**
     * Closes the channel to the server.
     *
     * @throws IOException if closing fails
     */
    public void stop() throws IOException {
        server.close();
    }

    /** Entry point: creates a client and starts the interactive loop. */
    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.start();
    }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.*;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Selector-based server listening on localhost:5454. For every readable client
 * channel it reads one chunk of bytes, deserializes it into a {@code Message},
 * sets a result on it and writes the serialized message back.
 */
public class Server {

    /**
     * Opens the selector and the listening channel, then processes selected keys forever.
     *
     * @throws IOException if setting up the channels, selecting, or accepting fails
     */
    public void start() throws IOException {

        Selector selector = Selector.open();
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost", 5454));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started");

        while (true) {
            // Blocks until at least one registered channel is ready.
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                if (key.isAcceptable()) {
                    register(selector, serverSocket);
                }
                if (key.isReadable()) {
                    try {
                        getRequest(key);
                    } catch (Exception e) {
                        // Only the message is printed; the client channel is not closed here.
                        System.err.println(e.getMessage());
                    }
                }
                // The selected-key set is not cleared automatically, so remove the handled key.
                iter.remove();
            }
        }
    }

    /**
     * Reads once from the client behind {@code key}, deserializes a {@code Message}
     * from the bytes and sends the response.
     *
     * @param key selection key whose channel is a readable {@code SocketChannel}
     * @throws Exception if the client has disconnected (read returned -1),
     *                   or if deserialization or writing the response fails
     */
    private void getRequest(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();

        // 1 MiB buffer, allocated on every call.
        ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
        int read = client.read(requestBuffer);
        // clear() resets position to 0 and limit to capacity; the buffer no longer
        // records how many bytes were read.
        requestBuffer.clear();

        if(read == -1) {
            key.cancel();
            throw new Exception("Client disconnected at: " +
                    ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
        }

        // limit() equals the capacity after clear(), so the array covers the whole
        // buffer (request bytes followed by unused bytes), not just the bytes read.
        byte[] bytes = new byte[requestBuffer.limit()];
        requestBuffer.get(bytes);

        Message message = SerializationUtils.deserialize(bytes);
        sendResponse(client, message);
    }

    /**
     * Sets the result on {@code message}, serializes it and writes it to the client
     * until the whole buffer has been written.
     *
     * @param client  channel to write the response to
     * @param message message to modify and send back
     * @throws IOException if writing to the channel fails
     */
    private void sendResponse(SocketChannel client, Message message) throws IOException {

        message.setResult("Some result");

        ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
        // write() may send only part of the buffer, so repeat until nothing remains.
        while (responseBuffer.hasRemaining()) {
            client.write(responseBuffer);
        }
        responseBuffer.clear();
    }

    /**
     * Accepts a pending connection, switches it to non-blocking mode and registers
     * it with the selector for read events.
     *
     * @param selector     selector to register the new client with
     * @param serverSocket listening channel to accept from
     * @throws IOException if accepting or configuring the channel fails
     */
    private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
        // NOTE(review): accept() on a non-blocking server channel can return null
        // when no connection is pending; that case is not handled here.
        SocketChannel client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
    }

    /** Entry point: starts the server. */
    public static void main(String[] args) throws Exception {
        new Server().start();
    }
}

I try to send this object as a byte array:

import java.io.Serializable;
import java.util.Formatter;

/**
 * Serializable request/response envelope exchanged between client and server.
 * It carries a command string, an attached {@code Person} and a result string
 * that is filled in before the message is sent back.
 */
public class Message implements Serializable {

    // Explicit version id so the serialized form does not depend on the
    // compiler-generated default, which can differ between builds.
    private static final long serialVersionUID = 1L;

    private String command;
    private Person person;
    private String result;

    /**
     * Creates a message with no result set.
     *
     * @param command command text
     * @param person  object attached to the message
     */
    public Message(String command, Person person) {
        this.command = command;
        this.person = person;
    }

    /** @return the command text */
    public String getCommand() {
        return command;
    }

    /** @param executedCommand new command text */
    public void setCommand(String executedCommand) {
        this.command = executedCommand;
    }

    /** @return the attached object */
    public Person getPerson() {
        return person;
    }

    /** @param person new attached object */
    public void setPerson(Person person) {
        this.person = person;
    }

    /** @return the result text, or {@code null} if none has been set */
    public String getResult() {
        return result;
    }

    /** @param result new result text */
    public void setResult(String result) {
        this.result = result;
    }

    /**
     * Renders the three fields on separate lines.
     * Uses {@code String.format} rather than a hand-made {@code Formatter},
     * which is a {@code Closeable} that would otherwise never be closed.
     */
    @Override
    public String toString() {
        return String.format("Command: %s\nAttached object: %s\nResult: %s",
                command, person, result);
    }
}

I include an instance of this class inside the Message object:

/**
 * Immutable serializable value holding a single integer state.
 */
public class Person implements Serializable {

    // Explicit version id so the serialized form does not depend on the
    // compiler-generated default, which can differ between builds.
    private static final long serialVersionUID = 1L;

    private final int state;

    /**
     * @param state integer value carried by this object
     */
    public Person(int state) {
        this.state = state;
    }

    /** @return {@code "Person state: "} followed by the state value */
    @Override
    public String toString() {
        return "Person state: " + state;
    }
}

I have no idea what is going wrong, hope for your help.

UPD: I used the 'org.apache.commons:commons-lang3:3.5' dependency to serialize an object into a byte array.

1

There is 1 best solution below

2
kriegaex On BEST ANSWER

I have never used Java NIO channels before, so I am not an expert. But I found out several things:

General:

  • In order to debug your code, it is helpful to use e.printStackTrace() instead of just System.out.println(e.getMessage()).

Client:

  • SocketChannel server in the client should be configured as blocking, otherwise it might read 0 bytes because there is no server response yet, which causes your problem.
  • You should always call ByteBuffer.clear() before reading something, not afterwards.
  • After reading, the position in the byte buffer has to be reset to 0 via responseBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.
  • You should size your byte arrays according to the number of bytes read, not the byte buffer size. It might work the other way around, but it is inefficient.

Server:

  • You should always call ByteBuffer.clear() before reading something, not afterwards.
  • After reading, the position in the byte buffer has to be reset to 0 via requestBuffer.position(0) before calling get(byte[]), otherwise it will read undefined bytes after the ones just read.
  • When catching exceptions during getRequest(key) calls, you should close the corresponding channel, otherwise after a client disconnects the server will indefinitely try to read from it, spamming your console log with error messages. My modification handles that case and also prints a nice log message telling which client (remote socket address) was closed.

Caveat: There is nothing in your code dealing with the situation that a request or response written into the channel on the one side is bigger than the maximum ByteBuffer size on the other side. Similarly, in theory a (de)serialised byte[] could also end up being bigger than the byte buffer.

Here are my diffs:

Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Client.java (date 1612321383172)
@@ -15,7 +15,7 @@
   public void start() throws IOException {
     try {
       server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
-      server.configureBlocking(false);
+      server.configureBlocking(true);
     }
     catch (IOException e) {
       System.err.println("Server isn't responding");
@@ -56,22 +56,24 @@
       getResponse();
     }
     catch (Exception e) {
-      System.out.println(e.getMessage());
+      e.printStackTrace();
+//      System.out.println(e.getMessage());
       System.err.println("Connection lost");
       System.exit(0);
     }
   }
 
   public void getResponse() throws Exception {
-    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer responseBuffer = ByteBuffer.allocate(1024 * 1024);
+    responseBuffer.clear();
 
     int read = server.read(responseBuffer);
-    responseBuffer.clear();
     if (read == -1) {
-      throw new Exception();
+      throw new Exception("EOF, cannot read server response");
     }
 
-    byte[] bytes = new byte[responseBuffer.limit()];
+    byte[] bytes = new byte[read];
+    responseBuffer.position(0);
     responseBuffer.get(bytes);
 
     Message message = SerializationUtils.deserialize(bytes);
Index: src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java
===================================================================
--- a/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (revision Staged)
+++ b/src/main/java/de/scrum_master/stackoverflow/q65890087/Server.java (date 1612323386278)
@@ -35,7 +35,11 @@
             getRequest(key);
           }
           catch (Exception e) {
-            System.err.println(e.getMessage());
+            e.printStackTrace();
+//            System.err.println(e.getMessage());
+            SocketChannel client = (SocketChannel) key.channel();
+            System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
+            client.close();
           }
         }
         iter.remove();
@@ -45,15 +49,16 @@
 
   private void getRequest(SelectionKey key) throws Exception {
     SocketChannel client = (SocketChannel) key.channel();
-    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024 * 64);
+    ByteBuffer requestBuffer = ByteBuffer.allocate(1024 * 1024);
+    requestBuffer.clear();
     int read = client.read(requestBuffer);
-    requestBuffer.clear();
     if (read == -1) {
       key.cancel();
       throw new Exception("Client disconnected at: " +
         ((SocketChannel) key.channel()).socket().getRemoteSocketAddress());
     }
-    byte[] bytes = new byte[requestBuffer.limit()];
+    byte[] bytes = new byte[read];
+    requestBuffer.position(0);
     requestBuffer.get(bytes);
     Message message = SerializationUtils.deserialize(bytes);
     sendResponse(client, message);

Just for completeness' sake, here are the full classes after I changed them:

import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * Console client: connects to a server on localhost:5454 in blocking mode, sends a
 * serialized {@code Message} for every request typed on standard input, and prints
 * the {@code Message} read back. Typing "exit" (or closing standard input) ends the loop.
 */
public class Client {

  /** Size, in bytes, of the buffer used for a single response read. */
  private static final int MAX_RESPONSE_BYTES = 1024 * 1024;

  private SocketChannel server;

  /**
   * Opens the connection and runs the interactive request loop.
   *
   * @throws IOException if closing the channel fails
   */
  public void start() throws IOException {
    try {
      server = SocketChannel.open(new InetSocketAddress("localhost", 5454));
      // Blocking mode: read() waits for the server's reply instead of returning 0.
      server.configureBlocking(true);
    }
    catch (IOException e) {
      System.err.println("Server isn't responding");
      System.exit(0);
    }

    // A single Scanner owns System.in. Two Scanners on the same stream buffer
    // independently and can swallow each other's input.
    Scanner input = new Scanner(System.in);
    try {
      System.out.println("Enter request:");
      // hasNextLine() ends the loop cleanly when standard input is closed.
      while (input.hasNextLine()) {
        String request = input.nextLine();
        if (request.equals("exit")) {
          break;
        }

        // In my actual project class Person is a way different (But it's still a POJO)
        // I included it here to make sure I can get it back after sending to the server
        System.out.println("Enter a number:");
        Integer state = readState(input);
        if (state == null) {
          break;
        }
        sendRequest(request, new Person(state));

        System.out.println("\nEnter request:");
      }
    }
    finally {
      stop();
    }
  }

  /**
   * Reads whole lines until one parses as an integer. Reading by line always consumes
   * the bad input, so an invalid entry leads to a new prompt rather than an endless
   * retry on the same token.
   *
   * @param input scanner over standard input
   * @return the parsed number, or {@code null} if the input ended first
   */
  private static Integer readState(Scanner input) {
    while (input.hasNextLine()) {
      String line = input.nextLine().trim();
      try {
        return Integer.parseInt(line);
      }
      catch (NumberFormatException e) {
        System.out.println("Not a valid integer, enter a number:");
      }
    }
    return null;
  }

  /**
   * Serializes a new {@code Message} built from the arguments, writes it to the
   * server and then reads the response. Any failure terminates the process.
   *
   * @param sMessage command text placed in the message
   * @param person   object attached to the message
   */
  public void sendRequest(String sMessage, Person person) {
    Message message = new Message(sMessage, person);
    ByteBuffer requestBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    try {
      // Keep writing until the whole request has been handed to the channel.
      while (requestBuffer.hasRemaining()) {
        server.write(requestBuffer);
      }
      getResponse();
    }
    catch (Exception e) {
      e.printStackTrace();
      System.err.println("Connection lost");
      System.exit(0);
    }
  }

  /**
   * Performs one blocking read from the server, deserializes exactly the bytes
   * received into a {@code Message} and prints it.
   *
   * <p>NOTE(review): a response larger than what one read() delivers (or larger than
   * the buffer) arrives truncated; handling that needs message framing agreed on by
   * both client and server.
   *
   * @throws Exception if the server closed the connection or deserialization fails
   */
  public void getResponse() throws Exception {
    ByteBuffer responseBuffer = ByteBuffer.allocate(MAX_RESPONSE_BYTES);

    int read = server.read(responseBuffer);
    if (read == -1) {
      throw new IOException("EOF, cannot read server response");
    }

    // flip() sets limit to the number of bytes read and position to 0,
    // so remaining() is exactly the received payload.
    responseBuffer.flip();
    byte[] bytes = new byte[responseBuffer.remaining()];
    responseBuffer.get(bytes);

    Message message = SerializationUtils.deserialize(bytes);
    System.out.println(message);
  }

  /**
   * Closes the channel to the server.
   *
   * @throws IOException if closing fails
   */
  public void stop() throws IOException {
    server.close();
  }

  /** Entry point: creates a client and starts the interactive loop. */
  public static void main(String[] args) throws IOException {
    Client client = new Client();
    client.start();
  }
}
import org.apache.commons.lang3.SerializationUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Selector-based server listening on localhost:5454. For every readable client
 * channel it reads one chunk of bytes, deserializes it into a {@code Message},
 * sets a result on it and writes the serialized message back.
 */
public class Server {

  /** Size, in bytes, of the buffer used for a single request read. */
  private static final int MAX_REQUEST_BYTES = 1024 * 1024;

  /**
   * Opens the selector and the listening channel, then processes selected keys forever.
   * Both resources are closed if an exception escapes the loop.
   *
   * @throws IOException if setting up the channels, selecting, accepting or closing fails
   */
  public void start() throws IOException {
    try (Selector selector = Selector.open();
         ServerSocketChannel serverSocket = ServerSocketChannel.open()) {
      serverSocket.bind(new InetSocketAddress("localhost", 5454));
      serverSocket.configureBlocking(false);
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
      System.out.println("Server started");

      while (true) {
        selector.select();
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> iter = selectedKeys.iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          // The selected-key set is not cleared automatically, so remove the key up front.
          iter.remove();
          if (!key.isValid()) {
            continue;
          }
          if (key.isAcceptable()) {
            register(selector, serverSocket);
          }
          else if (key.isReadable()) {
            try {
              getRequest(key);
            }
            catch (Exception e) {
              e.printStackTrace();
              // Close the channel so a disconnected or misbehaving client is not
              // selected again and again.
              SocketChannel client = (SocketChannel) key.channel();
              System.err.println("Closing client connection at: " + client.socket().getRemoteSocketAddress());
              client.close();
            }
          }
        }
      }
    }
  }

  /**
   * Reads once from the client behind {@code key}, deserializes a {@code Message}
   * from exactly the bytes received and sends the response.
   *
   * <p>NOTE(review): this deserializes bytes received from the network with Java
   * serialization; do not expose it to untrusted clients without an input filter.
   * A request larger than what one read() delivers arrives truncated.
   *
   * @param key selection key whose channel is a readable {@code SocketChannel}
   * @throws Exception if the client has disconnected (read returned -1),
   *                   or if deserialization or writing the response fails
   */
  private void getRequest(SelectionKey key) throws Exception {
    SocketChannel client = (SocketChannel) key.channel();
    ByteBuffer requestBuffer = ByteBuffer.allocate(MAX_REQUEST_BYTES);
    int read = client.read(requestBuffer);
    if (read == -1) {
      key.cancel();
      throw new Exception("Client disconnected at: " +
        client.socket().getRemoteSocketAddress());
    }
    if (read == 0) {
      // Nothing available on this non-blocking channel; wait for the next
      // readiness event instead of deserializing an empty array.
      return;
    }
    // flip() sets limit to the number of bytes read and position to 0,
    // so remaining() is exactly the received payload.
    requestBuffer.flip();
    byte[] bytes = new byte[requestBuffer.remaining()];
    requestBuffer.get(bytes);
    Message message = SerializationUtils.deserialize(bytes);
    sendResponse(client, message);
  }

  /**
   * Sets the result on {@code message}, serializes it and writes it to the client
   * until the whole buffer has been written.
   *
   * @param client  channel to write the response to
   * @param message message to modify and send back
   * @throws IOException if writing to the channel fails
   */
  private void sendResponse(SocketChannel client, Message message) throws IOException {
    message.setResult("Some result");
    ByteBuffer responseBuffer = ByteBuffer.wrap(SerializationUtils.serialize(message));
    // write() on a non-blocking channel may send only part of the buffer.
    while (responseBuffer.hasRemaining()) {
      client.write(responseBuffer);
    }
  }

  /**
   * Accepts a pending connection, switches it to non-blocking mode and registers
   * it with the selector for read events.
   *
   * @param selector     selector to register the new client with
   * @param serverSocket listening channel to accept from
   * @throws IOException if accepting or configuring the channel fails
   */
  private void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    SocketChannel client = serverSocket.accept();
    if (client == null) {
      // accept() returns null on a non-blocking channel when no connection is pending.
      return;
    }
    client.configureBlocking(false);
    client.register(selector, SelectionKey.OP_READ);
    System.out.println("New client at: " + client.socket().getRemoteSocketAddress());
  }

  /** Entry point: starts the server. */
  public static void main(String[] args) throws Exception {
    new Server().start();
  }
}