Goal: automatic cross-cloud hot backup of storage resources. Every file written to an AWS S3 bucket is replicated to Alibaba Cloud OSS and Microsoft Azure Blob Storage.
Environment:
- Configure the access policy of the AWS SQS queue to grant the designated S3 bucket full access to the queue (only sqs:SendMessage is strictly required; see the boto3 sketch after this list).
- Configure an S3 bucket "Event" so that "All object create events" publish a notification message to the SQS queue; the bucket must be reachable over the public internet.
- Create an AWS access key pair with permission to download objects from the S3 bucket and to receive and delete messages on the SQS queue.
- Create an Alibaba Cloud OSS bucket and configure a RAM access key with upload permission.
- Create an Azure storage account and a Blob container, and configure an account key with upload permission.
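The queue policy and the bucket event above can also be wired up programmatically. Below is a minimal sketch with boto3; the region, bucket name, and queue URL/ARN are placeholders rather than values from this deployment:

    import json
    import boto3

    region = 'us-east-1'                       # placeholder
    bucket = 'my-source-bucket'                # placeholder
    queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/backup-queue'
    queue_arn = 'arn:aws:sqs:us-east-1:123456789012:backup-queue'

    sqs = boto3.client('sqs', region)
    s3 = boto3.client('s3', region)

    # Queue policy: let S3 deliver event notifications for this bucket.
    policy = {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 's3.amazonaws.com'},
            'Action': 'sqs:SendMessage',
            'Resource': queue_arn,
            'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:::' + bucket}},
        }],
    }
    sqs.set_queue_attributes(QueueUrl=queue_url,
                             Attributes={'Policy': json.dumps(policy)})

    # Bucket "event": notify the queue on all object-create events.
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration={
            'QueueConfigurations': [{
                'QueueArn': queue_arn,
                'Events': ['s3:ObjectCreated:*'],
            }]
        })

Granting only sqs:SendMessage with an aws:SourceArn condition keeps the queue from accepting messages from anything but the source bucket.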
Configuration file application.yaml:
aws:
  sqs:
    queue.sqsurl: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
  s3:
    bucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
    accessKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    securityKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
    region: xxxxxxx
azure:
  account: XXXXXXXXXXXXXXXXXXXXXXXXXX
  blobContainer: XXXXXXXXXXXXXXXXXXXXXXXXXX
  accountKey: XXXXXXXXXXXXXXXXXXXXXXXXXX
  endpointSuffix: XXXXXXXXXXXXXXXXXXXXXXXXXX
aliyun:
  ossdomain: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ossbucket: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeyID: XXXXXXXXXXXXXXXXXXXXXXXXXX
  ramaccessKeySecret: XXXXXXXXXXXXXXXXXXXXXXXXXX
logging:
  config: config/log4j.yaml
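Before running either script it is worth confirming the file parses and contains all four top-level sections. A minimal sanity check (the path matches what the scripts load; everything else is illustrative):

    import yaml

    with open('config/application.yaml') as f:
        cfg = yaml.safe_load(f)

    # Fail fast if a top-level section is missing.
    for section in ('aws', 'azure', 'aliyun', 'logging'):
        assert section in cfg, 'missing section: ' + section
    print(cfg['aws']['s3']['bucket'])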
Approach 1 (single process): a single pass that drains one batch of queue messages, stages the new objects locally, and uploads them to Azure Blob and (resumably) to OSS.

# encoding=utf-8
import os
import json
import logging
import urllib
import datetime

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService


def parse_config_file(config_file):
    # Read the YAML config and pass it around as a JSON string.
    local_path = config_file
    print("LOCAL PATH: " + local_path)
    with open(local_path) as stream:
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config


class Objhandle(object):
    def __init__(self, config_str):
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        # One client per cloud: SQS/S3 (AWS), OSS (Alibaba Cloud), Blob (Azure).
        self.sqs_client = boto3.client('sqs', self.sqs_region,
                                       aws_access_key_id=self.sqs_accessKey,
                                       aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region,
                                      aws_access_key_id=self.s3_accessKey,
                                      aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account,
                                             account_key=self.azure_accountKey,
                                             endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        # Poll SQS for S3 event notifications and stage each new object locally.
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue,
                                                       AttributeNames=['SentTimestamp'],
                                                       MaxNumberOfMessages=10,
                                                       MessageAttributeNames=['All'],
                                                       VisibilityTimeout=15,
                                                       WaitTimeSeconds=5)
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                # Object keys arrive URL-encoded in the event payload.
                key = urllib.unquote(body['Records'][0]['s3']['object']['key']).encode("utf-8")
                if not key.endswith('/'):  # skip "directory" placeholder keys
                    key_path = os.path.split(key)[0]
                    # Download the object into a per-day staging directory.
                    today = datetime.datetime.now().strftime("%Y-%m-%d")
                    local_path = '/export/{}/{}'.format(today, key_path)
                    local_file = '/export/{}/{}'.format(today, key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logging.error(e)
                    else:
                        msg.append((key, local_file, receipt_handle))
        return msg

    def upload_handle(self):
        # Mirror every staged file to Azure Blob and, resumably, to OSS.
        for key, local_file, receipt_handle in self._queue_handle():
            self.blob_service.create_blob_from_path(self.azure_container, key,
                                                    local_file, max_connections=4)
            oss2.resumable_upload(self.oss_service, key, local_file,
                                  store=oss2.ResumableStore(root='/tmp/temp'),
                                  multipart_threshold=10 * 1024 * 1024,
                                  part_size=10 * 1024 * 1024,
                                  num_threads=4)


if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    obj = Objhandle(config)
    obj.upload_handle()
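For reference, each SQS message body is a standard S3 event notification. _queue_handle only reads the fields shown in this abridged, illustrative example:

    # Abridged S3 event notification, as parsed by _queue_handle.
    # Only the fields the scripts read are shown; values are illustrative.
    body = {
        "Records": [
            {
                "eventName": "ObjectCreated:Put",      # filtered on in approach 2
                "s3": {
                    "bucket": {"name": "my-source-bucket"},
                    "object": {
                        "key": "photos/2019/cat.jpg",  # URL-encoded; unquote before use
                        "size": 1024
                    }
                }
            }
        ]
    }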
Approach 2 (multi-process): four long-running worker processes poll the same queue in parallel, acknowledge each message only after both uploads succeed, and clean up the staged files.

# encoding=utf-8
import os
import json
import logging
import logging.config
import urllib
import time
import datetime
from multiprocessing import Process

import boto3
import oss2
import yaml
from azure.storage.blob import BlockBlobService

logger = logging.getLogger(__name__)


def setup_logging(default_path="config/logging.yaml", default_level=logging.INFO):
    # Configure logging from a YAML file if it exists, else fall back to basicConfig.
    path = default_path
    if os.path.exists(path):
        with open(path, "r") as f:
            config = yaml.load(f, Loader=yaml.FullLoader)
        logging.config.dictConfig(config)
    else:
        logging.basicConfig(level=default_level)


def parse_config_file(config_file):
    # Read the YAML config and pass it around as a JSON string.
    local_path = config_file
    print("LOCAL PATH: " + local_path)
    with open(local_path) as stream:
        config = json.dumps(yaml.load(stream, Loader=yaml.FullLoader))
    return config


class Objhandle(Process):
    def __init__(self, config_str):
        super(Objhandle, self).__init__()
        config = json.loads(config_str)
        self.sqs_accessKey = config['aws']['sqs']['accessKey']
        self.sqs_securityKey = config['aws']['sqs']['securityKey']
        self.sqs_queue = config['aws']['sqs']['queue.sqsurl']
        self.sqs_region = config['aws']['sqs']['region']
        self.s3_bucket = config['aws']['s3']['bucket']
        self.s3_accessKey = config['aws']['s3']['accessKey']
        self.s3_securityKey = config['aws']['s3']['securityKey']
        self.s3_region = config['aws']['s3']['region']
        self.oss_domain = config['aliyun']['ossdomain']
        self.oss_bucket = config['aliyun']['ossbucket']
        self.oss_accessKey = config['aliyun']['ramaccessKeyID']
        self.oss_accessSecret = config['aliyun']['ramaccessKeySecret']
        self.azure_account = config['azure']['account']
        self.azure_container = config['azure']['blobContainer']
        self.azure_accountKey = config['azure']['accountKey']
        self.azure_endpointSuffix = config['azure']['endpointSuffix']
        self.log_config = config['logging']['config']
        # One client per cloud: SQS/S3 (AWS), OSS (Alibaba Cloud), Blob (Azure).
        self.sqs_client = boto3.client('sqs', self.sqs_region,
                                       aws_access_key_id=self.sqs_accessKey,
                                       aws_secret_access_key=self.sqs_securityKey)
        self.s3_client = boto3.client('s3', self.s3_region,
                                      aws_access_key_id=self.s3_accessKey,
                                      aws_secret_access_key=self.s3_securityKey)
        self.oss_auth = oss2.Auth(self.oss_accessKey, self.oss_accessSecret)
        self.oss_service = oss2.Bucket(self.oss_auth, self.oss_domain, self.oss_bucket)
        self.blob_service = BlockBlobService(account_name=self.azure_account,
                                             account_key=self.azure_accountKey,
                                             endpoint_suffix=self.azure_endpointSuffix)

    def _queue_handle(self):
        # Poll SQS; stage new objects locally; drop messages that are not plain puts.
        msg = []
        sqs_response = self.sqs_client.receive_message(QueueUrl=self.sqs_queue,
                                                       AttributeNames=['SentTimestamp'],
                                                       MaxNumberOfMessages=10,
                                                       MessageAttributeNames=['All'],
                                                       VisibilityTimeout=15,
                                                       WaitTimeSeconds=5)
        if 'Messages' in sqs_response:
            for message in sqs_response['Messages']:
                receipt_handle = message['ReceiptHandle']
                body = json.loads(message['Body'])
                action = body['Records'][0]['eventName']
                key = urllib.unquote(body['Records'][0]['s3']['object']['key']).encode("utf-8")
                if action == "ObjectCreated:Put" and not key.endswith('/'):
                    key_path = os.path.split(key)[0]
                    # Download the object into a per-day staging directory.
                    today = datetime.datetime.now().strftime("%Y-%m-%d")
                    local_path = '/export/temp_file/{}/{}'.format(today, key_path)
                    local_file = '/export/temp_file/{}/{}'.format(today, key)
                    if not os.path.exists(local_path):
                        os.makedirs(local_path)
                    try:
                        self.s3_client.download_file(self.s3_bucket, key, local_file)
                    except Exception as e:
                        logger.error("Download object error %s", e)
                    else:
                        msg.append((key, local_file, receipt_handle))
                else:
                    # Uninteresting event (not a put, or a directory key): ack and drop.
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue,
                                                   ReceiptHandle=receipt_handle)
        else:
            time.sleep(20)  # empty queue: back off before the next poll
        logger.info('Queue messages handled')
        return msg

    def run(self):
        # Worker loop: poll forever, mirroring each staged file to both clouds.
        while True:
            for key, local_file, receipt_handle in self._queue_handle():
                try:
                    self.blob_service.create_blob_from_path(self.azure_container, key,
                                                            local_file, max_connections=4)
                    self.oss_service.put_object_from_file(key, local_file)
                except Exception as e:
                    logger.error("Upload error %s", e)
                else:
                    logger.info("Upload succeeded %s", key)
                    # Ack the SQS message only after both uploads succeeded.
                    self.sqs_client.delete_message(QueueUrl=self.sqs_queue,
                                                   ReceiptHandle=receipt_handle)
                    if os.path.exists(local_file):
                        os.remove(local_file)


if __name__ == '__main__':
    config = parse_config_file('config/application.yaml')
    # Configure logging from the log config file named in application.yaml.
    if os.path.exists('config/log4j.yaml'):
        setup_logging(default_path="config/log4j.yaml")
    # Four worker processes polling the same queue in parallel.
    for i in range(4):
        p = Objhandle(config)
        p.start()
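After a run, the replicas can be spot-checked on both targets. A small sketch reusing the same credentials as application.yaml; the angle-bracketed values and the object key are placeholders:

    import oss2
    from azure.storage.blob import BlockBlobService

    key = 'photos/2019/cat.jpg'  # placeholder object key

    # Alibaba Cloud OSS side.
    auth = oss2.Auth('<ramaccessKeyID>', '<ramaccessKeySecret>')
    bucket = oss2.Bucket(auth, '<ossdomain>', '<ossbucket>')
    print('OSS copy present:', bucket.object_exists(key))

    # Azure Blob side (legacy azure-storage-blob 2.x API, as used above).
    blob = BlockBlobService(account_name='<account>', account_key='<accountKey>',
                            endpoint_suffix='<endpointSuffix>')
    print('Blob copy present:', blob.exists('<blobContainer>', key))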