Python使用阿里雲OSS服務
前言:
在遠程搭建了一個平台,通過改遠程平台進行數據的采集,需要將數據內容傳送至本地進行處理;為了實現該功能,考慮了阿里雲的OSS對象儲存的服務。
40G包月只需1元:-)
甚至還有客服致電給你,說有問題可直接通過電話聯系對方,15星好評
OSS安裝
關於賬號注冊,開通服務等等功能直接去阿里雲的官方進行相應操作即可
-
安裝
python-devel
- win:此過程不需要,在安裝Python時已經包含了;
- Debian/Ubuntu:
apt-get install python-dev
-
安裝OSS:
pip3 install oss2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
- 安裝完成后可用python進行驗證:
In [1]: import oss2 In [2]: oss2.__version__ Out[2]: '2.8.0'
OSS使用
創建存儲空間:
-
方式一:直接去網站頁面進行創建即可;
-
方式二:通過代碼創建
import oss2 auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret') # Endpoint以杭州為例,其它Region請按實際情況填寫。yourBucketName就是你要創建的Bucket bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName') bucket.create_bucket(oss2.models.BUCKET_ACL_PRIVATE)
yourBucketName
為你的Bucket名稱,若已存在則會報錯;yourAccessKeySecret
,別告訴別哈:)
連接OSS:
auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
上傳文件:
# 此處上傳了圖片文件
image_path = './test.jpg'
put_result = bucket.put_object_from_file('test.jpg', image_path)
if put_result.status == 200:
# 若此時的status狀態為200,則說明上傳成功;在你選擇的Bucket中已經存在文件名為'test.jpg'的文件了
print('put success')
下載文件:
# param1:oss上bucket中的文件名
# param2:保存在當地的文件路徑+文件名
get_result = bucket.get_object_to_file('test01.jpg', './get/test.jpg')
if get_result == 200:
print("get sucess")
整體代碼:
- 根據觸發信號獲取攝像頭的圖片;
- 獲取攝像頭的實時圖片;
- 將先圖片保存到本地->上傳阿里雲->在本地進行刪除
import oss2
import time
import cv2
import os
import RPi.GPIO as GPIO
class Camera(object):
"""攝像的初始化,配置"""
def __init__(self, channel):
self.capture = cv2.VideoCapture(channel)
self.fps = int(self.capture.get(cv2.CAP_PROP_FPS))
self.video_height = int(self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.video_width = int(self.capture.get(cv2.CAP_PROP_FRAME_WIDTH))
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, self.video_width)
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, self.video_height)
self.capture.set(cv2.CAP_PROP_FPS, self.fps)
# 設置攝像頭的緩存圖片為1,為了能獲取實時的圖像
self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 1)
def get_pic(self, image_dir=None, image_name=None, is_show=False, is_write=False):
"""
寫入照片
"""
if self.capture.isOpened():
# 讀取緩存中的圖片,起到清空緩存的作用
abandon_ret, abandon_frame = self.capture.read()
# 用於處理/上傳的圖片
ret, frame = self.capture.read()
# 判斷圖片信息是否讀取成功
if ret is False:
print('get picture failed')
return None
# 是否需要顯示
if is_show is True:
cv2.imshow('image', frame)
cv2.waitKey(200)
# 是否需要寫入到文件中
if is_write is True:
cv2.imwrite(image_dir + '/' + image_name, frame)
print('get picture success')
return frame
def release_camera(self):
"""
釋放攝像機資源
"""
self.capture.release()
cv2.destroyAllWindows()
class oss(object):
"""對象存儲類,將圖片傳至阿里雲端"""
def __init__(self):
self.auth = oss2.Auth('LTAIkA49HkgVYUsW', '**********************')
self.bucket = oss2.Bucket(self.auth, 'http://oss-cn-hangzhou.aliyuncs.com', '20190731')
def put_image(self, image_dir, image_name):
put_result = self.bucket.put_object_from_file(image_name, image_dir + '/' + image_name)
# 狀態為200表示成功的上傳
if put_result.status == 200:
print('put success')
def get_image(self):
pass
class gpio(object):
"""觸發信號的控制"""
event_flag = False # 判斷是否有電平觸發信號
def __init__(self, pin):
"""GPIO初始化"""
self.pin = pin
# 設置引腳應用模式
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# 設計引腳觸發模式為:上升沿觸發模式,防抖的時間為1000ms
GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000)
@staticmethod
def callback(flag):
"""回調函數"""
gpio.event_flag = True
def stop_event(self):
GPIO.remove_event_detect(self.pin)
def start_event(self):
GPIO.add_event_detect(self.pin, GPIO.RISING, callback=gpio.callback, bouncetime=1000)
@staticmethod
def release():
"""GPIO釋放引腳資源"""
GPIO.cleanup()
if __name__ == "__main__":
picture = None
sleep_time = 0
picture_path = './put'
picture_name = '.jpg'
gpio_pin = 18
# oss配置
oss_server = oss()
# 開啟攝像頭
camera = Camera(0)
# GPIO初始化
event = gpio(gpio_pin)
while True:
if gpio.event_flag is True:
gpio.event_flag = False # 標志位置位
event.stop_event() # 關閉事件觸發
picture_name = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + '.jpg'
# 獲取圖片
picture = camera.get_pic(picture_path, picture_name, is_show=True, is_write=True)
if picture is not None:
pass
# oss_server.put_image(picture_path, picture_name)
else:
camera.release_camera()
camera = Camera(0)
# 刪除照片,防止內存爆炸
os.remove(picture_path + '/' + picture_name)
event.start_event() # 開啟事件觸發
# time.sleep(sleep_time)
gpio.release()