minio oss 分塊上傳_youngboy的博客-程序員宅基地_minio 分片上傳


minio 分塊上傳

s3協議中是有定義分塊上傳的接口,minio支持s3協議,所以minio是支持分塊上傳文件的,只是沒有前端直傳方案,各種java client有支持分段上傳,但是這個分段的方法是私有的意思就是對用戶是隱藏的 當文件大小超過5mb是就會自動選擇分塊上傳

minio沒有提供前端直傳方案,所以也沒有提供像七牛雲一樣的游覽器端的sdk,官網提供的javascript代碼是nodejs端使用的,也就是說你的文件需要經過應用轉發一遍再由應用轉存到minio中而不能直接像七牛雲一樣使用一次性token直接在前端分塊上傳文件

如果需要前端直傳minio需要自己開發一個網關代理minio的分塊上傳接口並實現類似七牛雲的一次性token鑒權功能

分塊實現思路

前端分塊

可以使用現成的支持s3的框架,或者自己手寫
Uppy 框架
https://uppy.io/

后端分塊上傳接口

  • 分塊上傳接口
  • 合並文件接口

分塊上傳涉及到哪些接口參考s3 uploadPart https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html

上傳邏輯

  1. 調用創建批量上傳接口,獲取id用來后續合並文件
  2. 前端調用minio分塊上傳接口
  3. 前端判斷是否上傳完畢,上傳完畢前端調用合並文件接口

接口鑒權

大多數前端上傳文件都是不需要登錄的,這時就需要一個好的鑒權方案,防止接口被濫用,阿里oss等廠商使用的是臨時token,后台通過接口返回臨時token,token包含了可以操作的bucket,文件夾,文件讀寫權限等等,前端使用臨時token上傳文件,token有效期可以后台設置,這樣就把控制權交給了后台

選擇實現方式

最好添加到網關層面,只需要代理s3部分接口並實現鑒權即可
比如 nginx + lua 腳本 kong + lua插件

Java 分塊代碼分析

MinioClient.java 4371 行 搜索關鍵字 calculateMultipartSize

/**
   * Executes put object. If size of object data is <= 5MiB, single put object is used
   * else multipart put object is used.
   *
   * @param bucketName
   *          Bucket name.
   * @param objectName
   *          Object name in the bucket.
   * @param size
   *          Size of object data.
   * @param data
   *          Object data.
   */
  private void putObject(String bucketName, String objectName, Long size, Object data,
      Map<String, String> headerMap, ServerSideEncryption sse,  String contentType)
    throws InvalidBucketNameException, NoSuchAlgorithmException, IOException,
           InvalidKeyException, NoResponseException, XmlPullParserException, ErrorResponseException,
           InternalException, InvalidArgumentException, InsufficientDataException, InvalidResponseException {
    boolean unknownSize = false;

    if (size == null) {
      unknownSize = true;
      size = MAX_OBJECT_SIZE;
    }

    if (headerMap == null) {
      headerMap = new HashMap<>();
    }
    // Add content type if not already set
    // Content Type being passed as argument has higher precedence
    // than the one passed in headerMap if any.
    if (contentType == null) {
      if (headerMap.get("Content-Type") == null) {
        headerMap.put("Content-Type", "application/octet-stream");
      }
    } else {
      headerMap.put("Content-Type", contentType);
    }

    if (sse != null) {
      checkWriteRequestSse(sse);
      headerMap.putAll(sse.headers());
    }


    if (size <= MIN_MULTIPART_SIZE) {
      putObject(bucketName, objectName, data, size.intValue(), headerMap, null, 0);
      return;
    }

    /* Multipart upload 分塊上傳 */
    int[] rv = calculateMultipartSize(size);
    int partSize = rv[0];
    int partCount = rv[1];
    int lastPartSize = rv[2];
    Part[] totalParts = new Part[partCount];

    // initiate new multipart upload.
    String uploadId = initMultipartUpload(bucketName, objectName, headerMap);

    try {
      int expectedReadSize = partSize;
      for (int partNumber = 1; partNumber <= partCount; partNumber++) {
        if (partNumber == partCount) {
          expectedReadSize = lastPartSize;
        }

        // For unknown sized stream, check available size. 未知大小的數據 一般是直播監控等場景使用
        int availableSize = 0;
        if (unknownSize) {
          // Check whether data is available one byte more than expectedReadSize.
          availableSize = getAvailableSize(data, expectedReadSize + 1);
          // If availableSize is less or equal to expectedReadSize, then we reached last part.
          if (availableSize <= expectedReadSize) {
            // If it is first part, do single put object.
            if (partNumber == 1) {
              putObject(bucketName, objectName, data, availableSize, headerMap, null, 0);
              return;
            }
            expectedReadSize = availableSize;
            partCount = partNumber;
          }
        }

        Map<String, String> encryptionHeaders = null;
        // In multi-part uploads, set encryption headers in the case of SSE-C.
        if (sse != null && sse.type() == ServerSideEncryption.Type.SSE_C) {
          encryptionHeaders = sse.headers();
        }

        String etag = putObject(bucketName, objectName, data, expectedReadSize, encryptionHeaders,
                                uploadId, partNumber);
        totalParts[partNumber - 1] = new Part(partNumber, etag);
      }
      // All parts have been uploaded, complete the multipart upload. 所有的數據塊上傳成功就會合並文件
      completeMultipart(bucketName, objectName, uploadId, totalParts);
    } catch (RuntimeException e) {
      abortMultipartUpload(bucketName, objectName, uploadId);
      throw e;
    } catch (Exception e) {
      abortMultipartUpload(bucketName, objectName, uploadId);
      throw e;
    }
  }

進階

斷點續傳協議

tus協議
https://tus.io/

前端上傳文件sdk

Uppy 框架
https://uppy.io/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM