V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
zhazi
V2EX  ›  Java

兄弟们,请求个 netty 的线程问题

  •  
  •   zhazi · 10 小时 40 分钟前 · 470 次点击
    @Component
    @RequiredArgsConstructor
    public class WebSocketServer implements ApplicationRunner, DisposableBean {
    
        @Value("${netty.port}")
        private int port;
        private final EventLoopGroup bossGroup = new NioEventLoopGroup();
        private final EventLoopGroup workerGroup = new NioEventLoopGroup();
        private final ServerBootstrap b = new ServerBootstrap();
        private final Map<String, Channel> map = new HashMap<>();
    
        @Override
        public void destroy() {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));
                            pipeline.addLast(new WebSocketFrameHandler(map));
                        }
                    });
            ChannelFuture future = b.bind(port).sync();
            System.out.println("WebSocket server started on port " + port);
            future.channel().closeFuture().sync();
        }
    }
    
    @Slf4j
    public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
    
        private final Map<String, Channel> map;
        public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");
    
        public WebSocketFrameHandler(Map<String, Channel> map) {
            this.map = map;
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
                WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
                Channel currentChannel = ctx.channel();
                String uri = handshakeComplete.requestUri();
                currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);
                Channel exist = map.get(uri);
                if (Objects.isNull(exist)) {
                    map.put(uri, currentChannel);
                    log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
                } else {
                    //无效
                    exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线
                    map.put(uri, currentChannel);
                    log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
                    //无效
                    //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
                    //无效
                    //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
                }
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            Channel channel = ctx.channel();
            String uri = channel.attr(URI_ATTRIBUTE_KEY).get();
            log.info(new Message(uri, channel.id().toString(), 0).toString());
        }
    }
    

    日志输出:

    2025-09-21 03:16:54.669  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}
    2025-09-21 03:16:58.837  INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}
    2025-09-21 03:16:58.838  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}
    

    我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态, 但是控制不好状态

    预期的效果:

    • 1.86aeadad 上线成功
    • 2.fa3a5fbb 登录检查
    • 3.86aeadad 踢下线
    • 4.fa3a5fbb 上线

    日志输出:

    • 1.86aeadad 上线
    • 2.86aeadad 下线
    • 3.fa3a5fbb 上线

    但是现在看结果是错误的顺序。

    • 1.86aeadad 上线
    • 2.fa3a5fbb 上线
    • 3.86aeadad 下线

    想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白

    netty-all 的版本是 4.2.6.Final

    2 条回复    2025-09-21 11:36:56 +08:00
    hsiafan
        1
    hsiafan  
       6 小时 37 分钟前   ❤️ 1
    Channel 的 close 应该只是会 fire 一个 ChannelInactive 的 event ,不会直接调用 channelInactive 这个方法。channelInactive 要由 eventloop 再去找对应 ChannelPipeline 来调用,这时候已经晚了。

    你这个代码只是日志打印的顺序看起来错乱而已,该 close 的 Channel 实际已经 close 了。
    zizon
        2
    zizon  
       2 小时 33 分钟前
    你把事情放到 exist.close()的回调里去做呗.
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2593 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 06:10 · PVG 14:10 · LAX 23:10 · JFK 02:10
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.