兄弟们,请求个 netty 的线程问题

13 小时 16 分钟前
 zhazi
@Component
@RequiredArgsConstructor
public class WebSocketServer implements ApplicationRunner, DisposableBean {

    @Value("${netty.port}")
    private int port;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private final ServerBootstrap b = new ServerBootstrap();
    private final Map<String, Channel> map = new HashMap<>();

    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));
                        pipeline.addLast(new WebSocketFrameHandler(map));
                    }
                });
        ChannelFuture future = b.bind(port).sync();
        System.out.println("WebSocket server started on port " + port);
        future.channel().closeFuture().sync();
    }
}

@Slf4j
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final Map<String, Channel> map;
    public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");

    public WebSocketFrameHandler(Map<String, Channel> map) {
        this.map = map;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            Channel currentChannel = ctx.channel();
            String uri = handshakeComplete.requestUri();
            currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);
            Channel exist = map.get(uri);
            if (Objects.isNull(exist)) {
                map.put(uri, currentChannel);
                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
            } else {
                //无效
                exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线
                map.put(uri, currentChannel);
                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
                //无效
                //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
                //无效
                //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String uri = channel.attr(URI_ATTRIBUTE_KEY).get();
        log.info(new Message(uri, channel.id().toString(), 0).toString());
    }
}

日志输出:

2025-09-21 03:16:54.669  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}
2025-09-21 03:16:58.837  INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}
2025-09-21 03:16:58.838  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}

我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态, 但是控制不好状态

预期的效果:

日志输出:

但是现在看结果是错误的顺序。

想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白

netty-all 的版本是 4.2.6.Final

542 次点击
所在节点    Java
2 条回复
hsiafan
9 小时 13 分钟前
Channel 的 close 应该只是会 fire 一个 ChannelInactive 的 event ,不会直接调用 channelInactive 这个方法。channelInactive 要由 eventloop 再去找对应 ChannelPipeline 来调用,这时候已经晚了。

你这个代码只是日志打印的顺序看起来错乱而已,该 close 的 Channel 实际已经 close 了。
zizon
5 小时 9 分钟前
你把事情放到 exist.close()的回调里去做呗.

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://ex.noerr.eu.org/t/1160806

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX