How can I identify a complete response when receiving many packets from concurrent TCP requests?


I have a single TCP connection to a server, but I may have multiple requests in flight at the same time. Most responses are so large that they arrive as many data chunks. I can check the data length to determine when a response is complete. But with multiple requests, packets from one response are sometimes mixed with those of another, which causes a lot of failures.

For example,

For a normal request:

  • -> request #1 1/3
  • -> request #1 2/3
  • -> request #1 3/3
  • -> request #1 completed

In real life:

  • -> request #1 1/3
  • -> request #1 2/3
  • -> request #2 1/3
  • -> request #2 2/3
  • -> failure at some point

For now, my only idea is to serialize the requests, serving them one after another. However, that can't fully solve my problem, because subscribed events sometimes arrive at times I can't control.

How can I solve this TCP problem in general?

I'm showing some of the code below (in case somebody knows Erlang and Elixir).

# Create TCP connection
{:ok, socket} =
      :gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

# Send request
def handle_call({:send, msg}, from, state) do
  :ok = :gen_tcp.send(state.socket, msg)
  new_state = %{state | from: from, msg: ""}

  {:noreply, new_state}
end

# When a packet is received
def handle_info({:tcp, _socket, msg}, state) do
  new_state = %{state | msg: state.msg <> msg}
  current_msg_size = byte_size(new_state.msg)
  defined_msg_size = Response.get_body_size(new_state.msg) # the message size can be read from the first packet's header

  cond do
    current_msg_size == defined_msg_size ->
      GenServer.reply(new_state.from, {:ok, new_state.msg})
      {:noreply, %{new_state | msg: ""}}

    current_msg_size > defined_msg_size ->
      GenServer.reply(new_state.from, {:error, "Message size exceed."})
      {:noreply, %{new_state | msg: ""}}

    true ->
      {:noreply, new_state}
  end
end

There are 2 best solutions below

José M On BEST ANSWER

At the TCP level, requests and responses do not exist within a connection: it is a single pipe transferring bytes from one side to the other, in order.

In order to handle interleaving over a single connection you have to handle it one level up the stack.

Possible solutions include:

  1. Serializing: send one request at a time and wait for its complete response before sending the next.
  2. Framing: the server frames the responses and guarantees that each frame is sent whole, without interleaving. Your receiver can then inspect each frame (which may span multiple receives) and assign it to the corresponding request.
  3. One connection for each request: let the OS handle the interleaving on the wire, at the expense of a socket and a handshake each time.
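The framing option can be sketched as follows. This is only an illustration under an assumed wire format, not the asker's actual protocol: each frame is taken to be a 4-byte request id, a 4-byte payload length, then the payload. The module name `FrameParser` and its functions are hypothetical.

```elixir
defmodule FrameParser do
  @moduledoc """
  Hypothetical frame layout (an assumption for this sketch):
  <<request_id::32, payload_len::32, payload::binary-size(payload_len)>>
  """

  # Build one frame for the given request id.
  def encode(request_id, payload) when is_binary(payload) do
    <<request_id::32, byte_size(payload)::32, payload::binary>>
  end

  # Split the accumulated buffer into complete frames plus leftover bytes.
  # Returns {[{request_id, payload}, ...], rest}.
  def parse(buffer), do: parse(buffer, [])

  # A whole frame is available: take it and keep parsing the remainder.
  defp parse(<<id::32, len::32, payload::binary-size(len), rest::binary>>, acc) do
    parse(rest, [{id, payload} | acc])
  end

  # Not enough bytes for another frame: keep them for the next receive.
  defp parse(rest, acc), do: {Enum.reverse(acc), rest}
end

# Two responses arriving split at an arbitrary byte boundary.
stream = FrameParser.encode(1, "aaa") <> FrameParser.encode(2, "bb")
<<part1::binary-size(10), part2::binary>> = stream

{[], buffer} = FrameParser.parse(part1)
{frames, rest} = FrameParser.parse(buffer <> part2)
IO.inspect({frames, rest})
```

In a GenServer, the receiver would keep `rest` in its state as the buffer for the next `{:tcp, socket, data}` message and use the request id of each frame to look up the caller to reply to, for example in a map of pending requests.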
Chen Yu On

You can change the TCP option in this call from active: true to active: false or active: :once:

:gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])

Then you control when messages are received yourself, instead of being flooded by incoming messages.

  1. On receiving a call from the upper layer, process the client request, set active to false, and wait for the server's response.

  2. Try to receive only the related messages; ignore the others in the process's message queue or save them in a buffer.

  3. When the server's response arrives, process it, then set active to once.

  4. On timeout, set active to once.
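The steps above can be sketched roughly as follows. This is a minimal illustration, assuming the expected response size is already known (in the question it comes from the first packet's header); the module `OnceReceiver`, the function `read_exactly`, and the 5-second timeout are hypothetical choices. It relies on `:inet.setopts/2` to re-arm the socket with `active: :once`, so that only one `{:tcp, socket, data}` message is delivered per request for data.

```elixir
defmodule OnceReceiver do
  # Read until `wanted` bytes have arrived, asking the socket for one
  # packet at a time. Returns {:ok, message, rest}, where `rest` holds any
  # bytes that already belong to the next message.
  def read_exactly(socket, wanted, buffer \\ <<>>)

  # Enough bytes are buffered: split off the message and stop receiving.
  def read_exactly(_socket, wanted, buffer) when byte_size(buffer) >= wanted do
    <<message::binary-size(wanted), rest::binary>> = buffer
    {:ok, message, rest}
  end

  def read_exactly(socket, wanted, buffer) do
    # Ask for exactly one more {:tcp, socket, data} message; the socket
    # then switches back to passive mode on its own.
    :ok = :inet.setopts(socket, active: :once)

    receive do
      {:tcp, ^socket, data} -> read_exactly(socket, wanted, buffer <> data)
      {:tcp_closed, ^socket} -> {:error, :closed}
    after
      # Assumed timeout for this sketch.
      5_000 -> {:error, :timeout}
    end
  end
end
```

The calling process must be the socket's controlling process, otherwise the `{:tcp, ...}` messages are delivered elsewhere. Because the socket stays passive between calls, unrelated data waits in the kernel buffer instead of piling up in the process mailbox.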

https://www.erlang.org/doc/man/gen_tcp.html#controlling_process-2

controlling_process(Socket, Pid) -> ok | {error, Reason}

Types:

  • Socket = socket()
  • Pid = pid()
  • Reason = closed | not_owner | badarg | inet:posix()

Assigns a new controlling process Pid to Socket. The controlling process is the process that receives messages from the socket. If called by any other process than the current controlling process, {error, not_owner} is returned. If the process identified by Pid is not an existing local pid, {error, badarg} is returned. {error, badarg} may also be returned in some cases when Socket is closed during the execution of this function.

If the socket is set in active mode, this function will transfer any messages in the mailbox of the caller to the new controlling process. If any other process is interacting with the socket while the transfer is happening, the transfer may not work correctly and messages may remain in the caller's mailbox. For instance changing the sockets active mode before the transfer is complete may cause this.