V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
shuang
V2EX  ›  外包

200 元有偿求助,使用 Java 的 rsocket 上传文件

  •  
  •   shuang · 73 天前 · 1752 次点击
    这是一个创建于 73 天前的主题,其中的信息可能已经有所发展或是发生改变。

    服务端是第三方的,我方需要按照接口文档上传文件。
    1 、分片上传,实体为 Flux<DataBuffer>
    2 、需要携带 header ,媒体类型为 application/json
    目前可以确认,服务端是 ok 的,问题出在客户端上传文件的代码。

    以下是客户端上传文件的代码:

    CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer();
    
    // 1. 创建路由元数据
    ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
            ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
     compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));
    
    // 2. 按文档要求添加请求 header
    Map<String, String> uploadFileHeader = new HashMap<>();
    uploadFileHeader.put("token", token);
    uploadFileHeader.put("fileType", "jpg");
    uploadFileHeader.put("fileName", "random-file-name");
    ByteBuf customMetadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
    CompositeMetadataCodec.encodeAndAddMetadata(compositeMetadata, ByteBufAllocator.DEFAULT, 
            WellKnownMimeType.APPLICATION_JSON, customMetadata);
    
    // 读取本地文件
    Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);
    
    // 合并 Payloads  将每个 DataBuffer 转换为 Payload ,并附加 metadata
    Flux<Payload> requestPayloads = dataBufferFlux.map(dataBuffer -> {
        // 将每个 DataBuffer 转换为 Payload ,并附加 metadata
        return ByteBufPayload.create(Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()), compositeMetadata);
    });
    
    rsocket.requestChannel(requestPayloads)
            .doOnNext(payload -> log.error("=====> doOnNext"))
            .doOnError(error -> log.error("=====> doOnError", error))
            .doOnComplete(() -> log.info("=====> doOnComplete"))
            .subscribe();
    
    

    注:由于对 rsocket 完全不熟,所以以上代码任何地方都有可能是错的。

    目前一直报错,缺少请求头:Missing header 'upload-file-header' for method parameter type , 从报错分析,已经请求到接口了,说明路由 metadata 没问题,但是请求头传递不正确。文件数据流是否传递正确还未知。

    希望寻求有过 rsocket 相关开发经验的人,帮忙看下代码哪里有问题。
    解决后发微信红包 200 元作为报酬。

    第 1 条附言  ·  71 天前
    各位大神,如果觉得报酬低了,都好商量,重点在于解决问题。
    第 2 条附言  ·  66 天前
    已解决,结帖。感谢各位大神的帮助,好几位给红包都不收,太实诚了,respect !
    17 条回复    2025-06-17 10:57:23 +08:00
    sioncheng
        1
    sioncheng  
       73 天前
    有点好奇,从报错信息来看,上传方是不是没有明确接收方需要的 upload-file-header 信息。还有,是不是可以先一般 java 代码去实现上传功能,确保明确了解了接收方的接口文档,然后再将一般 java 代码改为 rsocket 。
    lervard358
        2
    lervard358  
       73 天前
    我接了 怎么联系 加我  YWxwaGEtZW5naW5lZXJpbmc=
    larisboy
        3
    larisboy  
       72 天前
    uploadFileHeader 加上 upload-file-header 看看
    shuang
        4
    shuang  
    OP
       72 天前 via Android
    @larisboy 不知道怎么加,试了几种写法都不对
    shuang
        5
    shuang  
    OP
       72 天前 via Android
    @sioncheng 目前问题就在于如何用 rsocket 与服务端交互,不知道这个请求头该如何传递
    lbbdefy
        6
    lbbdefy  
       72 天前
    ByteBuf 大小端的问题要先确认
    sioncheng
        7
    sioncheng  
       71 天前
    @shuang 再探讨下。我意思是 rsocket 只是一个技术手段吧,rsocket 能做到的,其他 java 方式应该也能做到;理解清楚对方的接收协议才是本质,对方是标准的 multipart/form-data 协议还是自定义协议呢,这样才能对症解决问题吧。
    shuang
        8
    shuang  
    OP
       71 天前 via Android
    @sioncheng
    没太明白你的意思。
    对方的接口文档里就是要求按照 rsocket 的方式上传文件。你所说的 multipart/form-data ,应该是常规的 http 协议的文件上传,服务端是 tcp 协议的。两者技术手段不同。
    shuang
        9
    shuang  
    OP
       71 天前 via Android
    @lbbdefy 第一次听说这个词,我去搜一下什么是大小端
    sioncheng
        10
    sioncheng  
       71 天前
    @shuang 懂了,就是必须 rsocket ,并且对方也不是常见的 http multipart/form-data 协议。头疼,哈哈。
    shuang
        11
    shuang  
    OP
       71 天前
    ```
    // 元数据
    CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();

    // 1. 创建路由元数据
    ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT,
    Collections.singletonList(platformConfig.getFileUploadRoute()));

    CompositeMetadataCodec.encodeAndAddMetadata(
    composite,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
    routeContent);

    // 2. 创建上传文件头
    ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, getHeaderContent(token));
    CompositeMetadataCodec.encodeAndAddMetadata(
    composite,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.fromString("message/x.upload-file-header"),
    headerContent);

    // 读取本地文件
    Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);

    Flux<Payload> requestPayloads = dataBufferFlux.map(buf -> ByteBufPayload.create(
    Unpooled.wrappedBuffer(buf.asByteBuffer()),
    composite
    ));

    rsocket.requestChannel(requestPayloads)
    .doOnNext(payload -> log.error("=====> doOnNext"))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .subscribe();
    ```

    报错说 Missing header 'upload-file-header' ,我又换了种写法,还是不行
    skyyan
        12
    skyyan  
       71 天前
    第三方提供的 api 接口文档能发下不
    kvolongoto
        13
    kvolongoto  
       70 天前
    // 1. 创建路由元数据 (保持不变)
    ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
    compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));

    // 2. 添加服务器要求的 upload-file-header (新增)
    ByteBuf uploadFileHeaderBuf = ByteBufUtil.writeUtf8(
    ByteBufAllocator.DEFAULT, "your-header-value-here"); // 替换为实际值
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeMetadata,
    ByteBufAllocator.DEFAULT,
    "upload-file-header", // 必须与服务器注解名称一致
    uploadFileHeaderBuf
    );

    // 3. 添加其他元数据 (JSON 格式)
    Map<String, String> uploadFileHeader = new HashMap<>();
    uploadFileHeader.put("token", token);
    uploadFileHeader.put("fileType", "jpg");
    uploadFileHeader.put("fileName", "random-file-name");
    ByteBuf customMetadata = ByteBufUtil.writeUtf8(
    ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeMetadata,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.APPLICATION_JSON,
    customMetadata
    );
    shuang
        14
    shuang  
    OP
       70 天前
    @kvolongoto
    感谢答复。
    没看明白,your-header-value-here 这里应该传什么
    shuang
        15
    shuang  
    OP
       70 天前
    @kvolongoto
    文档里说:请求 header 参数,使用键值对,媒体类型为:application/json 。参数有 token 、fileName 、fileType
    shuang
        16
    shuang  
    OP
       66 天前
    已解决。分享一下:

    方案一:使用底层的 rsocket ,更灵活,但代码比较繁琐,适合对 rsocket 原理和 api 比较熟悉的人。
    RSocketClientConfig.java 关键代码:
    @Bean
    public RSocket rsocket() {
    ClientTransport transport = TcpClientTransport.create(platformConfig.getServerHost(), platformConfig.getServerPort());

    RSocket rsocket = RSocketConnector.create()
    // 设置 metadata MIME Type ,方便服务端根据 MIME 类型确定 metadata 内容
    .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
    .dataMimeType(WellKnownMimeType.APPLICATION_JSON.getString())
    // 认证相关的参数
    .setupPayload(getSetupPayload())
    // 接收服务器发送的响应
    .acceptor(new SocketAcceptorImpl())
    // 设置重连策略
    .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
    .connect(transport)
    .block();

    // 检查连接是否成功
    if (rsocket == null || rsocket.isDisposed()) {
    throw new IllegalStateException("RSocket 连接失败");
    }

    return rsocket;
    }

    上传附件的单元测试代码:
    @Test
    public void testFileSimpleUpload() {
    String token = getToken();

    // 复合元数据
    CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();

    // 1. 创建路由元数据
    ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent(
    ByteBufAllocator.DEFAULT,
    Collections.singletonList(platformConfig.getFileUploadRoute()));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeByteBuf,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
    routeContent);

    // 2. 创建上传文件头
    Map<String, String> uploadFileHeader = new HashMap<>();
    uploadFileHeader.put("platformCode", platformConfig.getPlatformCode());
    uploadFileHeader.put("token", token);
    String fileName = fileResource.getFilename();
    uploadFileHeader.put("fileType", fileName.substring(fileName.lastIndexOf(".") + 1));
    uploadFileHeader.put("fileName", UUID.randomUUID().toString().replaceAll("-", ""));
    ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
    CompositeMetadataCodec.encodeAndAddMetadata(
    compositeByteBuf,
    ByteBufAllocator.DEFAULT,
    WellKnownMimeType.APPLICATION_JSON,
    headerContent);

    // 2. 读取本地文件
    Flux<Payload> requestPayloads = DataBufferUtils
    .read(fileResource, new DefaultDataBufferFactory(), 1024 * 8)
    .map(buf -> ByteBufPayload.create(Unpooled.wrappedBuffer(buf.asByteBuffer())))
    .startWith(ByteBufPayload.create(Unpooled.EMPTY_BUFFER, compositeByteBuf));

    rsocket.requestChannel(requestPayloads)
    .doOnNext(payload -> log.info("=====> doOnNext {}", payload.getDataUtf8()))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .blockLast(Duration.ofSeconds(10));
    }
    shuang
        17
    shuang  
    OP
       66 天前
    方案二:使用 spring 封装的 RSocketRequester ,代码简洁易懂
    RSocketClientConfig.java 关键代码:
    @Bean
    public RSocketRequester rsocketRequester(RSocketRequester.Builder builder) {
    return RSocketRequester.wrap(
    rsocket(),
    MimeTypeUtils.APPLICATION_JSON,
    MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
    rsocketStrategies());
    }
    @Bean
    public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
    .encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
    .decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
    .build();
    }

    上传附件的单元测试代码:
    @Test
    public void uploadFile() {
    this.rSocketRequester
    .route(platformConfig.getFileUploadRoute())
    .metadata(spec -> spec.metadata(getUploadFileHeader(), MimeTypeUtils.APPLICATION_JSON))
    .data(DataBufferUtils.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8))
    .retrieveFlux(String.class)
    .doOnNext(payload -> log.info("=====> doOnNext {}", payload))
    .doOnError(error -> log.error("=====> doOnError", error))
    .doOnComplete(() -> log.info("=====> doOnComplete"))
    .blockLast();
    }
    关于   ·   帮助文档   ·   自助推广系统   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2568 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 13:55 · PVG 21:55 · LAX 06:55 · JFK 09:55
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.