Cross-cloud storage file sync hot backup (automatically syncing AWS S3 bucket files to Alibaba Cloud OSS and Azure Blob)


What it does: automatic hot backup of storage across cloud platforms; files landing in an AWS S3 bucket are backed up to Alibaba Cloud OSS and Azure Blob Storage.

Environment:

  1. Configure the AWS SQS queue's access policy, granting the designated S3 bucket full access to the queue
  2. Configure an S3 "Event" notification that sends a message to the SQS queue on "All object create events"; the S3 bucket must be reachable over the public network (a boto3 sketch of steps 1-2 follows this list)
  3. Create an AWS access key with read (download) permission on the S3 bucket and receive/delete permission on the SQS queue
  4. Create an Alibaba Cloud OSS bucket and configure an upload access key (RAM)
  5. Create an Azure storage account with a Blob container, and configure the upload key
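
Steps 1 and 2 can also be done in code. The following boto3 sketch is a minimal example; the queue URL, queue ARN, bucket name, and region are placeholders, not values from this setup:

# a minimal boto3 sketch for steps 1-2; all names below are placeholders
import json
import boto3

QUEUE_URL = 'https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>'
QUEUE_ARN = 'arn:aws:sqs:<region>:<account-id>:<queue-name>'
BUCKET = '<s3-bucket-name>'

sqs = boto3.client('sqs', region_name='<region>')
s3 = boto3.client('s3', region_name='<region>')

# step 1: let the S3 service deliver event messages for this bucket to the queue
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 's3.amazonaws.com'},
        'Action': 'sqs:SendMessage',
        'Resource': QUEUE_ARN,
        'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:::' + BUCKET}},
    }],
}
sqs.set_queue_attributes(QueueUrl=QUEUE_URL, Attributes={'Policy': json.dumps(policy)})

# step 2: send a notification to the queue on every object-creation event
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        'QueueConfigurations': [{
            'QueueArn': QUEUE_ARN,
            'Events': ['s3:ObjectCreated:*'],
        }],
    },
)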

 

Configuration file application.yaml:

aws:
  sqs:
    queue.sqsurl: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
  s3:
    bucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx

azure:
  account: XXXXXXXXXXXXXXXXXXXXXXXXXX
  blobContainer: XXXXXXXXXXXXXXXXXXXXXXXXXX
  accountKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
  endpointSuffix: XXXXXXXXXXXXXXXXXXXXXXXXXX

aliyun:
  ossdomain: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ossbucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeyID: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeySecret: XXXXXXXXXXXXXXXXXXXXXXXXXX

logging:
  config: config/log4j.yaml
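
config/log4j.yaml itself is not shown in this post. The multi-process script below feeds it to logging.config.dictConfig, so any YAML in the dictConfig schema works; here is a minimal example (the log file path, format, and levels are assumptions, not values from the original):

version: 1
formatters:
  simple:
    format: "%(asctime)s %(levelname)s %(name)s %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
  file:
    class: logging.handlers.RotatingFileHandler
    formatter: simple
    filename: logs/sync.log   # assumed path
    maxBytes: 10485760
    backupCount: 5
root:
  level: INFO
  handlers: [console, file]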

 

Method 1 (single process):

# encoding=utf-8

import os
import json
import yaml
import logging
import urllib.parse
import datetime

import boto3
import oss2
from azure.storage.blob import BlockBlobService  # legacy SDK: azure-storage-blob <= 2.1



def parse_config_file(config_file):
    print("LOCAL PATH: " + config_file)
    with open(config_file) as stream:
        return yaml.safe_load(stream)


class Objhandle(object):
    def __init__(self, config):
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        self.sqs_client = boto3.client('sqs', self.sqs_region, aws_access_key_id=self.sqs_accessKey ,aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region, aws_access_key_id=self.s3_accessKey, aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account, account_key= self.azure_accountKey, endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue, AttributeNames=['SentTimestamp'], MaxNumberOfMessages=10, MessageAttributeNames=['All'], VisibilityTimeout=15, WaitTimeSeconds=5)

        # receive_message omits the 'Messages' key when the queue is empty
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                # S3 event keys are URL-encoded
                key = urllib.parse.unquote(body['Records'][0]['s3']['object']['key'])
                if not key.endswith('/'):
                    key_path = os.path.split(key)[0]

                    # download the new object from S3 to a dated local staging path
                    local_path = '/export/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key_path)
                    local_file = '/export/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error(e)
                    else:
                        msg.append((key, local_file, receipt_handle))
        return msg


    def upload_handle(self):
        for key, local_file, receipt_handle in self._queue_handle():
            self.blob_service.create_blob_from_path(self.azure_container, key, local_file, max_connections=4)
            oss2.resumable_upload(self.oss_service, key, local_file,
                                  store=oss2.ResumableStore(root='/tmp/temp'),
                                  multipart_threshold=10 * 1024 * 1024,
                                  part_size=10 * 1024 * 1024,
                                  num_threads=4)
            # remove the message so the same object is not processed again
            self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)


if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    obj = Objhandle(config)
    obj.upload_handle()
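Note that Method 1 drains at most one SQS batch (up to 10 messages) per run. To keep syncing continuously with this variant, one option is a simple polling loop around upload_handle; a sketch, assuming Objhandle and parse_config_file from Method 1 are in scope:

import time

obj = Objhandle(parse_config_file('config/application.yaml'))
while True:
    obj.upload_handle()   # drain one batch (up to 10 messages)
    time.sleep(5)         # brief pause so an empty queue is not hammered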

 

 

Method 2 (multi-process):

# encoding=utf-8

import os
import json
import yaml
import logging
import logging.config
import urllib.parse
import time
import datetime
from multiprocessing import Process

import boto3
import oss2
from azure.storage.blob import BlockBlobService  # legacy SDK: azure-storage-blob <= 2.1

logger = logging.getLogger(__name__)


# set up logging from a dictConfig-style YAML file
def setup_logging(default_path="config/logging.yaml", default_level=logging.INFO):
    if os.path.exists(default_path):
        with open(default_path, "r") as f:
            logging.config.dictConfig(yaml.safe_load(f))
    else:
        logging.basicConfig(level=default_level)


def parse_config_file(config_file):
    print("LOCAL PATH: " + config_file)
    with open(config_file) as stream:
        return yaml.safe_load(stream)


class Objhandle(Process):
    def __init__(self, config):
        super(Objhandle, self).__init__()
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        self.sqs_client = boto3.client('sqs', self.sqs_region, aws_access_key_id=self.sqs_accessKey ,aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region, aws_access_key_id=self.s3_accessKey, aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account, account_key= self.azure_accountKey, endpoint_suffix=self.azure_endpointSuffix)


    def _queue_handle(self):
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue, AttributeNames=['SentTimestamp'], MaxNumberOfMessages=10, MessageAttributeNames=['All'], VisibilityTimeout=15, WaitTimeSeconds=5)

        # receive_message omits the 'Messages' key when the queue is empty
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                action = body['Records'][0]['eventName']
                # S3 event keys are URL-encoded
                key = urllib.parse.unquote(body['Records'][0]['s3']['object']['key'])
                # only plain PUTs are replicated; multipart uploads arrive as
                # ObjectCreated:CompleteMultipartUpload and are discarded below
                if action == "ObjectCreated:Put" and not key.endswith('/'):
                    key_path = os.path.split(key)[0]

                    # download the new object from S3 to a dated local staging path
                    local_path = '/export/temp_file/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key_path)
                    local_file = '/export/temp_file/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error("Download object error %s", e)
                    else:
                        msg.append((key, local_file, receipt_handle))
                else:
                    # not an event we replicate: drop it from the queue
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)
        else:
            time.sleep(20)
            logger.info('Queue empty, waiting')

        return msg


    def run(self):
        while True:
            msg = self._queue_handle()

            for key, local_file, receipt_handle in msg:
                try:
                    self.blob_service.create_blob_from_path(self.azure_container, key, local_file, max_connections=4)
                    self.oss_service.put_object_from_file(key, local_file)
                except Exception as e:
                    logging.error("upload error %s", e)
                else:
                    logger.info("upload succeeded %s", key)
                    # only acknowledge the message and clean up after both uploads worked
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)
                    if os.path.exists(local_file):
                        os.remove(local_file)


if __name__ == '__main__':

    config = parse_config_file('config/application.yaml')

    # configure logging from config/log4j.yaml, falling back to basicConfig
    setup_logging(default_path="config/log4j.yaml")

    # start four workers; the SQS visibility timeout keeps a message from
    # being handled by two workers at once
    for i in range(4):
        p = Objhandle(config)
        p.start()
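Once the workers are running, a quick way to spot-check that an object reached both backup targets is to query OSS and Blob directly. A sketch, assuming the config file above and a hypothetical key:

# spot-check replication of one key; 'some/path/file.txt' is a hypothetical example
import yaml
import oss2
from azure.storage.blob import BlockBlobService

with open('config/application.yaml') as f:
    config = yaml.safe_load(f)

oss_bucket = oss2.Bucket(
    oss2.Auth(config['aliyun']['ramaccessKeyID'], config['aliyun']['ramaccessKeySecret']),
    config['aliyun']['ossdomain'], config['aliyun']['ossbucket'])
blob = BlockBlobService(
    account_name=config['azure']['account'],
    account_key=config['azure']['accountKey'],
    endpoint_suffix=config['azure']['endpointSuffix'])

key = 'some/path/file.txt'  # hypothetical key uploaded to the S3 bucket
print('on OSS: ', oss_bucket.object_exists(key))
print('on Blob:', blob.exists(config['azure']['blobContainer'], key))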

 

 

 

