minio oss 分块上传_youngboy的博客-程序员宅基地_minio 分片上传


minio 分块上传

s3协议中是有定义分块上传的接口,minio支持s3协议,所以minio是支持分块上传文件的,只是没有前端直传方案,各种java client有支持分段上传,但是这个分段的方法是私有的意思就是对用户是隐藏的 当文件大小超过5mb是就会自动选择分块上传

minio没有提供前端直传方案,所以也没有提供像七牛云一样的游览器端的sdk,官网提供的javascript代码是nodejs端使用的,也就是说你的文件需要经过应用转发一遍再由应用转存到minio中而不能直接像七牛云一样使用一次性token直接在前端分块上传文件

如果需要前端直传minio需要自己开发一个网关代理minio的分块上传接口并实现类似七牛云的一次性token鉴权功能

分块实现思路

前端分块

可以使用现成的支持s3的框架,或者自己手写
Uppy 框架
https://uppy.io/

后端分块上传接口

  • 分块上传接口
  • 合并文件接口

分块上传涉及到哪些接口参考s3 uploadPart https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html

上传逻辑

  1. 调用创建批量上传接口,获取id用来后续合并文件
  2. 前端调用minio分块上传接口
  3. 前端判断是否上传完毕,上传完毕前端调用合并文件接口

接口鉴权

大多数前端上传文件都是不需要登录的,这时就需要一个好的鉴权方案,防止接口被滥用,阿里oss等厂商使用的是临时token,后台通过接口返回临时token,token包含了可以操作的bucket,文件夹,文件读写权限等等,前端使用临时token上传文件,token有效期可以后台设置,这样就把控制权交给了后台

选择实现方式

最好添加到网关层面,只需要代理s3部分接口并实现鉴权即可
比如 nginx + lua 脚本 kong + lua插件

Java 分块代码分析

MinioClient.java 4371 行 搜索关键字 calculateMultipartSize

/**
   * Executes put object. If size of object data is <= 5MiB, single put object is used
   * else multipart put object is used.
   *
   * @param bucketName
   *          Bucket name.
   * @param objectName
   *          Object name in the bucket.
   * @param size
   *          Size of object data.
   * @param data
   *          Object data.
   */
  private void putObject(String bucketName, String objectName, Long size, Object data,
      Map<String, String> headerMap, ServerSideEncryption sse,  String contentType)
    throws InvalidBucketNameException, NoSuchAlgorithmException, IOException,
           InvalidKeyException, NoResponseException, XmlPullParserException, ErrorResponseException,
           InternalException, InvalidArgumentException, InsufficientDataException, InvalidResponseException {
    boolean unknownSize = false;

    if (size == null) {
      unknownSize = true;
      size = MAX_OBJECT_SIZE;
    }

    if (headerMap == null) {
      headerMap = new HashMap<>();
    }
    // Add content type if not already set
    // Content Type being passed as argument has higher precedence
    // than the one passed in headerMap if any.
    if (contentType == null) {
      if (headerMap.get("Content-Type") == null) {
        headerMap.put("Content-Type", "application/octet-stream");
      }
    } else {
      headerMap.put("Content-Type", contentType);
    }

    if (sse != null) {
      checkWriteRequestSse(sse);
      headerMap.putAll(sse.headers());
    }


    if (size <= MIN_MULTIPART_SIZE) {
      putObject(bucketName, objectName, data, size.intValue(), headerMap, null, 0);
      return;
    }

    /* Multipart upload 分块上传 */
    int[] rv = calculateMultipartSize(size);
    int partSize = rv[0];
    int partCount = rv[1];
    int lastPartSize = rv[2];
    Part[] totalParts = new Part[partCount];

    // initiate new multipart upload.
    String uploadId = initMultipartUpload(bucketName, objectName, headerMap);

    try {
      int expectedReadSize = partSize;
      for (int partNumber = 1; partNumber <= partCount; partNumber++) {
        if (partNumber == partCount) {
          expectedReadSize = lastPartSize;
        }

        // For unknown sized stream, check available size. 未知大小的数据 一般是直播监控等场景使用
        int availableSize = 0;
        if (unknownSize) {
          // Check whether data is available one byte more than expectedReadSize.
          availableSize = getAvailableSize(data, expectedReadSize + 1);
          // If availableSize is less or equal to expectedReadSize, then we reached last part.
          if (availableSize <= expectedReadSize) {
            // If it is first part, do single put object.
            if (partNumber == 1) {
              putObject(bucketName, objectName, data, availableSize, headerMap, null, 0);
              return;
            }
            expectedReadSize = availableSize;
            partCount = partNumber;
          }
        }

        Map<String, String> encryptionHeaders = null;
        // In multi-part uploads, set encryption headers in the case of SSE-C.
        if (sse != null && sse.type() == ServerSideEncryption.Type.SSE_C) {
          encryptionHeaders = sse.headers();
        }

        String etag = putObject(bucketName, objectName, data, expectedReadSize, encryptionHeaders,
                                uploadId, partNumber);
        totalParts[partNumber - 1] = new Part(partNumber, etag);
      }
      // All parts have been uploaded, complete the multipart upload. 所有的数据块上传成功就会合并文件
      completeMultipart(bucketName, objectName, uploadId, totalParts);
    } catch (RuntimeException e) {
      abortMultipartUpload(bucketName, objectName, uploadId);
      throw e;
    } catch (Exception e) {
      abortMultipartUpload(bucketName, objectName, uploadId);
      throw e;
    }
  }

进阶

断点续传协议

tus协议
https://tus.io/

前端上传文件sdk

Uppy 框架
https://uppy.io/


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM