Reactor Netty: ByteBuf wrong allocation

I am working on a Java project built on Reactor Netty, and I have some issues when receiving messages as ByteBuf. In my project, a sender (server) takes an object and splits it into multiple chunks (2046 bytes each). These chunks are then sent to the client, where they are aggregated by a key and deserialized into the original object.

My issue is that the first time I send the object it works; however, the second time it throws an error, and the third time it works again.

Client receiver side:

@Override
    public Mono<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        inbound.receive().retain()
                .doOnNext(byteBuf -> log.info("Bytes:{}", byteBuf))
                .map(MessengerUtils::deserialize) // Deserialize into message
                .map(this.assembler) // Assemble chunks
                .flatMap(optionalTemplate -> optionalTemplate.map(Mono::just).orElseGet(Mono::empty))
                //.flatMap(Optional::stream)
                .doOnNext(template -> inbound.withConnection(template::setOwner))
                .doOnNext(this.executor)
                .then()
                .subscribe();

        return outbound.neverComplete();
    }
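
The assembler step used above is not shown. As a rough sketch of what such a step could look like, assuming each chunk carries a message id and a chunk number and the key carries the total chunk count: the class ChunkAssembler and its methods onKey and onChunk are hypothetical names for illustration, not the project's actual code, and the sketch uses only the JDK.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical assembler: collects chunks per message id and returns the
// joined payload once the key and all announced chunks have arrived.
class ChunkAssembler {
    // messageId -> (chunkNumber -> payload), kept sorted by chunk number
    private final Map<String, TreeMap<Integer, byte[]>> chunks = new HashMap<>();
    // messageId -> total number of chunks announced by the key
    private final Map<String, Integer> expected = new HashMap<>();

    synchronized Optional<byte[]> onKey(String messageId, int totalChunks) {
        expected.put(messageId, totalChunks);
        return tryComplete(messageId);
    }

    synchronized Optional<byte[]> onChunk(String messageId, int chunkNumber, byte[] payload) {
        chunks.computeIfAbsent(messageId, id -> new TreeMap<>()).put(chunkNumber, payload);
        return tryComplete(messageId);
    }

    private Optional<byte[]> tryComplete(String messageId) {
        Integer total = expected.get(messageId);
        if (total == null) {
            return Optional.empty(); // key not seen yet
        }
        TreeMap<Integer, byte[]> parts = chunks.get(messageId);
        int received = (parts == null) ? 0 : parts.size();
        if (received < total) {
            return Optional.empty(); // still waiting for chunks
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (parts != null) {
            for (byte[] part : parts.values()) {
                out.write(part, 0, part.length); // TreeMap iterates in chunk order
            }
        }
        expected.remove(messageId);
        chunks.remove(messageId);
        return Optional.of(out.toByteArray());
    }
}
```

Because the sender above adds the key after the chunks, this sketch accepts the key and the chunks in any order and only completes once both are present.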

Deserializer:

public static Message deserialize(ByteBuf byteBuf) {
        try {
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            String messageWrapperString = SerializationUtils.deserialize(bytes);

            Message messageWrapper = objectMapper.readValue(messageWrapperString, Message.class);

            if (messageWrapper instanceof MessageKey) {
                return (MessageKey) messageWrapper;
            } else if (messageWrapper instanceof MessageChunk) {
                return (MessageChunk) messageWrapper;
            } else {
                return defaultMessage();
            }
        } catch (Exception e) {
            return defaultMessage();
        } finally {
            byteBuf.release(); // Release the ByteBuf
        }
    }

Message chunker:

public List<ByteBuf> createChunks(MessageTemplate template) {
        byte[] availableBytes = SerializationUtils.serialize(modelToString(template.getMessage()));
        final int totalMessageSize = availableBytes.length;

        List<ByteBuf> byteBufs = new ArrayList<>();
        
        final String messageId = generateMessageId();

        int chunkNumber = 0;
        int offset = 0;

        MessageKey key = new MessageKey();
        key.setMessageId(messageId);
        key.setTotalBytes(totalMessageSize);
        key.setEventType(template.getEventType());
        key.setEventAction(template.getEventAction());

        while (offset < totalMessageSize) {
            MessageChunk chunk = new MessageChunk();
            chunk.setMessageId(messageId);
            chunk.setChunkNumber(chunkNumber);

            List<Byte> byteList = new ArrayList<>();
            int chunkSizeCounter = 0;

            while (offset < totalMessageSize) {
                byteList.add(availableBytes[offset]);
                offset++;
                chunkSizeCounter++;

                chunk.setHexedBytes(formatToBytes(byteList));
                chunk.setChunkSize(chunkSizeCounter);

                byte[] serializedChunk = serializeMessage(chunk);
                ByteBuf buffer = Unpooled.copiedBuffer(serializedChunk);
                int size = buffer.readableBytes();
                buffer.release();
                
                //2046 bytes
                if (size >= DEFAULT_CHUNK_SIZE) {
                    break;
                }
            }

            ByteBuf buffer = Unpooled.copiedBuffer(serializeMessage(chunk));
            byteBufs.add(buffer);
            chunkNumber++;
        }

        key.setTotalChunks(chunkNumber);

        byte[] keyBytes = serializeMessage(key);
        ByteBuf keyBuff = Unpooled.copiedBuffer(keyBytes);
        byteBufs.add(keyBuff);

        return byteBufs;
    }

Sender:

private void sendToClients(MessageTemplate template) {
        for(Connection connection : this.connections.getClientConnections()) {
            connection.outbound()
                    .sendObject(Flux.fromIterable(MessengerUtils.createChunks(template))
                            .delayElements(Duration.ofMillis(120))
                    )
                    .then()
                    .subscribe();
        }
    }

I am not sure whether the way I chunk and send the message is correct or has any impact on the behavior, but my guess so far is that something on the receiver side is not working as it should. I suspect that the way I release the ByteBuf on the client side (in the deserializer) may not be correct.

Or the problem could come from the sender, inside the inner while loop, where I call Unpooled.copiedBuffer(serializedChunk) just to get the size.

Thank you!

Edit: I found a way of doing it. The underlying issue is that over a TCP connection the receiver does not know how the sender grouped the data, for example whether it was written in chunks of 1024 bytes or 2200 bytes.

I solved it in my chunker by limiting each chunk to at most 1025 bytes (1024 bytes plus a '\n' delimiter). The MessageKey may be smaller than 1024 bytes, but never larger.
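
A minimal sketch of that size limit, using only the JDK. It is simplified: it slices a raw payload directly instead of wrapping each slice in a JSON chunk message, and it assumes the payload never contains '\n' itself (for example because it is already hex- or JSON-encoded). DelimitedChunker, MAX_PAYLOAD and chunk are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical chunker: frames of at most 1024 payload bytes plus one '\n'.
class DelimitedChunker {
    static final int MAX_PAYLOAD = 1024; // payload bytes per frame, delimiter excluded
    static final byte DELIMITER = '\n';

    // Splits data into frames; every frame ends with DELIMITER.
    // Assumes data contains no DELIMITER byte of its own.
    static List<byte[]> chunk(byte[] data) {
        List<byte[]> frames = new ArrayList<>();
        for (int offset = 0; offset < data.length; offset += MAX_PAYLOAD) {
            int length = Math.min(MAX_PAYLOAD, data.length - offset);
            byte[] frame = new byte[length + 1];
            System.arraycopy(data, offset, frame, 0, length);
            frame[length] = DELIMITER; // last byte marks the end of the frame
            frames.add(frame);
        }
        return frames;
    }
}
```

Slicing by offset also avoids re-serializing the chunk after every single byte just to measure its size.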

In my receiver, I append the incoming bytes to a Buffer. After each append, the receiver looks for delimiters, slices the buffer at the delimiter positions, and tries to map each slice to a JSON string and then to my model. If a chunk deserializes successfully, we shift the position in the Buffer, so that the next check starts from 0 and runs up to the next delimiter.

This approach solved my initial problem, where the packets that arrived did not match what I had sent. For example, {"abc":123} failed to deserialize because, instead of the full message, I would sometimes get {"abc":123 in one packet, which failed to deserialize, followed by } in a second packet, which failed as well.

@Override
        public void handle(Buffer event) {
            synchronized (transmissionBuffer) {
                transmissionBuffer.appendBuffer(event);
            }
            processAfter();
        }

  
        private synchronized void processAfter() {
            String bufferContent = transmissionBuffer.toString();
            List<Integer> delimitersPosition = findDelimiters(bufferContent, MessengerUtils.delimiter.charAt(0));
            processBufferContent(bufferContent, delimitersPosition);
            updateTransmissionBuffer(delimitersPosition);
        }
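
The helpers called above (findDelimiters, processBufferContent, updateTransmissionBuffer) are not shown. As a self-contained sketch of the same idea with only the JDK, assuming '\n' as the delimiter and UTF-8 text frames: DelimiterFramer and feed are hypothetical names for illustration, and a ByteArrayOutputStream stands in for the Buffer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framer: accumulates received bytes and emits only complete,
// delimiter-terminated frames, keeping any partial tail for the next read.
class DelimiterFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Appends the received bytes and returns all complete frames (delimiter stripped).
    synchronized List<String> feed(byte[] received) {
        pending.write(received, 0, received.length);
        byte[] all = pending.toByteArray();

        List<String> frames = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < all.length; i++) {
            if (all[i] == '\n') {
                frames.add(new String(all, start, i - start, StandardCharsets.UTF_8));
                start = i + 1; // next frame begins after the delimiter
            }
        }

        // Keep only the incomplete tail so the next call starts from position 0.
        pending.reset();
        pending.write(all, start, all.length - start);
        return frames;
    }
}
```

With this, feeding {"abc":123 returns no frames, and feeding }\n afterwards returns the single complete frame {"abc":123}, which can then be deserialized. Scanning bytes rather than a decoded String also keeps multi-byte UTF-8 characters intact when they are split across reads.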