阿里雲OSS文件上傳下載與文件刪除及檢索示例
實踐環境
運行環境:
Python 3.5.4
CentOS Linux release 7.4.1708 (Core)/Win10
需要安裝以下類庫:
pip3 install setuptools_rust==1.1.2
pip3 install Crypto==1.4.1 # Win10下,安裝後,需要更改 site-packages下crypto包名稱為Crypto
pip3 install cryptography==3.3.2 # 注意,如果不指定版本,安裝oss2時會報錯:error: can't find Rust compiler
pip3 install oss2==2.15.0
上傳本地文件到阿里雲OSS示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import traceback
import os
# Upload every file under a local directory to OSS.
def upload_files(bucket, target_dir_path, exclusion_list=None,
                 object_prefix='f2b/artifacts/web-admin-react'):
    """Upload all files found under ``target_dir_path`` to OSS.

    The directory tree is walked recursively. Each file is uploaded through
    ``upload_file`` with the object key
    ``<object_prefix>/<path of the file relative to target_dir_path>``,
    using forward slashes as separators.

    Args:
        bucket: Bucket object passed through unchanged to ``upload_file``.
        target_dir_path: Local directory whose files are uploaded.
        exclusion_list: Optional collection of relative paths (forward-slash
            separated, relative to ``target_dir_path``) that must be skipped.
            ``None`` means nothing is excluded.
        object_prefix: Key prefix prepended to every relative path. An empty
            prefix stores the files under their bare relative path.

    Returns:
        list: Object keys of the uploaded files, in walk order.

    Raises:
        Exception: Propagated from ``upload_file`` when an upload fails.
    """
    # A fresh empty tuple per call instead of a shared mutable default list.
    exclusions = () if exclusion_list is None else exclusion_list
    prefix = object_prefix.rstrip('/')
    oss_objects_path = []
    target_dir_path = os.path.normpath(target_dir_path).replace('\\', '/')
    for root, _dirs, files in os.walk(target_dir_path):
        for file_name in files:
            target_file_path = os.path.normpath(os.path.join(root, file_name))
            # relpath strips only the leading directory; a plain str.replace
            # would also remove later occurrences of the directory string
            # (e.g. dir "d" would turn "d/data.txt" into "ata.txt").
            relative_path = os.path.relpath(
                target_file_path, target_dir_path).replace('\\', '/')
            if relative_path in exclusions:
                continue
            if prefix:
                object_path = '%s/%s' % (prefix, relative_path)
            else:
                object_path = relative_path
            upload_file(bucket, target_file_path, object_path)
            oss_objects_path.append(object_path)
    return oss_objects_path
# Upload a single local file to OSS.
def upload_file(bucket, target_file_path, object_path):
    """Send one local file to OSS via ``bucket.put_object``.

    Args:
        bucket: Object exposing ``put_object(key, fileobj)`` whose result has
            a ``status`` attribute.
        target_file_path: Path of the local file; it is opened in binary mode.
        object_path: Full key of the destination object. The key must not
            contain the bucket name.

    Raises:
        Exception: If the returned status is anything other than 200.
    """
    with open(target_file_path, 'rb') as stream:
        response = bucket.put_object(object_path, stream)
    if response.status == 200:
        return
    raise Exception('upload %s error,status:%s' % (target_file_path, response.status))
# Script entry point: upload one local file, or a whole directory, to OSS and
# print the resulting object keys as a comma-separated line.
if __name__ == '__main__':
    try:
        import sys

        import oss2

        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        # oss2.Bucket(auth, endpoint, bucket_name)
        # endpoint is the endpoint of the region the bucket lives in and
        # bucket_name is the bucket's name. For China East 1 (Hangzhou) the
        # endpoint would be https://oss-cn-hangzhou.aliyuncs.com.
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        oss_objects_path = []  # object keys of the files uploaded successfully
        target_path = 'D:\\artifact-eb34ea94.tar.gz'
        if not os.path.exists(target_path):
            print('success:false,待上傳路徑(%s)不存在' % target_path)
            # sys.exit instead of the site-injected exit(), which is missing
            # under "python -S" or in frozen builds. The status stays 0 as in
            # the original; failure is reported via the "success:false" line.
            sys.exit(0)

        if os.path.isdir(target_path):  # a directory: upload everything in it
            oss_objects_path = upload_files(bucket, target_path)
        else:
            # Derive the object name from the local path so the two cannot
            # drift apart; backslashes are normalised so a Windows-style path
            # yields the same name on every platform.
            file_name = os.path.basename(target_path.replace('\\', '/'))
            object_path = 'f2b/artifacts/web-admin-react/%s' % file_name
            upload_file(bucket, target_path, object_path)
            oss_objects_path.append(object_path)
        print(','.join(oss_objects_path))
    except Exception:
        # Top-level boundary: report any failure on stdout with its traceback.
        print('success:false,%s' % traceback.format_exc())
參考連結:
https://help.aliyun.com/document_detail/88426.htm?spm=a2c4g.11186623.0.0.9e7e7dbbsOWOh6#t22317.html
https://help.aliyun.com/document_detail/31848.html
下載阿里雲OSS文件對象到本地文件示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import traceback
# Script entry point: download one OSS object into a local file.
if __name__ == '__main__':
    try:
        import oss2

        credentials = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        # oss2.Bucket(auth, endpoint, bucket_name)
        # endpoint is the endpoint of the region the bucket lives in and
        # bucket_name is the bucket's name. For China East 1 (Hangzhou) the
        # endpoint would be https://oss-cn-hangzhou.aliyuncs.com.
        bucket = oss2.Bucket(credentials, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        # Full object key; it does not include the bucket name
        # (e.g. testfolder/exampleobject.txt).
        remote_key = 'f2b/artifacts/cloud-f2b-web-admin-react/artifact-eb34ea94.tar.gz'
        # Local destination (e.g. D:\\localpath\\examplefile.txt). An existing
        # file at this path is overwritten; otherwise a new file is created.
        local_file = 'D:\\artifacts-17a86f.tar.gz'

        # bucket.get_object_to_file(object_path, object_local_path)
        response = bucket.get_object_to_file(remote_key, local_file)
        if response.status != 200:
            print('success:false,download fail, unknow exception, status:%s' % response.status)
    except Exception:
        # Any failure, whether during setup or the download itself, is
        # reported on stdout together with its traceback.
        print('success:false,%s' % traceback.format_exc())
參考連結:
https://help.aliyun.com/document_detail/88442.html
列舉指定前綴的所有文件
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import traceback
# Script entry point: list every object whose key starts with a given prefix.
if __name__ == '__main__':
    try:
        import oss2

        credentials = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        bucket = oss2.Bucket(credentials, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        key_prefix = 'f2b/www/alpha/f2b/icec-cloud-f2b-mobile'
        matched_keys = []
        # Print each key as it is yielded, then all keys on one line.
        for entry in oss2.ObjectIteratorV2(bucket, prefix=key_prefix):
            print(entry.key)
            matched_keys.append(entry.key)
        print(','.join(matched_keys))
    except Exception:
        # Report any failure on stdout together with its traceback.
        print('success:false,%s' % traceback.format_exc())
參考連結:
https://help.aliyun.com/document_detail/88458.html
批量刪除OSS對象
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import traceback
# Script entry point: delete the OSS objects whose keys are given, comma
# separated, as the first command-line argument.
if __name__ == '__main__':
    try:
        import oss2

        auth = oss2.Auth('ossAccessKeyId', 'ossAccessKeySecret')
        bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', 'exampleBucket')

        raw_keys = sys.argv[1] if len(sys.argv) > 1 else ''
        # Drop empty entries: a missing argument or a stray comma would
        # otherwise send a delete request for the empty key ''.
        oss_object_path_list = [key for key in raw_keys.split(',') if key]

        if not oss_object_path_list:
            print('success:false,no OSS object keys given')
        else:
            batch_size = 1000  # API limit: at most 1000 keys per delete request
            for start in range(0, len(oss_object_path_list), batch_size):
                oss_objects_to_delete = oss_object_path_list[start:start + batch_size]
                result = bucket.batch_delete_objects(oss_objects_to_delete)
                # Print the keys that were deleted successfully.
                print(result.deleted_keys)
                print('批量刪除以下OSS對象成功')
                # Comma-separated so the keys stay distinguishable.
                print(','.join(result.deleted_keys))
    except Exception:
        # Report any failure on stdout together with its traceback.
        print('success:false,%s' % traceback.format_exc())
參考連結: