1.優酷上傳
1)調用優酷的sdk完成優酷視頻的上傳首先需要將實例化YoukuUpload類實例化,傳入的參數為(client_id,access_token,文件地址)
實例化時執行__init__()
2)實例化完成后得到類的對象,通過對象調用upload方法,傳入參數為字典(dict),字典內的必傳參數為title,其余可為默認,其中的一些參數是為了控制視頻一些信息的,具體參見代碼的注釋
3)在upload方法中
(1)會先判斷upload_token 這個參數是否存在,該參數為優酷返回,存在就繼續之前的上傳,如果不存在的話就判斷為新上傳。
(2)在新上傳中限制性create方法來在服務端創建上傳,在此之前會對傳進來的參數和默認的參數會放入新的字典中
def prepare_video_params(self, title=None, tags='Others', description='', copyright_type='original', public_type='all', category=None, watch_password=None, latitude=None, longitude=None, shoot_time=None ): # 准備視頻所以需要的一些參數 params = {} if title is None: # 如果沒有傳title的話,title默認等於文件名 title = self.file_name elif len(title) > 50: # 如果title過長,長度大於50的話,就截取前50個字符作為title title = title[:50] params['title'] = title params['tags'] = tags # tags就等於你傳進來的那個tags,默認為others params['description'] = description # 描述默認為空,可以傳進來 params['copyright_type'] = copyright_type # 版權所有 original: 原創 reproduced: 轉載 params['public_type'] = public_type # 公開類型 all: 公開 friend: 僅好友觀看 password: 輸入密碼觀看 private: 私有 if category: params['category'] = category # 視頻分類 默認為空,可傳 if watch_password: params['watch_password'] = watch_password # 觀看密碼,默認為空,可傳 """ latitude/longitude用戶記錄地理位置信息 shoot_time用來標記視頻中正片的開始時間 """ if latitude: params['latitude'] = latitude # 緯度 if longitude: params['longitude'] = longitude # 經度 if shoot_time: params['shoot_time'] = shoot_time log.debug("prepare_video_params:%s" % params) return params
(3)在create方法中,會用get方法訪問url = 'https://api.youku.com/uploads/create.json',並將參數傳過去,創建oss客戶端,優酷會返回一些字段
def create(self, params): # prepare file info params['file_name'] = self.file_name params['file_size'] = self.file_size # 在__init__中獲取到了,也就是在類的實例化時就已經根據文件的地址獲取了文件的大小 params['file_md5'] = self.file_md5 = self.checksum_md5file(self.file) # 將文件信息做md5 校驗,根據文件名打開文件,然后每次讀取8192B,進行md5更新后再轉為十六進制返回 self.logger.info('upload file %s, size: %d bytes' % (self.file_name, self.file_size)) self.logger.info('md5 of %s: %s' % (self.file_name, self.file_md5)) params['client_id'] = self.client_id # client_id params['access_token'] = self.access_token # access_token url = 'https://api.youku.com/uploads/create.json' r = requests.get(url, params=params)# 以get方法將文件信息發送到'https://api.youku.com/uploads/create.json' check_error(r, 201) result = r.json() log.debug("file--->vid:%s,return_result:%s" % (self.v_vid, result)) self.upload_token = result['upload_token'] self.logger.info('upload token of %s: %s' % (self.file_name, self.upload_token)) self.upload_server_ip = socket.gethostbyname( result['upload_server_uri']) self.logger.info('upload_server_ip of %s: %s' % (self.file_name, self.upload_server_ip)) log.debug("file_vid:%s ip:%s" % (self.v_vid, self.upload_server_ip))
(4) 調用create_file方法,將文件的大小、token、每次上傳切片大小傳到上一步返回的ip所指向的服務器
(5) 調用new_slice方法 告訴服務器才准備上傳文件切片,目的在於檢查服務器狀態和返回服務器中這個新建切片的信息,沒有報錯就執行_save_slice_state方法
(6)在_save_slice_state方法中,將切片的信息進行更新或者保存,之后判斷返回的task_id是否為0,如果為零就直接提交commit完成整個上傳,不為0就 調用upload_slice方法上傳文件切片
(7)在upload_slice方法中,每次打開文件移動指針位置到上一次上傳的地方,然后讀取2048kb的數據,之后調用url進行上傳。
def upload_slice(self): # 上傳文件切片 seek():移動文件讀取指針到指定位置 data = None with open(self.file, 'rb') as f: f.seek(self.slice_offset) data = f.read(self.slice_length) # 2048 params = { 'upload_token': self.upload_token, 'slice_task_id': self.slice_task_id, 'offset': self.slice_offset, 'length': self.slice_length, # Byte 'hash': self.checksum_md5data(data)# hash這個字段是為了在服務端進行校驗,來驗證上傳的文件有沒有出現丟失或錯誤 } url = 'http://%s/gupload/upload_slice' % self.upload_server_ip r = requests.post(url, params=params, data=data) check_error(r, 201) self._save_slice_state(r.json())
(8) 循環判斷task_id,若task_id為0則結束上傳(即調用commit())
整體流程:應該是先本地將文件的信息上傳到服務器,在服務器創建同名文件,然后根據文件的size和每次對文件進行切片的大小控制上傳,在上傳過程中服務端會先返回這次要上傳的切片的task_id,如果在服務端的文件大小等於你上傳的參數中提交的size,就會返回task_id為0,否則就不為0。本地會根據這個參數來判斷是否結束上傳。
2. 完整優酷上傳sdk代碼
"""Youku cloud Python Client doc: http://cloud.youku.com/docs/doc?id=110 """ import os import requests import json import time import hashlib import socket import logging from time import sleep from util import check_error, YoukuError if not os.path.exists('/var/log/youku/'): os.makedirs('/var/log/youku') log = logging.getLogger() log.setLevel(logging.DEBUG) fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s", "%Y-%m-%d %H:%M:%S") stream_handler = logging.FileHandler( '/var/log/youku/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time())))) stream_handler.setLevel(logging.DEBUG) stream_handler.setFormatter(fmt) log.addHandler(stream_handler) class YoukuUpload(object): """Youku Upload Client. Upload file to Youku Video. Support resume upload if interrupted. Should use one instance of YoukuUpload for one upload file in one thread, since it has internal state of upload process. doc: http://cloud.youku.com/docs/doc?id=110 """ def __init__(self, client_id, access_token, file, v_vid = None, logger=None): """ Args: file: string, include path and filename for open(). filename must contain video file extension. """ super(YoukuUpload, self).__init__() self.client_id = client_id self.access_token = access_token self.v_vid = v_vid # 上傳的視頻id號 self.logger = logger or logging.getLogger(__name__) # file info self.file = file self.file_size = os.path.getsize(self.file) # int 獲取文件的大小 self.file_dir, self.file_name = os.path.split(self.file) # string 分割路徑來獲取文件名 if self.file_dir == '': self.file_dir = '.' self.file_ext = self.file_name.rsplit('.', 1)[1], # file extension self.file_md5 = None # string, do later # upload state self.upload_token = None # string self.upload_server_ip = None # string self.slice_task_id = None # int self.slice_offset = None # int self.slice_length = None # int self.transferred = None # int for bytes has uploaded self.finished = False # boolean # resume upload state self._read_upload_state_from_file() def prepare_video_params(self, title=None, tags='Others', description='', copyright_type='original', public_type='all', category=None, watch_password=None, latitude=None, longitude=None, shoot_time=None ): # 准備視頻所以需要的一些參數 """ util method for create video params to upload. Only need to provide a minimum of two essential parameters: title and tags, other video params are optional. All params spec see: http://cloud.youku.com/docs?id=110#create . Args: title: string, 2-50 characters. tags: string, 1-10 tags joind with comma. description: string, less than 2000 characters. copyright_type: string, 'original' or 'reproduced' public_type: string, 'all' or 'friend' or 'password' watch_password: string, if public_type is password. latitude: double. longitude: double. shoot_time: datetime. Returns: dict params that upload/create method need. """ params = {} if title is None: # 如果沒有傳title的話,title默認等於文件名 title = self.file_name elif len(title) > 50: # 如果title過長,長度大於50的話,就截取前50個字符作為title title = title[:50] params['title'] = title params['tags'] = tags # tags就等於你傳進來的那個tags,默認為others params['description'] = description # 描述默認為空,可以傳進來 params['copyright_type'] = copyright_type # 版權所有 original: 原創 reproduced: 轉載 params['public_type'] = public_type # 公開類型 all: 公開 friend: 僅好友觀看 password: 輸入密碼觀看 private: 私有 if category: params['category'] = category # 視頻分類 默認為空,可傳 if watch_password: params['watch_password'] = watch_password # 觀看密碼,默認為空,可傳 """ latitude/longitude用戶記錄地理位置信息 shoot_time用來標記視頻中正片的開始時間 """ if latitude: params['latitude'] = latitude # 緯度 if longitude: params['longitude'] = longitude # 經度 if shoot_time: params['shoot_time'] = shoot_time log.debug("prepare_video_params:%s" % params) return params def create(self, params): # prepare file info params['file_name'] = self.file_name params['file_size'] = self.file_size # 在__init__中獲取到了,也就是在類的實例化時就已經根據文件的地址獲取了文件的大小 params['file_md5'] = self.file_md5 = self.checksum_md5file(self.file) # 將文件信息做md5 校驗,根據文件名打開文件,然后每次讀取8192B,進行md5更新后再轉為十六進制返回 self.logger.info('upload file %s, size: %d bytes' % (self.file_name, self.file_size)) self.logger.info('md5 of %s: %s' % (self.file_name, self.file_md5)) params['client_id'] = self.client_id # client_id params['access_token'] = self.access_token # access_token url = 'https://api.youku.com/uploads/create.json' r = requests.get(url, params=params)# 以get方法將文件信息發送到'https://api.youku.com/uploads/create.json' check_error(r, 201) result = r.json() log.debug("file--->vid:%s,return_result:%s" % (self.v_vid, result)) self.upload_token = result['upload_token'] self.logger.info('upload token of %s: %s' % (self.file_name, self.upload_token)) self.upload_server_ip = socket.gethostbyname( result['upload_server_uri']) self.logger.info('upload_server_ip of %s: %s' % (self.file_name, self.upload_server_ip)) log.debug("file_vid:%s ip:%s" % (self.v_vid, self.upload_server_ip)) def _save_upload_state_to_file(self): """if create and create_file has execute, save upload state to file for next resume upload if current upload process is interrupted. """ # 保存文件的上傳信息,先判斷文件是否可寫、可讀、可執行 # 保存的信息包括文件的上傳upload_token.上傳到的服務器ip # 保存的文件會在上傳完畢之后刪除 if os.access(self.file_dir, os.W_OK | os.R_OK | os.X_OK): save_file = '/tmp' + 'youku.upload' data = { 'upload_token': self.upload_token, 'upload_server_ip': self.upload_server_ip } with open(save_file, 'w') as f: json.dump(data, f) def _read_upload_state_from_file(self): save_file = '/tmp' + 'youku.upload' try: with open(save_file) as f: data = json.load(f) self.upload_token = data['upload_token'] self.upload_server_ip = data['upload_server_ip'] # check upload_token expired try: self.check() except YoukuError, e: if e.code == 120010223: # Expired upload token self.upload_token = None self.upload_server_ip = None self._delete_upload_state_file() except: pass def _delete_upload_state_file(self): try: os.remove('/tmp' + 'youku.upload') except: pass def checksum_md5file(self, filename): md5 = hashlib.md5() with open(filename, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): md5.update(chunk) return md5.hexdigest() def checksum_md5data(self, data): md5 = hashlib.md5() md5.update(data) return md5.hexdigest() def create_file(self): params = { 'upload_token': self.upload_token, 'file_size': self.file_size, # Byte 'ext': self.file_ext, 'slice_length': 2048 # KB } # 上傳文件每次傳2048KB url = 'http://%s/gupload/create_file' % self.upload_server_ip r = requests.post(url, data=params) check_error(r, 201) # save upload state to resume upload self._save_upload_state_to_file() def new_slice(self): params = { 'upload_token': self.upload_token } url = 'http://%s/gupload/new_slice' % self.upload_server_ip r = requests.get(url, params=params) check_error(r, 201) self._save_slice_state(r.json()) def _save_slice_state(self, result): # 更新切片狀態 self.slice_task_id = result['slice_task_id'] self.slice_offset = result['offset'] self.slice_length = result['length'] self.transferred = result['transferred'] self.finished = result['finished'] def upload_slice(self): # 上傳文件切片 seek():移動文件讀取指針到指定位置 data = None with open(self.file, 'rb') as f: f.seek(self.slice_offset) data = f.read(self.slice_length) # 2048 params = { 'upload_token': self.upload_token, 'slice_task_id': self.slice_task_id, 'offset': self.slice_offset, 'length': self.slice_length, # Byte 'hash': self.checksum_md5data(data) } url = 'http://%s/gupload/upload_slice' % self.upload_server_ip r = requests.post(url, params=params, data=data) check_error(r, 201) self._save_slice_state(r.json()) def check(self): params = { 'upload_token': self.upload_token } url = 'http://%s/gupload/check' % self.upload_server_ip r = requests.get(url, params=params) check_error(r, 200) return r.json() def commit(self): status = self.check()# 檢查上傳狀態 if status['status'] == 4: raise ValueError('upload has not complete, should not commit') while status['status'] != 1: # status is 2 or 3 sleep(10) status = self.check() params = { 'access_token': self.access_token, 'client_id': self.client_id, 'upload_token': self.upload_token, 'upload_server_ip': status['upload_server_ip'] } url = 'https://api.youku.com/uploads/commit.json' r = requests.post(url, data=params) check_error(r, 200) self.finished = True self._delete_upload_state_file()# 刪除記錄視頻上傳信息的文件 log.debug("sdk---->vid:%s youku video_id:%s" % (self.v_vid, r.json()['video_id'])) return r.json()['video_id'] def cancel(self): status = self.check() params = { 'access_token': self.access_token, 'client_id': self.client_id, 'upload_token': self.upload_token, 'upload_server_ip': status['upload_server_ip'] } url = 'https://api.youku.com/uploads/cancel.json' r = requests.get(url, params=params) check_error(r, 200) self._delete_upload_state_file() return r.json()['upload_token'] def spec(self): url = 'https://api.youku.com/schemas/upload/spec.json' r = requests.get(url) check_error(r, 200) return r.json() def transferred_percent(self): """return current transferred percent """ return int(self.transferred / self.file_size) def upload(self, params={}): """start uploading the file until upload is complete or error. This is the main method to used, If you do not care about state of process. Args: params: a dict object describe video info, eg title, tags, description, category. all video params see the doc of prepare_video_params. Returns: return video_id if upload successfully """ if self.upload_token is not None: # resume upload status = self.check() if status['status'] != 4: return self.commit() else: self.new_slice() while self.slice_task_id != 0: self.upload_slice() return self.commit() else: # new upload try: log.debug('youku upload params:%s' % params) # 記錄上傳參數 except Exception as e: pass self.create(self.prepare_video_params(**params)) # 創建上傳 self.create_file() self.new_slice() while self.slice_task_id != 0: self.upload_slice() return self.commit()