@Component
@RequiredArgsConstructor
public class WebSocketServer implements ApplicationRunner, DisposableBean {
@Value("${netty.port}")
private int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ServerBootstrap b = new ServerBootstrap();
private final Map<String, Channel> map = new HashMap<>();
@Override
public void destroy() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void run(ApplicationArguments args) throws Exception {
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));
pipeline.addLast(new WebSocketFrameHandler(map));
}
});
ChannelFuture future = b.bind(port).sync();
System.out.println("WebSocket server started on port " + port);
future.channel().closeFuture().sync();
}
}
@Slf4j
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final Map<String, Channel> map;
public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");
public WebSocketFrameHandler(Map<String, Channel> map) {
this.map = map;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
Channel currentChannel = ctx.channel();
String uri = handshakeComplete.requestUri();
currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);
Channel exist = map.get(uri);
if (Objects.isNull(exist)) {
map.put(uri, currentChannel);
log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
} else {
//无效
exist.close().sync();//明确在 close 执行完成后在重新上线,但是好像没生效一样,直接就输出上线! 1= 上线,0= 下线
map.put(uri, currentChannel);
log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
//无效
//exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
//无效
//exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String uri = channel.attr(URI_ATTRIBUTE_KEY).get();
log.info(new Message(uri, channel.id().toString(), 0).toString());
}
}
日志输出:
2025-09-21 03:16:54.669 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}
2025-09-21 03:16:58.837 INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}
2025-09-21 03:16:58.838 INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}
我用 netty 提供的 websocket 服务,想达到新设备踢出老设备的在线状态, 但是控制不好状态
预期的效果:
日志输出:
但是现在看结果是错误的顺序。
想在这个服务里保证状态的正确,我尝试使用自定义 eventGroup 并且设置线程数 1 来执行代码也不行,也尝试跟 ai 沟通了半宿也研究明白
netty-all 的版本是 4.2.6.Final
1
hsiafan 6 小时 37 分钟前 ![]() Channel 的 close 应该只是会 fire 一个 ChannelInactive 的 event ,不会直接调用 channelInactive 这个方法。channelInactive 要由 eventloop 再去找对应 ChannelPipeline 来调用,这时候已经晚了。
你这个代码只是日志打印的顺序看起来错乱而已,该 close 的 Channel 实际已经 close 了。 |
![]() |
2
zizon 2 小时 33 分钟前
你把事情放到 exist.close()的回调里去做呗.
|