# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: m3u8下載器
# Author: yunhgu
# Date: 2022/3/4 9:35
# Description:
# -------------------------------------------------------------------------------
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from traceback import format_exc
import m3u8
import requests
from log_color.log_color import Logger
from progress.bar import IncrementalBar
class ProcessData:
def __init__(self, src, dst, name, num=10):
self.src = src
self.num = num
self.name = name
self.dst = Path(dst)
if self.dst.exists():
self.logger = Logger(name=str(self.dst.joinpath("m3u8下載器.log")))
else:
self.logger = Logger(name="m3u8下載器.log")
def start(self):
try:
self.process_data()
except Exception as e:
self.logger.error(f"運行失敗:{format_exc()}:{e}")
@staticmethod
def get_content(file):
with open(file, encoding="utf-8", mode="r") as f:
return f.read()
def process_data(self):
if Path(self.src).is_file():
m3u8_ob = m3u8.loads(self.get_content(self.src))
self.download_ts(m3u8_ob)
elif str(self.src).lower().startswith("http"):
m3u8_ob = m3u8.load(self.src)
self.download_ts(m3u8_ob)
@staticmethod
def download(id_url):
id_int, url = id_url
content = requests.get(url).content
return id_int, content
def download_ts(self, m3u8_ob):
id_content_dic = {}
with ThreadPoolExecutor(max_workers=self.num) as p:
task_list = []
for id_int, seg in enumerate(m3u8_ob.segments):
uri = seg.uri
if seg.base_uri is not None:
uri = f"{seg.base_uri}{uri}"
task_list.append(p.submit(self.download, (id_int, uri)))
suffix = '%(percent)d%% [%(index)d/%(max)d in %(elapsed)ds (eta:%(eta_td)s)]'
with IncrementalBar("下載進度:", max=len(task_list), suffix=suffix) as bar:
for task in as_completed(task_list):
if task.done():
id_int, content = task.result()
id_content_dic[id_int] = content
# output_file = self.dst.joinpath(self.name).joinpath(f"{id_int}.ts")
# output_file.parent.mkdir(parents=True, exist_ok=True)
# self.save_ts(content, output_file)
bar.next()
output_file = self.dst.joinpath(f"{self.name}.ts")
self.save_all_ts(id_content_dic, output_file)
def save_all_ts(self, id_content_dic, file):
suffix = '%(percent)d%% [%(index)d/%(max)d in %(elapsed)ds (eta:%(eta_td)s)]'
with IncrementalBar("保存進度:", max=len(id_content_dic), suffix=suffix) as bar:
with open(file, "wb+") as f:
for _, content in sorted(id_content_dic.items()):
f.write(content)
bar.next()
self.logger.info(f"\r視頻生成位置:{file}")
@staticmethod
def save_ts(content, file):
with open(file, "wb") as f:
f.write(content)
if __name__ == '__main__':
print("******** 開始 ********")
# input_folder = input("請輸入原始文件夾:").strip("\"")
# output_folder = input("請輸入結果保存文件夾:").strip("\"")
# input_folder = r"F:\測試代碼\個人學習\m3u8下載器\index.m3u8"
input_folder = "https://1257120875.vod2.myqcloud.com/0ef121cdvodtransgzp1257120875/3055695e5285890780828799271/v.f230.m3u8"
output_folder = r"F:\測試代碼\個人學習\m3u8下載器\output"
pd = ProcessData(src=input_folder, dst=output_folder, name="video1", num=100)
pd.start()
print("******** 結束 ********")