[Java] Folks, a question about Netty threading


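import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@RequiredArgsConstructor
public class WebSocketServer implements ApplicationRunner, DisposableBean {

    @Value("${netty.port}")
    private int port;
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private final ServerBootstrap b = new ServerBootstrap();
    // Shared by handlers running on different worker threads, so it must be thread-safe
    private final Map<String, Channel> map = new ConcurrentHashMap<>();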

    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", true, 3000L));
                        pipeline.addLast(new WebSocketFrameHandler(map));
                    }
                });
        ChannelFuture future = b.bind(port).sync();
        System.out.println("WebSocket server started on port " + port);
        future.channel().closeFuture().sync();
    }
}

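import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.Objects;

@Slf4j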
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final Map<String, Channel> map;
    public static final AttributeKey<String> URI_ATTRIBUTE_KEY = AttributeKey.valueOf("URI");

    public WebSocketFrameHandler(Map<String, Channel> map) {
        this.map = map;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            Channel currentChannel = ctx.channel();
            String uri = handshakeComplete.requestUri();
            currentChannel.attr(URI_ATTRIBUTE_KEY).set(uri);
            Channel exist = map.get(uri);
            if (Objects.isNull(exist)) {
                map.put(uri, currentChannel);
                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
            } else {
                // Did not work
                exist.close().sync(); // Meant to bring the new session online only after close() has finished, but it seems to have no effect: "online" is logged right away. 1 = online, 0 = offline
                map.put(uri, currentChannel);
                log.info(new Message(uri, currentChannel.id().toString(), 1).toString());
                // Did not work
                //exist.close().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
                // Did not work
                //exist.close().sync().addListener(future -> log.info(new Message(uri, currentChannel.id().toString(), 1).toString()));
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String uri = channel.attr(URI_ATTRIBUTE_KEY).get();
        log.info(new Message(uri, channel.id().toString(), 0).toString());
    }
}

Log output (上线 = online, 下线 = offline):

2025-09-21 03:16:54.669  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=上线}
2025-09-21 03:16:58.837  INFO 30008 --- [ntLoopGroup-5-2] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='fa3a5fbb', status=上线}
2025-09-21 03:16:58.838  INFO 30008 --- [ntLoopGroup-5-1] org.example.WebSocketFrameHandler        : Message{uri='/ws/abc_123', sessionId='86aeadad', status=下线}

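I'm using the WebSocket support that Netty provides, and I want a newly connected device to kick the old device offline.
However, I can't get the state changes to happen in the right order.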

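Expected behavior:

  • 1. 86aeadad comes online successfully
  • 2. fa3a5fbb connects and is checked against the existing session
  • 3. 86aeadad is kicked offline
  • 4. fa3a5fbb comes online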
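
The four steps above can be sketched in plain Java, without Netty, to pin down the intended ordering. This is only a minimal sketch under assumptions: `SessionRegistry`, `login`, `disconnected` and `events` are hypothetical names that do not appear in the code of this post or in Netty, and session IDs stand in for real `Channel` objects. The idea is that the kick path records the old session's offline event itself, before the new session's online event, instead of relying on when a later disconnect callback happens to run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry: at most one live session per URI, newest connection wins. */
final class SessionRegistry {
    private final Map<String, String> sessions = new HashMap<>();
    private final List<String> events = new ArrayList<>();

    /** Steps 2 to 4: check for an existing session, kick it, then bring the new one online. */
    synchronized void login(String uri, String sessionId) {
        String old = sessions.put(uri, sessionId);
        if (old != null) {
            // Record the kick here, in order; real code would also close the old channel.
            events.add(old + " offline");
        }
        events.add(sessionId + " online");
    }

    /** Disconnect callback: only reports sessions that are still registered. */
    synchronized void disconnected(String uri, String sessionId) {
        // A kicked session was already replaced in the map, so it is not reported twice.
        if (sessions.remove(uri, sessionId)) {
            events.add(sessionId + " offline");
        }
    }

    /** Returns a copy of the recorded events in the order they happened. */
    synchronized List<String> events() {
        return new ArrayList<>(events);
    }
}
```

Calling `login("/ws/abc_123", "86aeadad")`, then `login("/ws/abc_123", "fa3a5fbb")`, then `disconnected("/ws/abc_123", "86aeadad")` records three events: 86aeadad online, 86aeadad offline, fa3a5fbb online. The late disconnect callback of the kicked session adds nothing, because that session is no longer the registered one.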

Expected log output:

  • 1. 86aeadad online
  • 2. 86aeadad offline
  • 3. fa3a5fbb online

However, the actual output is in the wrong order:

  • 1. 86aeadad online
  • 2. fa3a5fbb online
  • 3. 86aeadad offline

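I want this service to keep the state correct. I tried running the handler on a custom EventLoopGroup with the thread count set to 1, and that didn't help either. I also spent half the night going back and forth with an AI and still couldn't figure it out.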
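
One possible reading of why a single thread did not help, offered as an assumption and not confirmed anywhere in this post: if the close future completes before the `channelInactive` callback is dispatched as a later task on the old channel's event loop, then anything waiting on that future runs first, no matter how many threads there are. The toy model below does not call Netty at all. `CloseOrderingModel` is a hypothetical name, a `CompletableFuture` stands in for the close future, and a single-threaded executor stands in for the event loop.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model only: mimics an ASSUMED ordering, it is not Netty's actual implementation. */
final class CloseOrderingModel {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-in for an event loop group with exactly one thread.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
        CompletableFuture<Void> inactiveFired = new CompletableFuture<>();

        // Code waiting for the close to finish, then reporting the new session online.
        closeFuture.thenRun(() -> log.add("fa3a5fbb online"));

        loop.execute(() -> {
            // ASSUMPTION: the close future is completed first ...
            closeFuture.complete(null);
            // ... and the "inactive" callback is only queued as a later task.
            loop.execute(() -> {
                log.add("86aeadad offline");
                inactiveFired.complete(null);
            });
        });

        try {
            inactiveFired.join(); // wait until the queued callback has run
        } finally {
            loop.shutdown();
        }
        return log;
    }
}
```

Under that assumption, `CloseOrderingModel.run()` logs fa3a5fbb online before 86aeadad offline even though only one thread runs the tasks, which matches the order seen in the log above.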

The netty-all version is 4.2.6.Final.
