必須先下載 ffmpeg 命令 或ffmpeg.exe
# !/usr/bin/env python
# Software: PyCharm
# __author__ == "YU HAIPENG"
# fileName: VideoHelp.py
# Month: 五月
# time: 2021/5/22 17:06
"""
https://ffmpeg.org/ffmpeg-filters.html 文檔
https://blog.csdn.net/weixin_42081389/article/details/100543007
ffmpeg -y -i 待轉換mp4文件路徑 -vcodec copy -acodec copy -vbsf h264_mp4toannexb 目標ts文件
ffmpeg -i 待轉換ts文件路徑 -c copy -map 0 -f segment -segment_list 目標m3u8文件 -segment_time 單個切片時長 目標ts切片文件名稱
通用選項
-L license
-h 幫助
-fromats 顯示可用的格式,編解碼的,協議的。。。
-f fmt 強迫采用格式fmt
-I filename 輸入文件
-y 覆蓋輸出文件
-t duration 設置紀錄時間 hh:mm:ss[.xxx]格式的記錄時間也支持
-ss position 搜索到指定的時間 [-]hh:mm:ss[.xxx]的格式也支持
-title string 設置標題
-author string 設置作者
-copyright string 設置版權
-comment string 設置評論
-target type 設置目標文件類型(vcd,svcd,dvd) 所有的格式選項(比特率,編解碼以及緩沖區大小)自動設置 ,只需要輸入如下的就可以了:
ffmpeg -i myfile.avi -target vcd /tmp/vcd.mpg
-hq 激活高質量設置
-itsoffset offset 設置以秒為基准的時間偏移,該選項影響所有后面的輸入文件。
該偏移被加到輸入文件的時戳,定義一個正偏移意味着相應的流被延遲了 offset秒。 [-]hh:mm:ss[.xxx]的格式也支持
視頻選項
-b bitrate 設置比特率,缺省200kb/s
-r fps 設置幀頻 缺省25
-s size 設置幀大小 格式為WXH 缺省160X128.下面的簡寫也可以直接使用:
Sqcif 128X96 qcif 176X144 cif 252X288 4cif 704X576
-aspect aspect 設置橫縱比 4:3 16:9 或 1.3333 1.7777
-croptop size 設置頂部切除帶大小 像素單位
-cropbottom size –cropleft size –cropright size
-padtop size 設置頂部補齊的大小 像素單位
-padbottom size –padleft size –padright size –padcolor color 設置補齊條顏色(hex,6個16進制的數,紅:綠:蘭排列,比如 000000代表黑色)
-vn 不做視頻記錄
-bt tolerance 設置視頻碼率容忍度kbit/s
-maxrate bitrate設置最大視頻碼率容忍度
-minrate bitreate 設置最小視頻碼率容忍度
-bufsize size 設置碼率控制緩沖區大小
-vcodec codec 強制使用codec編解碼方式。 如果用copy表示原始編解碼數據必須被拷貝。
-sameq 使用同樣視頻質量作為源(VBR)
-pass n 選擇處理遍數(1或者2)。兩遍編碼非常有用。第一遍生成統計信息,第二遍生成精確的請求的碼率
-passlogfile file 選擇兩遍的紀錄文件名為file
高級視頻選項
-g gop_size 設置圖像組大小
-intra 僅適用幀內編碼
-qscale q 使用固定的視頻量化標度(VBR)
-qmin q 最小視頻量化標度(VBR)
-qmax q 最大視頻量化標度(VBR)
-qdiff q 量化標度間最大偏差 (VBR)
-qblur blur 視頻量化標度柔化(VBR)
-qcomp compression 視頻量化標度壓縮(VBR)
-rc_init_cplx complexity 一遍編碼的初始復雜度
-b_qfactor factor 在p和b幀間的qp因子
-i_qfactor factor 在p和i幀間的qp因子
-b_qoffset offset 在p和b幀間的qp偏差
-i_qoffset offset 在p和i幀間的qp偏差
-rc_eq equation 設置碼率控制方程 默認tex^qComp
-rc_override override 特定間隔下的速率控制重載
-me method 設置運動估計的方法 可用方法有 zero phods log x1 epzs(缺省) full
-dct_algo algo 設置dct的算法 可用的有 0 FF_DCT_AUTO 缺省的DCT 1 FF_DCT_FASTINT 2 FF_DCT_INT 3 FF_DCT_MMX 4 FF_DCT_MLIB 5 FF_DCT_ALTIVEC
-idct_algo algo 設置idct算法。可用的有 0 FF_IDCT_AUTO 缺省的IDCT 1 FF_IDCT_INT 2 FF_IDCT_SIMPLE 3 FF_IDCT_SIMPLEMMX 4 FF_IDCT_LIBMPEG2MMX 5 FF_IDCT_PS2 6 FF_IDCT_MLIB 7 FF_IDCT_ARM 8 FF_IDCT_ALTIVEC 9 FF_IDCT_SH4 10 FF_IDCT_SIMPLEARM
-er n 設置錯誤殘留為n 1 FF_ER_CAREFULL 缺省 2 FF_ER_COMPLIANT 3 FF_ER_AGGRESSIVE 4 FF_ER_VERY_AGGRESSIVE
-ec bit_mask 設置錯誤掩蔽為bit_mask,該值為如下值的位掩碼 1 FF_EC_GUESS_MVS (default=enabled) 2 FF_EC_DEBLOCK (default=enabled)
-bf frames 使用frames B 幀,支持mpeg1,mpeg2,mpeg4
-mbd mode 宏塊決策 0 FF_MB_DECISION_SIMPLE 使用mb_cmp 1 FF_MB_DECISION_BITS 2 FF_MB_DECISION_RD
-4mv 使用4個運動矢量 僅用於mpeg4
-part 使用數據划分 僅用於mpeg4
-bug param 繞過沒有被自動監測到編碼器的問題
-strict strictness 跟標准的嚴格性
-aic 使能高級幀內編碼 h263+
-umv 使能無限運動矢量 h263+
-deinterlace 不采用交織方法
-interlace 強迫交織法編碼 僅對mpeg2和mpeg4有效。當你的輸入是交織的並且你想要保持交織以最小圖像損失的時候采用該選項。可選的方法是不交織,但是損失更大
-psnr 計算壓縮幀的psnr
-vstats 輸出視頻編碼統計到vstats_hhmmss.log
-vhook module 插入視頻處理模塊 module 包括了模塊名和參數,用空格分開
音頻選項
-ab bitrate 設置音頻碼率
-ar freq 設置音頻采樣率
-ac channels 設置通道 缺省為1
-an 不使能音頻紀錄
-acodec codec 使用codec編解碼
音頻/視頻捕獲選項
-vd device 設置視頻捕獲設備。比如/dev/video0
-vc channel 設置視頻捕獲通道 DV1394專用
-tvstd standard 設置電視標准 NTSC PAL(SECAM)
-dv1394 設置DV1394捕獲
-av device 設置音頻設備 比如/dev/dsp
高級選項
-map file:stream 設置輸入流映射
-debug 打印特定調試信息
-benchmark 為基准測試加入時間
-hex 傾倒每一個輸入包
-bitexact 僅使用位精確算法 用於編解碼測試
-ps size 設置包大小,以bits為單位
-re 以本地幀頻讀數據,主要用於模擬捕獲設備
-loop 循環輸入流。只工作於圖像流,用於ffserver測試
# todo example
1、avi轉MP4命令:
ffmpeg -i .\\Video.avi -c copy -map 0 video.mp4
或
ffmpeg -i .\\Video.avi -c:v libx264 -crf 19 -preset slow -c:a aac -b:a 192k -ac 2 video.mp4
//剪切視頻
ffmpeg -ss 0:1:30 -t 0:0:50 -i 1.avi -vcodec copy -acodec copy 3.mp4
//-r 提取圖像的頻率,-ss 開始時間,-t 持續時間
MP4轉ts
ffmpeg -i .\video.mp4 output.ts
視頻壓縮
1)ffmpeg -i 123_ffmpeg.mp4 (壓縮的文件更大更清晰,一般情況下不用)
2)ffmpeg.exe -i 123.MP4 -b:v 700k 1231_ffmpeg.mp4(壓縮的更小,相對模糊一些)
"""
import os
from re import compile
import filetype
import platform
import subprocess
import requests
import time
reg = compile(r".*%0+\dd\.ts")
def main_path(path: str):
"""
路徑總函數
@param path:
@return:
"""
current_path = os.getcwd()
if path.startswith('..'):
path = _wne_path(_parse(path, current_path))
elif path.startswith('.'):
path = _wne_path(path[1:], current_path)
elif path:
path = _wne_path(path)
else:
raise ValueError('文件路徑錯誤')
return path
def _parse(path: str, current_path):
"""
解析 .. 路徑
:param path:
:param current_path:
:return:
"""
new_path_args = list(filter(lambda x: x != '', _get_path_params(path)))
row = 0
while row < len(new_path_args):
if new_path_args[row] == '..':
current_path = os.path.dirname(current_path)
new_path_args.remove(new_path_args[row])
row -= 1
else:
break
row += 1
return os.path.join(current_path, *new_path_args)
def _wne_path(new_path: str, current_path=None):
new_path_args = _get_path_params(new_path)
if current_path:
path = os.path.join(current_path, *new_path_args)
else:
sys_str = get_sys()
if sys_str == "Windows":
if new_path_args[0].find(':') != -1:
new_path_args[0] += os.sep
path = os.path.join(*new_path_args)
elif sys_str == ["Linux", "Mac", "Darwin"]:
if new_path_args[0] == '':
new_path_args[0] = os.sep
path = os.path.join(*new_path_args)
else:
path = new_path
return path
def get_sys():
"""
平台
@return:
"""
sys_str = platform.system()
return sys_str
def get_home():
"""
家目錄
@return:
"""
return os.path.expanduser("~")
def _get_path_params(path):
"""
路徑參數
@param path:
@return:
"""
return path.split('/') if path.find('/') != -1 else path.split('\\')
def guss_file_type(file_path):
"""
file_path
@param file_path:
@return:
"""
kind = filetype.guess_extension(file_path)
return kind
def m3u8_to_mp4_slice_download(m3u8_urls: list, sep=100, **kwargs):
"""
分片下載
@param m3u8_urls: urls 數組 過大時可用分片下載 傳入的 m3u8_urls必須排好序 果ts文件名 或 ts 文件路徑
@param sep:
@param kwargs: M3u8ToMp4 參數
@return:
"""
n = 1
mp4_path_name = kwargs.pop("mp4_path_name", None) or "temp"
if mp4_path_name.endswith(".mp4"):
mp4_path_name = os.path.splitext(mp4_path_name)[0]
for start in range(0, len(m3u8_urls), sep):
new_list = m3u8_urls[start:start + sep]
M3u8ToMp4(new_list, mp4_path_name=f"{mp4_path_name}{str(n).zfill(5)}.mp4", **kwargs).run()
n += 1
def get_log(self):
if self.logger is None:
import logging
logging.basicConfig(
format='PROCESS ID:%(process)d: %(asctime)s-%(name)s-%(levelname)s '
'-[line:%(lineno)d]: %(message)s',
level=logging.INFO)
logger = logging.getLogger('video_log')
self.logger = logger
class OtherFormatToMp4:
"""
別的格式轉mp4
todo 請先安裝 ffmpeg 程序
"""
def __init__(
self,
origin_video_path,
save_mp4_path,
logger=None,
change_timeout=None):
"""
@param origin_video_path: 原視頻路徑 ../xx or ./x or abs_path
@param save_mp4_path: 生成文件路徑
@param change_timeout: 生成文件路徑
@param logger
"""
self.origin_video_path = main_path(origin_video_path)
self.save_mp4_path = main_path(save_mp4_path)
self.logger = logger
self.change_timeout = 180 if change_timeout is None else change_timeout
get_log(self)
def run(self):
return self.to_mp4()
def to_mp4(self):
try:
params = self.get_change_params()
self.logger.info("change start %s", self.origin_video_path)
self.__change_to_mp4(params)
self.logger.info("change end %s", self.origin_video_path)
finally:
for handler in self.logger.handlers:
if hasattr(handler, "close"):
handler.close() # noqa
self.logger.handlers.clear()
self.logger.info("over")
def get_change_params(self):
if not self.save_mp4_path.endswith(".mp4"):
self.save_mp4_path = f"{main_path(self.save_mp4_path)}.mp4"
self.make_folder(self.save_mp4_path)
cmd = F"ffmpeg -y -i {self.origin_video_path} " \
F"-vcodec copy -acodec copy {self.save_mp4_path}"
return cmd
def __change_to_mp4(self, cmd):
subprocess.check_output(cmd, shell=True, timeout=self.change_timeout)
@staticmethod
def make_folder(save_mp4_path):
a, b = os.path.split(save_mp4_path)
if not a:
return
if not os.path.isdir(a):
os.makedirs(a)
class M3u8ToMp4:
"""ts_文件 m3u8文件轉mp4"""
def __init__(
self,
m3u8_path,
request_bool=False,
decrypt_params: dict = None,
decrypt_callback=None,
decrypt_decode_before_func=None,
decrypt_decode_after_func=None,
req_headers=None,
req_method='GET',
req_params: dict = None,
req_data: dict = None,
req_before_func=None,
req_after_func=None,
thread_pool=None,
logger=None,
sort_func=None,
mp4_path_name=None,
change_timeout=None,
req_sleep=None
):
"""
todo 請先安裝 ffmpeg 程序 (本轉化為調用ffmpeg程序轉化)
@param m3u8_path: [url, url] or folder
todo if m3u8_path is urls -> request_bool must be True urls must be sort
todo 如果 m3u8_path url集合長度大於10000, 最好分片下載 調用 m3u8_to_mp4_slice_download
@param request_bool: 讓本程序發送請求 m3u8_path 必須是完整的 url 集合 bool
@param req_method: request_bool 為 true 生效 req 參數 - > dict
@param req_params: request_bool 為 true 生效 req 參數 - > dict
@param req_data: request_bool 為 true 生效 req 參數 - > dict
@param req_headers: 請求頭參數
@param req_before_func request_bool=True 生效
入參 req_params, req_data 必須返回處理后的 req_params, req_data
@param req_after_func request_bool=True 生效
入參 請求后的返回對象 response 必須返回處理后的 response
@param decrypt_params: 解密函數所要的參數 字典格式
@param decrypt_callback: 解密函數 必須返回 bytes 解碼后的數據
@param decrypt_decode_before_func: request_bool=True 生效
解密前的調用的函數 入參 data, index(當前請求標識), decrypt_params 必須返回 data, kwargs (解密參數)
@param decrypt_decode_after_func: request_bool=True 生效
解密后的調用的函數 入參 解密后的 data 必須返回處理后的 data
@param thread_pool: request_bool 為 true 時啟用的線程池個數
@param sort_func: ts文件的 排序規則 返回排好序的list 默認排序sort
@param mp4_path_name: 轉化后mp4的存儲路徑 xxx/xxx.mp4
@param logger: 日志記錄器
@param self.change_timeout: 超時設置 默認 180 s
"""
self.change_timeout = 180 if change_timeout is None else change_timeout
if not request_bool:
if not isinstance(m3u8_path, str):
raise FileNotFoundError(f"this path m3u8_path is not folder")
m3u8_path = main_path(m3u8_path)
if not os.path.isdir(m3u8_path):
raise FileNotFoundError(f"this path m3u8_path is not folder")
self.m3u8_path = m3u8_path
self.decrypt_callback = decrypt_callback
self.decrypt_params = decrypt_params
self.sort_func = sort_func
self.req_sleep = req_sleep
self.logger = logger
self.mp4_path_name = mp4_path_name
self.req_headers = req_headers or {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36"
}
self._remove_ts = False
self.__ts_len = None
self.request_bool = request_bool
self.thread_pool = thread_pool
self.req_method = req_method
self.req_params = req_params
self.req_data = req_data
self.decrypt_decode_before_func = decrypt_decode_before_func
self.decrypt_decode_after_func = decrypt_decode_after_func
self.req_before_func = req_before_func
self.req_after_func = req_after_func
get_log(self)
def req(self):
if not self.m3u8_path or not isinstance(self.m3u8_path, list):
raise ValueError(f"this m3u8_path is not list or empty")
temp_path = main_path(f"./temp_{self.get_uuid}")
if not os.path.isdir(temp_path):
os.makedirs(temp_path)
self._remove_ts = True
from concurrent.futures import ThreadPoolExecutor, as_completed
thread = ThreadPoolExecutor(self.thread_pool)
all_task = [
thread.submit(
self._requests,
index,
url) for index,
url in enumerate(
self.m3u8_path,
1)]
all_t = list()
for future in as_completed(all_task):
data, index = future.result()
if callable(self.decrypt_callback):
new_decrypt_params = self.decrypt_params.copy() if self.decrypt_params else dict()
if callable(self.decrypt_decode_before_func):
data, new_decrypt_params = self.decrypt_decode_before_func(
data, index, new_decrypt_params)
data = self.decrypt_callback(data, **new_decrypt_params)
if callable(self.decrypt_decode_after_func):
data = self.decrypt_decode_after_func(data)
all_t.append(thread.submit(self._save, data, index, temp_path))
for a in as_completed(all_t):
self.logger.info(f"file %s download %s" % a.result())
self.m3u8_path = temp_path
thread.shutdown()
def _save(self, data, index, temp_path):
if self.__ts_len is None:
self.__ts_len = 1
name = self.get_ts_name(index)
ts_path = os.path.join(temp_path, name)
with open(ts_path, mode='wb') as f:
f.write(data)
return os.path.basename(ts_path), "ok"
def get_ts_name(self, index: int):
return f"{self.get_uuid}_{str(index).zfill(7)}.ts"
@property
def get_uuid(self):
import uuid
if hasattr(self, "_uuid"):
return self._uuid
self._uuid = uuid.uuid4().hex[:7] # noqa
return self._uuid
def _requests(self, index, url, times=0):
with requests.session() as session:
session.headers.update(self.req_headers)
params = self.req_params.copy() if self.req_params else dict()
data = self.req_data.copy() if self.req_data else dict()
if callable(self.req_before_func):
params, data = self.req_before_func(params, data)
self.logger.info("request url %s start", url)
req_func = getattr(session, self.req_method.lower())
if self.req_sleep and isinstance(self.req_sleep, (int, float)):
time.sleep(self.req_sleep)
try:
res = req_func(
url,
verify=False,
timeout=30,
data=data,
params=params
)
except requests.exceptions.ConnectionError as e:
if times > 3:
raise e
return self._requests(index, url, times + 1)
self.logger.info("request url %s end", url)
if callable(self.req_after_func):
res = self.req_after_func(res)
return res.content, index
def run(self, remove_ts=False):
self._remove_ts = remove_ts
return self.to_mp4()
def to_mp4(self):
self.logger.info("start %s", "--".center(50, '-'))
try:
if self.request_bool:
self.req()
self.parse()
finally:
if self.logger:
for handler in self.logger.handlers:
if hasattr(handler, "close"):
handler.close() # noqa
self.logger.handlers.clear()
self.logger.info("end %s", "--".center(50, '-'))
def parse(self):
ts_path_list = []
if self.__ts_len is None:
if callable(self.sort_func):
ts_path_list = self.sort_func(ts_path_list)
else:
ts_path_list = self.default_sort_func
else:
ts_path_list = self.default_sort_func
self.__change_to_mp4(ts_path_list)
@property
def default_sort_func(self):
return list(filter(lambda x: x.endswith(".ts"), sorted(os.listdir(self.m3u8_path))))
def __change_to_mp4(self, ts_path_list):
if self.mp4_path_name is None:
self.mp4_path_name = main_path(
os.path.join(
os.path.dirname(self.m3u8_path),
f"{os.path.basename(self.m3u8_path)}_{self.get_uuid}.mp4"
)
)
if self.logger:
self.logger.info(f"mp4_path_name {self.mp4_path_name}")
else:
if not self.mp4_path_name.endswith(".mp4"):
self.mp4_path_name = f"{main_path(self.mp4_path_name)}.mp4"
self.mp4_path_name = main_path(self.mp4_path_name)
p, file_name = os.path.split(self.mp4_path_name)
if p:
if not os.path.isdir(p):
try:
os.makedirs(p)
except FileExistsError:
pass
if file_name == ".mp4":
file_name = f"{self.get_uuid}{file_name}"
self.mp4_path_name = os.path.join(p, file_name)
# todo cmd 中不能帶有 ffmpeg 不能解析的空格
cmd = self._get_cmd(self.m3u8_path, ts_path_list, self.mp4_path_name)
try:
# todo 轉化開始
subprocess.check_output(
cmd, shell=True, timeout=self.change_timeout)
finally:
if self._remove_ts:
self.logger.info("原始文件刪除中 %s", '--'.center(50, '-'))
for n in ts_path_list:
os.unlink(os.path.join(self.m3u8_path, n))
if not os.listdir(self.m3u8_path):
os.rmdir(self.m3u8_path)
self.logger.info("原始文件刪除成功 %s", '--'.center(50, '-'))
return
@staticmethod
def _get_cmd(path, name_list, mp4_path_name, to_encoding=None):
if not to_encoding:
if get_sys() == "Windows":
to_encoding = "gbk"
else:
to_encoding = 'utf8'
out_list = 'concat:'
for n in name_list:
out_list += os.path.join(path,
n).encode(to_encoding).decode(to_encoding) + '|'
cmd = f'ffmpeg -i "{out_list.rstrip("|")}" -y -acodec copy -vcodec copy -absf aac_adtstoasc {mp4_path_name}'
return cmd
class MP4ToM3u8:
def __init__(
self,
ts_time=5,
change_timeout=30,
logger=None,
*,
encrypt=False,
key_len=16
):
"""
@param ts_time: ts 切片時長
@param change_timeout: 轉換超時時長
@param encrypt:
@param key_len:
"""
self.ts_time = ts_time
self.change_timeout = change_timeout
self._enc_info_path = "enc.key"
self.key_len = key_len
self.encrypt = encrypt
self.logger = logger
get_log(self)
@staticmethod
def __get_key(length=16):
b = '0123456789abcdefghigklmnopqrstuvwxyzABCDEFGHIGKLMNOPQRSTUVWXYZ'
import random
return ''.join(random.sample(b, length)).encode('utf8')
def run(self, *args, **kwargs):
"""
路徑中空格不要出現
@param args:
@param kwargs:
@return:
"""
return self.mp4_to_m3u8(*args, **kwargs)
def mp4_to_m3u8(self, mp4_path: str, save_m3u8_path=None, ts_time=None, cmd=''):
"""
@param mp4_path: mp4 路徑
@param save_m3u8_path: 保存
@param ts_time:
@param cmd:
@return:
"""
self.logger.info("start to m3u8 %s", '--'.center(50, '-'))
mp4_path = main_path(mp4_path)
kind = guss_file_type(mp4_path)
create_dir = False
if kind != "mp4":
raise ValueError("file not mp4")
if not save_m3u8_path:
create_dir = True
save_m3u8_path = os.path.splitext(mp4_path)[0]
if not save_m3u8_path.endswith(".m3u8"):
save_m3u8_path += ".m3u8"
if create_dir:
if not os.path.isdir(save_m3u8_path):
os.makedirs(save_m3u8_path)
save_m3u8_path = os.path.join(save_m3u8_path, os.path.split(save_m3u8_path)[-1])
save_ts_file_path = os.path.splitext(save_m3u8_path)[0]
else:
save_ts_file_path = save_m3u8_path
if not cmd:
if not save_ts_file_path.endswith(".ts"):
if save_ts_file_path.endswith(".m3u8"):
save_ts_file_path = os.path.splitext(save_m3u8_path)[0]
save_ts_file_path += ".ts"
f_path, f_name = os.path.split(save_ts_file_path)
if not reg.search(f_name):
f_na_li = list(os.path.splitext(f_name))
f_na_li.insert(1, "%07d")
f_name = ''.join(f_na_li)
del f_na_li
save_ts_file_path = os.path.join(f_path, f_name)
del f_name
del f_path
cmd = F"ffmpeg -i " \
F"{mp4_path} " \
F"-f segment -segment_time " \
F"{ts_time or self.ts_time} " \
F"-segment_format mpegts -segment_list " \
F"{save_m3u8_path} " \
F"-c copy -bsf:v h264_mp4toannexb -map 0 " \
F"{save_ts_file_path} -y"
if os.path.exists(save_m3u8_path):
return
# todo 轉化開始
subprocess.check_output(
cmd, shell=True, timeout=self.change_timeout)
if self.encrypt:
with open(os.path.join(os.path.split(save_m3u8_path)[0], self._enc_info_path), 'w') as f:
f.write(f"key:{self.__get_key(self.key_len)}\n")
f.write(f"iv:{self.__get_key()}\n")
self.logger.info("start to m3u8 end %s", '--'.center(50, '-'))
class MP4ToTs:
def __init__(self, change_timeout=None, thread_pool=None, logger=None):
self.change_timeout = change_timeout or 60 * 30
self.save_ts_path = []
self.thread_pool = thread_pool or 30
self.logger = logger
self._remove_ts = False
get_log(self)
def run(self, remove_ts=False, *args, **kwargs):
self._remove_ts = remove_ts
return self.mp4_to_ts(*args, **kwargs)
def get_mp4_list(self, folder, save_ts_path, video_format='.mp4'):
"""
@param folder:
@param save_ts_path:
@param video_format:
@return:
"""
li = []
for index, file in enumerate(os.listdir(folder), 1):
if os.path.splitext(file)[-1] != video_format:
continue
if save_ts_path:
a, b = os.path.splitext(save_ts_path)
self.save_ts_path.append(f"{a}{str(index).zfill(3)}{b}")
else:
a = os.path.splitext(file)[0]
self.save_ts_path.append(f"{a}.ts")
li.append(file)
return li
def mp4_to_ts(self, mp4_path: str, save_ts_path=None):
"""
@param mp4_path:
@param save_ts_path:
@return:
"""
self.logger.info("start to ts %s", '--'.center(50, '-'))
mp4_path = main_path(mp4_path)
if os.path.isdir(mp4_path):
mp4_path_list = self.get_mp4_list(mp4_path, save_ts_path)
else:
if not save_ts_path:
self.save_ts_path.append(os.path.splitext(mp4_path)[0] + ".ts")
else:
self.save_ts_path.append(save_ts_path)
mp4_path_list = [mp4_path]
from concurrent.futures import ThreadPoolExecutor, as_completed
thread = ThreadPoolExecutor(self.thread_pool)
thread_list = []
for index, mp4_file in enumerate(mp4_path_list):
mp4_path_file = os.path.join(mp4_path, mp4_file)
kind = guss_file_type(mp4_path_file)
if kind != "mp4":
raise ValueError("file not mp4")
save_ts_file_path = self.save_ts_path[index]
folder_file = os.path.split(save_ts_file_path)[0]
if not save_ts_path and not folder_file:
save_ts_file_path = os.path.join(mp4_path, save_ts_file_path)
folder_file = os.path.split(save_ts_file_path)[0]
if folder_file and not os.path.isdir(folder_file):
os.makedirs(folder_file)
cmd = F"ffmpeg -y -i {mp4_path_file} " \
F"-vcodec copy -acodec copy -vbsf " \
F"h264_mp4toannexb {save_ts_file_path}"
if os.path.exists(save_ts_file_path):
continue
# todo 轉化開始
_t_ = thread.submit(
self.__change,
cmd,
save_ts_file_path,
shell=True,
timeout=self.change_timeout
)
thread_list.append(_t_)
for k in as_completed(thread_list):
self.logger.info("file save ok %s ", k.result())
thread.shutdown()
if self._remove_ts:
for mp4_file in mp4_path_list:
mp4_path_file = os.path.join(mp4_path, mp4_file)
os.unlink(mp4_path_file)
if not os.listdir(mp4_path):
os.rmdir(mp4_path)
self.logger.info("start to ts end %s", '--'.center(50, '-'))
@staticmethod
def __change(cmd, mp4_path_file, **kwargs):
subprocess.check_output(cmd, **kwargs)
return mp4_path_file
```python