V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
montaro2017
V2EX  ›  Java

用 netty 编写代理服务器,切换出口 IP,不能及时生效

  •  
  •   montaro2017 · 18 小时 25 分钟前 · 1034 次点击

    公司有一台服务器,有很多个公网 IP ,就想着能不能利用起来。

    然后现在有一个任务是用浏览器打开指定网址,返回网页源代码,我就打算把这个服务器做成代理服务器。

    本来是计划每个 IP 做一个代理服务器,代理服务器根据入口 IP 用对应的 IP 连接目标服务器。

    开启每个浏览器的时候设置代理地址,比如 1.2.3.4:8639, 1.2.3.5:8639 这样 。

    然后发现多开浏览器非常吃性能,要充分利用所有的公网 IP 得开几十个浏览器,这时候已经卡到动不了了,肯定不行。

    所以我就想开 4 个浏览器,每个浏览器设置一个代理,然后通过接口去切换代理后端的出口。

    代理服务器是用 netty 写的,逻辑改成了绑定不同端口,然后通过接口指定端口号和出口 IP ,来切换不同端口对应代理的出口 IP 。

    其实就是存了一个 Map<Integer, String>,调接口修改这个 map ,netty 代理服务器连接目标服务器的时候使用出口地址去连接。

    现在问题在于,调用接口切换出口 IP 后,日志显示已经使用新的出口 IP 了,但是访问查询 IP 的网站,还是使用之前的 IP ,好像要等一段时间才生效,这是什么问题,求各位大佬指教

    @Log4j2
    public class ProxyFrontendHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
        private final AddressFunction addressFunction;
    
        public ProxyFrontendHandler(AddressFunction addressFunction) {
            this.addressFunction = addressFunction;
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
            if (HttpMethod.CONNECT.equals(req.method())) {
                handleConnectRequest(ctx, req);
                return;
            }
            handleHttpRequest(ctx, req);
        }
    
        private void handleConnectRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
            List<String> split = StrUtil.split(req.uri(), ":");
            String host = CollUtil.getFirst(split);
            int port = Convert.toInt(CollUtil.get(split, 1), 443);
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ctx.channel().eventLoop())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new RelayHandler(ctx.channel()));
                        }
                    });
    
            ChannelFuture connectFuture;
            InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
            InetSocketAddress sourceAddress = addressFunction.apply(ctx);
            if (sourceAddress != null) {
                log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
                connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
            } else {
                connectFuture = bootstrap.connect(remoteAddress);
            }
            connectFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    Channel outboundChannel = future.channel();
                    DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                            HttpVersion.HTTP_1_1,
                            HttpResponseStatus.OK
                    );
                    response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
                    ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
                        try {
                            ctx.pipeline().remove(HttpServerCodec.class);
                            ctx.pipeline().remove(HttpObjectAggregator.class);
                            ctx.pipeline().addLast(new RelayHandler(outboundChannel));
                        } catch (Exception ignored) {
                        }
                    });
                } else {
                    sendErrorResponse(ctx, "无法连接到目标服务器");
                    closeOnFlush(ctx.channel());
                }
            });
        }
    
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
            String host = req.headers().get(HttpHeaderNames.HOST);
            if (host == null) {
                sendErrorResponse(ctx, "缺少 Host 头");
                closeOnFlush(ctx.channel());
                return;
            }
            String[] hostParts = host.split(":");
            String targetHost = hostParts[0];
            int targetPort = hostParts.length > 1 ? Integer.parseInt(hostParts[1]) : 80;
            // 修改请求 URI 为绝对路径
            req.setUri(req.uri().replace("http://" + host, ""));
            req.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
    
            // 复制请求以避免在异步操作期间被释放
            FullHttpRequest copiedReq = req.copy();
            // 创建到目标服务器的连接
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ctx.channel().eventLoop())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new HttpClientCodec());
                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024)); // 增加到 1MB
                            ch.pipeline().addLast(new RelayHandler(ctx.channel()));
                        }
                    });
            ChannelFuture connectFuture;
            InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort);
            InetSocketAddress sourceAddress = addressFunction.apply(ctx);
            if (sourceAddress != null) {
                log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
                connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
            } else {
                connectFuture = bootstrap.connect(remoteAddress);
            }
            connectFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    future.channel().writeAndFlush(copiedReq);
                } else {
                    closeOnFlush(ctx.channel());
                }
                if (copiedReq.refCnt() != 0) {
                    copiedReq.release();
                }
            });
        }
    
        private void sendErrorResponse(ChannelHandlerContext ctx, String message) {
            FullHttpResponse response = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    HttpResponseStatus.BAD_GATEWAY,
                    Unpooled.wrappedBuffer(message.getBytes())
            );
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            ctx.writeAndFlush(response);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            if (cause instanceof SocketException) {
                closeOnFlush(ctx.channel());
                return;
            }
            log.error(cause.getMessage());
            closeOnFlush(ctx.channel());
        }
    
        private void closeOnFlush(Channel ch) {
            if (ch.isActive()) {
                ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    
    @Log4j2
    public class RelayHandler extends ChannelInboundHandlerAdapter {
    
        private final Channel relayChannel;
    
        public RelayHandler(Channel relayChannel) {
            this.relayChannel = relayChannel;
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.read();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            if (relayChannel.isActive()) {
                relayChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
                    if (future.isSuccess()) {
                        ctx.read(); // 继续读取数据
                    } else {
                        future.channel().close();
                    }
                });
            } else {
                closeOnFlush(ctx.channel());
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.error(cause);
            closeOnFlush(ctx.channel());
        }
    
        private void closeOnFlush(Channel ch) {
            if (ch.isActive()) {
                ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
    
    
    第 1 条附言  ·  17 小时 53 分钟前

    日志显示已经从110切换到120,但刷新了几次访问还是显示110

    第 2 条附言  ·  13 小时 48 分钟前

    后续来了,用了一种比较迂回的解决方法,PAC + 浏览器Proxy Re-apply。

    写了一个接口返回PAC,切换IP时,返回对应代理的PAC,然后浏览器打开chrome://net-internals/#proxy,点一下Re-apply,这样就会获取新的PAC并使用代理,然后再访问目标网址。

    15 条回复    2025-10-30 20:38:41 +08:00
    aladdinding
        1
    aladdinding  
       18 小时 17 分钟前
    浏览器连接池的问题吧
    Ipsum
        2
    Ipsum  
       18 小时 2 分钟前 via Android
    我猜没有保存 conn ,切换时,没有关闭旧链接。
    montaro2017
        3
    montaro2017  
    OP
       17 小时 59 分钟前
    @Ipsum #2
    ChannelFuture connectFuture;
    InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort);
    InetSocketAddress sourceAddress = addressFunction.apply(ctx);
    if (sourceAddress != null) {
    log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
    connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
    } else {
    connectFuture = bootstrap.connect(remoteAddress);
    }
    这里在连接目标服务器的时候获取了出口地址,然后指定用这个地址去连接的,日志有打印出来,只是连接的时候不知道为什么用的还是之前的出口地址
    montaro2017
        4
    montaro2017  
    OP
       17 小时 46 分钟前
    @aladdinding #1 还真是这个问题,我用 curl 试了,IP 立马就变了
    montaro2017
        5
    montaro2017  
    OP
       17 小时 37 分钟前
    @aladdinding #1 但为啥我日志里显示是用新的出口 IP
    cq65617875
        6
    cq65617875  
       17 小时 13 分钟前
    浏览器的缓存 用 curl
    cq65617875
        7
    cq65617875  
       17 小时 12 分钟前
    @cq65617875 或者尝试每次测试都新起一个隐私窗口
    montaro2017
        8
    montaro2017  
    OP
       17 小时 5 分钟前
    @cq65617875 #7 就是因为每次开新窗口开销大,才选择代理后端切换的,浏览器是用 selenium 控制的
    5waker
        9
    5waker  
       16 小时 56 分钟前
    ```rust
    5waker
        10
    5waker  
       16 小时 51 分钟前
    @montaro2017 我刚好昨天也遇到了类似的问题,我的做法是在一个长连接里不断检测 header ,然后根据内容再做转发
    ```rust
    loop {
    // 读取到完整头部
    let header_end = loop {
    if let Some(pos) = headers_end_pos(&buf) {
    break Some(pos);
    }
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    // 客户端关闭或无更多数据
    if buf.is_empty() {
    return Ok(());
    } else {
    return Ok(());
    }
    }
    buf.extend_from_slice(&tmp[..n]);
    if buf.len() > 128 * 1024 {
    error!("请求头过大,终止连接");
    return Ok(());
    }
    };
    let header_end = header_end.unwrap();
    let headers_vec: Vec<u8> = buf[..header_end].to_vec();

    let virtual_env = parse_virtual_env_from_headers(&headers_vec);
    let content_length = parse_content_length(&headers_vec);
    let chunked = is_chunked(&headers_vec);

    // 读取完整正文
    let body_len = if let Some(cl) = content_length {
    while buf.len() < header_end + cl {
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    error!("Content-Length 指定但连接提前关闭");
    return Ok(());
    }
    buf.extend_from_slice(&tmp[..n]);
    }
    cl
    } else if chunked {
    loop {
    if let Some(end) = chunked_body_end_pos(&buf[header_end..]) {
    break end;
    }
    let n = client_stream.read(&mut tmp).await?;
    if n == 0 {
    error!("chunked 正文未完整但连接已关闭");
    return Ok(());
    }
    buf.extend_from_slice(&tmp[..n]);
    }
    } else {
    0
    };

    let request_end = header_end + body_len;
    let body = &buf[header_end..request_end];
    let mut new_headers = rewrite_connection_close(&headers_vec);
    new_headers.extend_from_slice(body);

    // 路由:每个请求一个目标连接;响应仅回写到客户端
    if let Some(env) = virtual_env {
    let ctrl_opt = {
    let envs = (*VIRTUAL_ENVS).lock().unwrap();
    envs.get(&env).cloned()
    };
    if let Some(mut ctrl) = ctrl_opt {
    match ctrl.open_stream().await {
    Ok(mut sub) => {
    sub.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut sub, &mut client_stream).await;
    }
    Err(e) => {
    error!("打开虚拟环境 {} 的子流失败: {:?}", env, e);
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }
    }
    } else {
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }
    } else {
    let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
    upstream_stream.write_all(&new_headers).await?;
    let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
    }

    // 删除已消费的请求字节,保留后续请求(若已到达)
    if request_end < buf.len() {
    let remaining = buf.split_off(request_end);
    buf = remaining;
    } else {
    buf.clear();
    }
    }
    ```
    montaro2017
        11
    montaro2017  
    OP
       16 小时 46 分钟前
    @5waker #10 我要用浏览器自动化来获取网页源代码,所以只能用代理服务器
    Gilfoyle26
        12
    Gilfoyle26  
       16 小时 15 分钟前
    用 c 写,解决一切烦恼
    ronyin
        13
    ronyin  
       16 小时 11 分钟前
    这是搞爬虫么。。。
    montaro2017
        14
    montaro2017  
    OP
       15 小时 56 分钟前
    @ronyin #13 对
    testFor
        15
    testFor  
       7 小时 37 分钟前
    写这个 netty 不如 go 好用
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   Solana   ·   927 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 20:15 · PVG 04:15 · LAX 13:15 · JFK 16:15
    ♥ Do have faith in what you're doing.