ceph儲存的S3接口實現(支持斷點續傳)


最近公司准備接ceph儲存,研究了一番,准備用亞馬遜的s3接口實現,實現類如下:

/**
 * Title:        S3Manager
 * Description:  Ceph儲存的s3接口實現,參考文檔:
 * https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/RetrievingObjectUsingJava.html
 * http://docs.ceph.org.cn/radosgw/s3/
 * author:       xu jun
 * date:         2018/10/22
 */
@Slf4j
@Service
public class S3Manager extends StorageManagerBase implements StorageManager {
    private final UKID ukid;
    private final S3ClientConfig s3ClientConfig;
    private final RedisManage redisManage;
    private AmazonS3 amazonClient;

    @Autowired
    public S3Manager(UKID ukid, S3ClientConfig s3ClientConfig, RedisManage redisManage) {
        this.ukid = ukid;
        this.s3ClientConfig = s3ClientConfig;
        this.redisManage = redisManage;
    }

    private AmazonS3 getAmazonClient() {
        if (amazonClient == null) {
            String accessKey = s3ClientConfig.getAccessKey();
            String secretKey = s3ClientConfig.getSecretKey();
            String endpoint = s3ClientConfig.getEndPoint();

            AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
            ClientConfiguration clientConfig = new ClientConfiguration();
            clientConfig.setProtocol(Protocol.HTTP);

            AmazonS3 conn = AmazonS3ClientBuilder.standard()
                    .withClientConfiguration(clientConfig)
                    .withCredentials(new AWSStaticCredentialsProvider(credentials))
                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, ""))
                    .withPathStyleAccessEnabled(true)
                    .build();

            //檢查儲存空間是否創建
            checkBucket(conn);
            amazonClient = conn;
        }
        return amazonClient;
    }

    @Override
    public String uploadFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload file start");

        //生成上傳文件的隨機序號
        long fileId = ukid.getGeneratorID();
        String fileName = Long.toString(fileId);
        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();

        PutObjectResult result = conn.putObject(bucketName, fileName, new ByteArrayInputStream(fileData), null);
        log.info("Storage s3 api, put object result :{}", result);

        log.info("Storage s3 api, upload file end, file name:" + fileName);
        return fileName;
    }

    @Override
    public String uploadAppenderFile(byte[] fileData, String extension) {
        log.info("Storage s3 api, upload appender file start");

        //生成上傳文件的隨機序號
        long ukId = ukid.getGeneratorID();
        String fileName = Long.toString(ukId);
        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        List<PartETag> partETags = new ArrayList<>();
        //初始化分片上傳
        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, fileName);
        InitiateMultipartUploadResult initResponse = conn.initiateMultipartUpload(initRequest);
        String uploadId = initResponse.getUploadId();

        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(fileData);
        Integer contentLength = fileData.length;
        // 文件上傳
        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);
        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);

        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }
        partETags.add(uploadPartResult.getPartETag());
        Integer partNumber = uploadPartResult.getPartNumber();

        S3CacheMode cacheMode = new S3CacheMode();
        cacheMode.setPartETags(partETags);
        cacheMode.setPartNumber(partNumber);
        cacheMode.setUploadId(uploadId);
        redisManage.set(fileName, cacheMode);

        log.info("Storage s3 api, upload appender file end, fileName: {}", fileName);
        return fileName;
    }

    @Override
    public void uploadChunkFile(ChunkFileSaveParams chunkFileSaveParams) {
        log.info("Storage s3 api, upload chunk file start");

        String fileName = chunkFileSaveParams.getFileAddress();
        Result result = redisManage.get(fileName);
        JSONObject jsonObject = (JSONObject) result.getData();
        if (jsonObject == null) {
            throw FileCenterExceptionConstants.CACHE_DATA_NOT_EXIST;
        }
        S3CacheMode cacheMode = jsonObject.toJavaObject(S3CacheMode.class);
        Integer partNumber = cacheMode.partNumber;
        String uploadId = cacheMode.getUploadId();
        List<PartETag> partETags = cacheMode.partETags;

        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(chunkFileSaveParams.getBytes());
        Integer contentLength = chunkFileSaveParams.getBytes().length;

        UploadPartRequest uploadPartRequest = new UploadPartRequest()
                .withBucketName(bucketName)
                .withKey(fileName)
                .withUploadId(uploadId)
                .withPartNumber(partNumber + 1)
                .withPartSize(contentLength)
                .withInputStream(byteArrayInputStream);

        UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
        partETags.add(uploadPartResult.getPartETag());
        partNumber = uploadPartResult.getPartNumber();

        try {
            byteArrayInputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        }

        S3CacheMode cacheModeUpdate = new S3CacheMode();
        cacheModeUpdate.setPartETags(partETags);
        cacheModeUpdate.setPartNumber(partNumber);
        cacheModeUpdate.setUploadId(uploadId);
        redisManage.set(fileName, cacheModeUpdate);

        if (chunkFileSaveParams.getChunk().equals(chunkFileSaveParams.getChunks() - 1)) {
            //完成分片上傳,生成儲存對象
            CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, fileName,
                    uploadId, partETags);
            conn.completeMultipartUpload(compRequest);
        }

        log.info("Storage s3 api, upload chunk file end");
    }

    @Override
    public byte[] downloadFile(String fileName) {
        log.info("Storage s3 api, download file start");
        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            object = conn.getObject(bucketName, fileName);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.debug("Storage s3 api, get object result :{}", object);

        byte[] fileByte;
        InputStream inputStream;
        inputStream = object.getObjectContent();
        try {
            fileByte = IOUtils.toByteArray(inputStream);
            inputStream.close();
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
        }
        log.info("Storage s3 api, download file end");
        return fileByte;
    }

    @Override
    public byte[] downloadFile(String fileName, long fileOffset, long fileSize) {
        log.info("Storage s3 api, download file by block start");
        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        AmazonS3 conn = getAmazonClient();
        S3Object object;
        if (conn.doesObjectExist(bucketName, fileName)) {
            GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileName)
                    .withRange(fileOffset, fileOffset + fileSize);
            //范圍下載。
            object = conn.getObject(getObjectRequest);
        } else {
            throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
        }
        log.info("Storage s3 api, get object result :{}", object);

        // 讀取數據。
        byte[] buf;
        InputStream in = object.getObjectContent();
        try {
            buf = inputToByte(in, (int) fileSize);
        } catch (IOException e) {
            throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("Storage s3 api, download file by block end");
        return buf;
    }

    @Override
    public String fileSecret(String filePath) {
        return null;
    }

    @Override
    public String fileDecrypt(String filePath) {
        return null;
    }

    @Override
    public String getDomain() {
        return null;
    }


    /**
     * 檢查儲存空間是否已創建
     *
     * @param conn 客戶端連接
     */
    private void checkBucket(AmazonS3 conn) {
        //儲存空間名
        String bucketName = s3ClientConfig.getBucketName();
        if (conn.doesBucketExist(bucketName)) {
            log.debug("Storage s3 api, bucketName is found: " + bucketName);
        } else {
            log.warn("Storage s3 api, bucketName is not exist, create it: " + bucketName);
            conn.createBucket(bucketName);
        }
    }

    /**
     * inputStream轉byte[]
     *
     * @param inStream 輸入
     * @param fileSize 文件大小
     * @return 輸出
     * @throws IOException 異常
     */
    private static byte[] inputToByte(InputStream inStream, int fileSize) throws IOException {
        ByteArrayOutputStream swapStream = new ByteArrayOutputStream();
        byte[] buff = new byte[fileSize];
        int rc;
        while ((rc = inStream.read(buff, 0, fileSize)) > 0) {
            swapStream.write(buff, 0, rc);
        }
        return swapStream.toByteArray();
    }

    /**
     * 調試用的方法,可以在控制台看到io的數據
     *
     * @param input 輸入
     * @throws IOException 異常
    private static void displayTextInputStream(InputStream input) throws IOException {
        // Read the text input stream one line at a time and display each line.
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));
        String line;
        while ((line = reader.readLine()) != null) {
            log.info(line);
        }
    }
     */
}

業務接口要實現包括分片上傳(支持斷點續傳)、分片下載等功能,上面類是底層類不包含業務邏輯。

maven依賴:

        <!-- ceph儲存的接口 -->
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.433</version>
        </dependency>

開發感受:

  1.ceph官網上提供的s3接口文檔(java版),內容又少又舊,已經基本不能當做參考了。所以API和代碼示例要去亞馬遜官網上看(提供了中文版,好評)

  2.s3接口本身不提供文件追加儲存的功能。所以在實現分片上傳的時候,比較麻煩(不想fastDFS和OSS那么方便)

  3.分片上傳默認最小限制是5M,要修改可以在服務器配置上做

  4.如果使用域名做端點的話,默認會把bucket的名字,作為子域名來訪問(需要域名解析,所以不建議)。如果想作為路徑來訪問,需要在連接配置中指定。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM