Cross-cloud storage file sync with hot backup (automatically syncing AWS S3 bucket files to Alibaba Cloud OSS and Azure Blob Storage)


Goal: automatic hot backup across cloud storage providers. Files written to an AWS S3 bucket are replicated to Alibaba Cloud OSS and Azure Blob Storage.

Setup:

  1. Configure the AWS SQS queue policy so that the designated S3 bucket is allowed to send messages to the queue
  2. Configure an S3 "Event" so that "All object create events" send a notification message to the SQS queue; the bucket must be reachable over the public network (a boto3 sketch of steps 1 and 2 follows this list)
  3. Create an AWS access key (with permission to download objects from the S3 bucket and to receive and delete SQS queue messages)
  4. Create an Alibaba Cloud OSS bucket and configure an upload credential (RAM AccessKey)
  5. Create an Azure storage account with a Blob container and configure an upload credential (account key)
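
Steps 1 and 2 can also be scripted instead of configured in the console. The following is a minimal boto3 sketch; the region, bucket name, and queue URL/ARN are placeholders rather than values from the original setup:

# sketch: wire "all object create events" on an S3 bucket to an SQS queue
# (region, bucket and queue identifiers below are placeholders)
import json
import boto3

region = 'us-east-1'
bucket = 'my-source-bucket'
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/s3-sync-queue'
queue_arn = 'arn:aws:sqs:us-east-1:123456789012:s3-sync-queue'

sqs = boto3.client('sqs', region)
s3 = boto3.client('s3', region)

# step 1: queue policy letting this bucket publish notifications (sqs:SendMessage is sufficient)
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "s3.amazonaws.com"},
        "Action": "sqs:SendMessage",
        "Resource": queue_arn,
        "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::" + bucket}},
    }],
}
sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={'Policy': json.dumps(policy)})

# step 2: notify the queue on every object-create event
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={'QueueConfigurations': [
        {'QueueArn': queue_arn, 'Events': ['s3:ObjectCreated:*']}
    ]},
)

Each matching event then lands in the queue as a message whose Body parses to roughly the structure below; both scripts in this post read eventName and s3.object.key from it (abbreviated, field values are examples):

{
  "Records": [{
    "eventName": "ObjectCreated:Put",
    "s3": {
      "bucket": {"name": "my-source-bucket"},
      "object": {"key": "path/to/file.txt", "size": 1024}
    }
  }]
}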

 

Configuration file application.yaml:

aws:
  sqs:
    queue.sqsurl: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
  s3:
    bucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx

azure:
  account: XXXXXXXXXXXXXXXXXXXXXXXXXX
  blobContainer: XXXXXXXXXXXXXXXXXXXXXXXXXX
  accountKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
  endpointSuffix: XXXXXXXXXXXXXXXXXXXXXXXXXX

aliyun:
  ossdomain: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ossbucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeyID: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeySecret: XXXXXXXXXXXXXXXXXXXXXXXXXX

logging:
  config: config/log4j.yaml
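
The logging.config entry points at a YAML file that approach 2 below feeds to logging.config.dictConfig. A minimal sketch of what config/log4j.yaml could contain; the formatter and handler here are illustrative assumptions, not the original file:

version: 1
formatters:
  simple:
    format: "%(asctime)s %(levelname)s %(name)s %(message)s"
handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
    level: INFO
root:
  level: INFO
  handlers: [console]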

 

Approach 1 (single process):

# encoding=utf-8

import os
import json
import logging
import datetime
from urllib.parse import unquote_plus

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService



def parse_config_file(config_file):
    local_path = config_file
    print("LOCAL PATH: " + local_path)
    with open(local_path) as stream:
        # round-trip through JSON so the parsed config can be handed around as a plain string
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config

class Objhandle(object):
    def __init__(self, config_str):
        # the config arrives as a JSON string (see parse_config_file)
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        self.sqs_client = boto3.client('sqs', self.sqs_region, aws_access_key_id=self.sqs_accessKey ,aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region, aws_access_key_id=self.s3_accessKey, aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account, account_key= self.azure_accountKey, endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue, AttributeNames=['SentTimestamp'], MaxNumberOfMessages=10, MessageAttributeNames=['All'], VisibilityTimeout=15, WaitTimeSeconds=5)

        # 'Messages' is absent from the response when the queue is empty
        for message in sqs_response.get('Messages', []):
            receipt_handle = message['ReceiptHandle']
            body = json.loads(message['Body'])
            # object keys arrive URL-encoded (spaces as '+') in the event record
            key = unquote_plus(body['Records'][0]['s3']['object']['key'])
            if not key.endswith('/'):
                key_path = os.path.split(key)[0]

                # download the object from S3 into a dated local staging directory
                local_path = '/export/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key_path)
                local_file = '/export/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key)
                if not os.path.exists(local_path):
                    os.makedirs(local_path)
                try:
                    self.s3_client.download_file(self.s3_bucket, key, local_file)
                except Exception as e:
                    logging.error(e)
                else:
                    msg.append((key, local_file, receipt_handle))
        return msg


    def upload_handle(self):
        msg = self._queue_handle()
        for key, local_file, receipt_handle in msg:
            # replicate to Azure Blob, then resumably to Aliyun OSS
            self.blob_service.create_blob_from_path(self.azure_container, key, local_file, max_connections=4)
            oss2.resumable_upload(self.oss_service, key, local_file,
                                  store=oss2.ResumableStore(root='/tmp/temp'),
                                  multipart_threshold=10 * 1024 * 1024,
                                  part_size=10 * 1024 * 1024,
                                  num_threads=4)
            # delete the SQS message so the object is not processed again
            self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)


if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    obj = Objhandle(config)
    obj.upload_handle()
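
As written, approach 1 handles at most one batch of up to 10 SQS messages per invocation, so it has to be run repeatedly (for example from cron). A minimal polling wrapper around the class above; the 5-second back-off is an arbitrary choice, not from the original post:

import time

if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    obj = Objhandle(config)
    while True:
        obj.upload_handle()
        time.sleep(5)  # arbitrary pause between polls of the queue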

 

 

Approach 2 (multiprocessing):

# encoding=utf-8

import os
import json
import logging
import logging.config
import time
import datetime
from urllib.parse import unquote_plus
from multiprocessing import Process

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService


# set up logging from a dictConfig-style YAML file, falling back to basicConfig
def setup_logging(default_path="config/log4j.yaml", default_level=logging.INFO):
    path = default_path
    if os.path.exists(path):
        with open(path, "r") as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
            logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


def parse_config_file(config_file):
    local_path = config_file
    print("LOCAL PATH: " + local_path)
    with open(local_path) as stream:
        # round-trip through JSON so the config can be handed to worker processes as a plain string
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config


# module-level logger so it exists in every process
logger = logging.getLogger(__name__)

class Objhandle(Process):
    def __init__(self, config_str):
        super(Objhandle, self).__init__()
        # note: the cloud clients created here are inherited by the worker via fork;
        # this assumes a Unix host (spawn-based platforms cannot pickle them)
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        self.sqs_client = boto3.client('sqs', self.sqs_region, aws_access_key_id=self.sqs_accessKey ,aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region, aws_access_key_id=self.s3_accessKey, aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account, account_key= self.azure_accountKey, endpoint_suffix=self.azure_endpointSuffix)


    def _queue_handle(self):
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue, AttributeNames=['SentTimestamp'], MaxNumberOfMessages=10, MessageAttributeNames=['All'], VisibilityTimeout=15, WaitTimeSeconds=5)

        # 'Messages' is absent from the response when the queue is empty
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                action = body['Records'][0]['eventName']
                # object keys arrive URL-encoded (spaces as '+') in the event record
                key = unquote_plus(body['Records'][0]['s3']['object']['key'])
                # match every create event (Put, Post, Copy, CompleteMultipartUpload), not only Put,
                # since the bucket is configured for "all object create events"
                if action.startswith("ObjectCreated:") and not key.endswith('/'):
                    key_path = os.path.split(key)[0]

                    # download the object from S3 into a dated local staging directory
                    local_path = '/export/temp_file/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key_path)
                    local_file = '/export/temp_file/{}/{}'.format(datetime.datetime.now().strftime("%Y-%m-%d"), key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error("Download object error %s", e)
                    else:
                        msg.append((key, local_file, receipt_handle))
                else:
                    # drop messages we will never act on (folder keys, non-create events)
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)
        else:
            # queue is empty; back off before the next poll
            time.sleep(20)
            logger.info('Queue empty, waiting for new messages')

        return msg


    def run(self):
        while True:
            msg = self._queue_handle()

            for key, local_file, receipt_handle in msg:
                try:
                    self.blob_service.create_blob_from_path(self.azure_container, key, local_file, max_connections=4)
                    self.oss_service.put_object_from_file(key, local_file)
                except Exception as e:
                    logging.error("upload error %s", e)
                else:
                    logger.info("upload succeeded %s", key)
                    # delete the SQS message and the local staging copy only after both uploads succeed
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue, ReceiptHandle=receipt_handle)
                    if os.path.exists(local_file):
                        os.remove(local_file)


if __name__ == '__main__':

    config = parse_config_file('config/application.yaml')

    # configure logging from config/log4j.yaml if present, else fall back to basicConfig
    setup_logging(default_path="config/log4j.yaml")

    # start four workers, each polling SQS independently
    for i in range(4):
        p = Objhandle(config)
        p.start()
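
In approach 2 the parent starts four workers and then simply falls off the end of __main__ while the children poll forever. If you would rather keep the parent attached and stop everything with Ctrl-C, one possible variant of the startup loop (the join/terminate handling is an addition, not part of the original post):

# sketch: replaces the final for-loop in __main__ above
workers = [Objhandle(config) for _ in range(4)]
for p in workers:
    p.start()
try:
    for p in workers:
        p.join()
except KeyboardInterrupt:
    # terminating mid-upload is tolerable: the SQS message was not yet
    # deleted, so it becomes visible again after the visibility timeout
    for p in workers:
        p.terminate()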

 

 

 

