Getting netty closedchannelsexception

18 Views Asked by At

I am trying to write a mqttserver using netty using below code

public class MqttServer {
    private final int port;

    public MqttServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        MQTTMessageHandler mqttMessageHandler = new MQTTMessageHandler();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MqttDecoder());
                            ch.pipeline().addLast(MqttEncoder.INSTANCE);
                            ch.pipeline().addLast(mqttMessageHandler);
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);;

            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 1883; // Change the port if needed
        new MqttServer(port).run();
    }
}

Not below is how I have written mqtthandler which handle publish and subscribe

@ChannelHandler.Sharable
public class MQTTMessageHandler extends SimpleChannelInboundHandler<MqttMessage> {
    Map<String, List<Channel>> channelMap = new HashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage message) throws Exception {
        // Handle different types of MQTT messages
        switch (message.fixedHeader().messageType()) {
            case CONNECT:
                handleConnectMessage(ctx, (MqttConnectMessage) message);
                break;
            case PUBLISH:
                handlePublishMessage(ctx, (MqttPublishMessage) message);
                break;
            case SUBSCRIBE:
                handleSubscribeMessage(ctx, (MqttSubscribeMessage) message);
                break;
            case UNSUBSCRIBE:
                handleUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
                break;
            case DISCONNECT:
                handleDisconnectMessage(ctx);
                break;
            default:
                // Handle unsupported message types or other cases
                break;
        }
    }

    private void handleConnectMessage(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        // Process the CONNECT message
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_LEAST_ONCE, false,0);
        MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
        ctx.write(new MqttConnAckMessage(mqttFixedHeader , mqttConnAckVariableHeader));
        ctx.flush();
        System.out.println("Received CONNECT message");
}
private void handlePublishMessage(ChannelHandlerContext ctx, MqttPublishMessage msg) {
        // Process the PUBLISH message
        System.out.println("Received PUBLISH message");
        String topic = msg.variableHeader().topicName();
        byte[] payload = new byte[msg.payload().readableBytes()];
        msg.payload().getBytes(0, payload);
        if(channelMap.containsKey(topic)){
           List<Channel> channels = channelMap.get(topic);
           for(Channel channel : channels){
               channel.write(msg);
               channel.flush();
           }
        }
    }

    private void handleSubscribeMessage(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
        System.out.println("Received SUBSCRIBE message");

        // Process the SUBSCRIBE message

        // Retrieve list of topic subscriptions from the SUBSCRIBE message
        List<MqttTopicSubscription> subscriptions = msg.payload().topicSubscriptions();

        for (MqttTopicSubscription subscription:
             subscriptions) {
            String topic = subscription.topicName();
            if(channelMap.containsKey(topic)){
                channelMap.get(topic).add(ctx.channel());
            }
            else{
                List<Channel> channels = new ArrayList<>();
                channels.add(ctx.channel());
                channelMap.put(topic, channels);
            }
        }

        // Prepare the SUBACK message with appropriate QoS levels
        MqttFixedHeader fixedHeader = new MqttFixedHeader(
                MqttMessageType.SUBACK,
                false,
                MqttQoS.AT_LEAST_ONCE,
                false,
                0
        );
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId());
        MqttSubAckPayload payload = generateSubAckPayload(subscriptions);

        // Create the SUBACK message
        MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);

        // Send the SUBACK message back to the client
        ctx.writeAndFlush(subAckMessage);
    }

    
private MqttSubAckPayload generateSubAckPayload(List<MqttTopicSubscription> subscriptions) {
        List<Integer> collect = subscriptions.stream().map(s -> s.qualityOfService().value()).collect(Collectors.toList());
        MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(collect);
        return mqttSubAckPayload;
    }

    private void handleUnsubscribeMessage(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) {
        MqttUnsubscribePayload payload = msg.payload();
        MqttFixedHeader mqttFixedHeader = null;
        MqttMessageIdVariableHeader variableHeader = null ;
        for(String topic : payload.topics()){
            if(channelMap.containsKey(topic)){
                channelMap.get(topic).remove(ctx.channel());
                ctx.writeAndFlush(new MqttUnsubAckMessage(mqttFixedHeader, variableHeader));
            }
        }

        // Process the UNSUBSCRIBE message
        System.out.println("Received UNSUBSCRIBE message");
    }

    private void handleDisconnectMessage(ChannelHandlerContext ctx) {
        // Process the DISCONNECT message
        System.out.println("Received DISCONNECT message");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Handle exceptions
        cause.printStackTrace();
        ctx.close();
    }
}

Now subscribe is working fine and I am recieving message at client side but when I try to publish a message I get java.nio.channels.CloseChannelException.

I am storing the channel in a map during subscribe method and then writing to same channel while publish message

0

There are 0 best solutions below