服务端是第三方的,我方需要按照接口文档上传文件。
1 、分片上传,实体为 Flux<DataBuffer>
2 、需要携带 header ,媒体类型为 application/json
目前可以确认,服务端是 ok 的,问题出在客户端上传文件的代码。
以下是客户端上传文件的代码:
CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer();
// 1. 创建路由元数据
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));
// 2. 按文档要求添加请求 header
Map<String, String> uploadFileHeader = new HashMap<>();
uploadFileHeader.put("token", token);
uploadFileHeader.put("fileType", "jpg");
uploadFileHeader.put("fileName", "random-file-name");
ByteBuf customMetadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
CompositeMetadataCodec.encodeAndAddMetadata(compositeMetadata, ByteBufAllocator.DEFAULT,
WellKnownMimeType.APPLICATION_JSON, customMetadata);
// 读取本地文件
Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);
// 合并 Payloads 将每个 DataBuffer 转换为 Payload ,并附加 metadata
Flux<Payload> requestPayloads = dataBufferFlux.map(dataBuffer -> {
// 将每个 DataBuffer 转换为 Payload ,并附加 metadata
return ByteBufPayload.create(Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()), compositeMetadata);
});
rsocket.requestChannel(requestPayloads)
.doOnNext(payload -> log.error("=====> doOnNext"))
.doOnError(error -> log.error("=====> doOnError", error))
.doOnComplete(() -> log.info("=====> doOnComplete"))
.subscribe();
注:由于对 rsocket 完全不熟,所以以上代码任何地方都有可能是错的。
目前一直报错,缺少请求头:Missing header 'upload-file-header' for method parameter type , 从报错分析,已经请求到接口了,说明路由 metadata 没问题,但是请求头传递不正确。文件数据流是否传递正确还未知。
希望寻求有过 rsocket 相关开发经验的人,帮忙看下代码哪里有问题。
解决后发微信红包 200 元作为报酬。
1
sioncheng 73 天前
有点好奇,从报错信息来看,上传方是不是没有明确接收方需要的 upload-file-header 信息。还有,是不是可以先一般 java 代码去实现上传功能,确保明确了解了接收方的接口文档,然后再将一般 java 代码改为 rsocket 。
|
2
lervard358 73 天前
我接了 怎么联系 加我 YWxwaGEtZW5naW5lZXJpbmc=
|
3
larisboy 72 天前
uploadFileHeader 加上 upload-file-header 看看
|
![]() |
6
lbbdefy 72 天前
ByteBuf 大小端的问题要先确认
|
7
sioncheng 71 天前
@shuang 再探讨下。我意思是 rsocket 只是一个技术手段吧,rsocket 能做到的,其他 java 方式应该也能做到;理解清楚对方的接收协议才是本质,对方是标准的 multipart/form-data 协议还是自定义协议呢,这样才能对症解决问题吧。
|
![]() |
8
shuang OP @sioncheng
没太明白你的意思。 对方的接口文档里就是要求按照 rsocket 的方式上传文件。你所说的 multipart/form-data ,应该是常规的 http 协议的文件上传,服务端是 tcp 协议的。两者技术手段不同。 |
![]() |
11
shuang OP ```
// 元数据 CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); CompositeMetadataCodec.encodeAndAddMetadata( composite, ByteBufAllocator.DEFAULT, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, routeContent); // 2. 创建上传文件头 ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, getHeaderContent(token)); CompositeMetadataCodec.encodeAndAddMetadata( composite, ByteBufAllocator.DEFAULT, WellKnownMimeType.fromString("message/x.upload-file-header"), headerContent); // 读取本地文件 Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8); Flux<Payload> requestPayloads = dataBufferFlux.map(buf -> ByteBufPayload.create( Unpooled.wrappedBuffer(buf.asByteBuffer()), composite )); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.error("=====> doOnNext")) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .subscribe(); ``` 报错说 Missing header 'upload-file-header' ,我又换了种写法,还是不行 |
12
skyyan 71 天前
第三方提供的 api 接口文档能发下不
|
13
kvolongoto 70 天前
// 1. 创建路由元数据 (保持不变)
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata)); // 2. 添加服务器要求的 upload-file-header (新增) ByteBuf uploadFileHeaderBuf = ByteBufUtil.writeUtf8( ByteBufAllocator.DEFAULT, "your-header-value-here"); // 替换为实际值 CompositeMetadataCodec.encodeAndAddMetadata( compositeMetadata, ByteBufAllocator.DEFAULT, "upload-file-header", // 必须与服务器注解名称一致 uploadFileHeaderBuf ); // 3. 添加其他元数据 (JSON 格式) Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("token", token); uploadFileHeader.put("fileType", "jpg"); uploadFileHeader.put("fileName", "random-file-name"); ByteBuf customMetadata = ByteBufUtil.writeUtf8( ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata( compositeMetadata, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, customMetadata ); |
![]() |
14
shuang OP |
![]() |
15
shuang OP @kvolongoto
文档里说:请求 header 参数,使用键值对,媒体类型为:application/json 。参数有 token 、fileName 、fileType |
![]() |
16
shuang OP 已解决。分享一下:
方案一:使用底层的 rsocket ,更灵活,但代码比较繁琐,适合对 rsocket 原理和 api 比较熟悉的人。 RSocketClientConfig.java 关键代码: @Bean public RSocket rsocket() { ClientTransport transport = TcpClientTransport.create(platformConfig.getServerHost(), platformConfig.getServerPort()); RSocket rsocket = RSocketConnector.create() // 设置 metadata MIME Type ,方便服务端根据 MIME 类型确定 metadata 内容 .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()) .dataMimeType(WellKnownMimeType.APPLICATION_JSON.getString()) // 认证相关的参数 .setupPayload(getSetupPayload()) // 接收服务器发送的响应 .acceptor(new SocketAcceptorImpl()) // 设置重连策略 .reconnect(Retry.backoff(2, Duration.ofMillis(500))) .connect(transport) .block(); // 检查连接是否成功 if (rsocket == null || rsocket.isDisposed()) { throw new IllegalStateException("RSocket 连接失败"); } return rsocket; } 上传附件的单元测试代码: @Test public void testFileSimpleUpload() { String token = getToken(); // 复合元数据 CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); // 1. 创建路由元数据 ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent( ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute())); CompositeMetadataCodec.encodeAndAddMetadata( compositeByteBuf, ByteBufAllocator.DEFAULT, WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, routeContent); // 2. 创建上传文件头 Map<String, String> uploadFileHeader = new HashMap<>(); uploadFileHeader.put("platformCode", platformConfig.getPlatformCode()); uploadFileHeader.put("token", token); String fileName = fileResource.getFilename(); uploadFileHeader.put("fileType", fileName.substring(fileName.lastIndexOf(".") + 1)); uploadFileHeader.put("fileName", UUID.randomUUID().toString().replaceAll("-", "")); ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader)); CompositeMetadataCodec.encodeAndAddMetadata( compositeByteBuf, ByteBufAllocator.DEFAULT, WellKnownMimeType.APPLICATION_JSON, headerContent); // 2. 读取本地文件 Flux<Payload> requestPayloads = DataBufferUtils .read(fileResource, new DefaultDataBufferFactory(), 1024 * 8) .map(buf -> ByteBufPayload.create(Unpooled.wrappedBuffer(buf.asByteBuffer()))) .startWith(ByteBufPayload.create(Unpooled.EMPTY_BUFFER, compositeByteBuf)); rsocket.requestChannel(requestPayloads) .doOnNext(payload -> log.info("=====> doOnNext {}", payload.getDataUtf8())) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .blockLast(Duration.ofSeconds(10)); } |
![]() |
17
shuang OP 方案二:使用 spring 封装的 RSocketRequester ,代码简洁易懂
RSocketClientConfig.java 关键代码: @Bean public RSocketRequester rsocketRequester(RSocketRequester.Builder builder) { return RSocketRequester.wrap( rsocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()), rsocketStrategies()); } @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() .encoders(encoders -> encoders.add(new Jackson2JsonEncoder())) .decoders(decoders -> decoders.add(new Jackson2JsonDecoder())) .build(); } 上传附件的单元测试代码: @Test public void uploadFile() { this.rSocketRequester .route(platformConfig.getFileUploadRoute()) .metadata(spec -> spec.metadata(getUploadFileHeader(), MimeTypeUtils.APPLICATION_JSON)) .data(DataBufferUtils.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8)) .retrieveFlux(String.class) .doOnNext(payload -> log.info("=====> doOnNext {}", payload)) .doOnError(error -> log.error("=====> doOnError", error)) .doOnComplete(() -> log.info("=====> doOnComplete")) .blockLast(); } |