To avoid xxxxxx problems, the URLs in this article have been base64-encoded; decode them before visiting.
This article introduces coroutines through two examples. Working through both cases should give you a much clearer picture of how coroutines behave. Before reading on, it is recommended to read the previous two articles first.
Crawling a video concurrently
1. Parsing the page to get the video address
The base64-encoded URL is: "aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw="
Request it with the requests module and take a look at the page source:
import requests
import base64
url_base = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
url = base64.b64decode(url_base).decode('utf-8')
resp = requests.get(url)
resp.encoding = 'utf-8'
print(resp.text)
With the page source in hand, search for the keyword 'm3u8'; only one place in the source contains it.
Use XPath directly to grab the m3u8 address:
from lxml import etree
et = etree.HTML(resp.text)  # resp is the response fetched above
m3u8 = et.xpath('//iframe[@id="mplay"]/@src')[0]
print(m3u8)
But the m3u8 address we get is missing the host, so we have to join it ourselves. URL joining can be done with urljoin from the urllib.parse module; a look at its source shows that internally it just splits the base URL, pulls out the domain, and joins it with the argument you pass in.
from urllib.parse import urljoin
m3u8_redirect = urljoin(url, m3u8)
print(m3u8_redirect)
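As a quick illustration of urljoin's behavior (standard-library semantics; example.com is just a placeholder):

from urllib.parse import urljoin

# an absolute path replaces the base URL's path but keeps its host
print(urljoin('http://example.com/play/1.html', '/share/x.html'))
# -> http://example.com/share/x.html

# a full URL as the second argument wins outright
print(urljoin('http://example.com/play/1.html', 'http://other.com/a'))
# -> http://other.com/a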
Visit that m3u8 address, grab the source again, and search for the keyword m3u8 once more. This time it sits inside a script tag, so we need the re module to capture it with a regular expression:
import re
resp2 = requests.get(m3u8_redirect)
resp2.encoding = 'utf-8'
res2 = resp2.text
get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
m3u8_url = get_m3u8_re.search(res2).group('m3u8_url')
print(m3u8_url)
Only now do we have the first m3u8 address, and it is still not the real one; we need to request it again:
resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
print(resp3.text)
The response is:
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=1280x720
/20200824/7QgAzaOH/1000kb/hls/index.m3u8
This is an HLS master playlist: the #EXT-X-STREAM-INF entry points at the variant playlist that actually lists the segments. Split the response on the newline character '\n'; the goal is to extract the address below: /20200824/7QgAzaOH/1000kb/hls/index.m3u8
resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
real_m3u8_url = resp3.text.strip().split('\n')[-1]
print(real_m3u8_url)
But the address we get is still missing the domain, so we have to join once more:
real_m3u8_url = urljoin(m3u8_url, real_m3u8_url)
print(real_m3u8_url)
Only at this point do we have the m3u8 address we really need to request. Fetching it returns a playlist that maps out every segment, and from that mapping we can download all of the video slices.
resp4 = requests.get(real_m3u8_url)
resp4.encoding = 'utf-8'
print(resp4.text)
Analyzing the returned playlist (shown as a screenshot in the original post), we can see that every ts file is encrypted, so we must decrypt each segment before saving it. Decryption is done with AES from the Crypto module (the Crypto namespace is typically provided by the pycryptodome package).
Looking further, the encryption key is referenced right inside the playlist: the URI after #EXT-X-KEY:METHOD=AES-128 is where the key lives, so we simply request that URI and take the response:
resp = requests.get(real_m3u8_url)
get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
key_uri = get_key_re.search(resp.text).group('encrypt_key')  # search the text, not the Response object
encrypt_key = requests.get(key_uri).content  # the raw 16-byte AES key
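This playlist's #EXT-X-KEY line carries no IV attribute, so the decryption code below falls back to a zero IV. The HLS spec does allow an optional IV=0x... attribute; if a playlist supplied one, it could be parsed with a small helper like this (a sketch; parse_iv is a hypothetical name, not part of the original code):

import re

def parse_iv(playlist_text, default=b'0000000000000000'):
    # #EXT-X-KEY may carry an IV attribute as 32 hex characters: IV=0x...
    m = re.search(r'#EXT-X-KEY:[^\n]*IV=0x(?P<iv>[0-9a-fA-F]{32})', playlist_text)
    if m:
        return bytes.fromhex(m.group('iv'))
    return default  # fall back to the zero IV used in this article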
Now we finally get to the asynchronous logic. It is simple: first write the code that downloads a single ts, then create a batch of tasks, and finally kick them off.
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    async with sem:
        for i in range(100):
            try:
                # a CBC cipher object is stateful, so build a fresh one per attempt
                aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts asynchronously
                        content = await resp.content.read()
                        # decrypt the ts
                        decrypt_ts_content = aes.decrypt(content)
                        # write it out asynchronously
                        await f.write(decrypt_ts_content)
                        print(f"{m3u8_ts_url} saved successfully")
                        break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed, reason: {e}")
The first argument is the AES key we just fetched; the second is the URL of a single ts segment; the third is a shared session; and the last is a semaphore that caps the concurrency, so we don't fire off too many requests at once and get blocked.
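To make the semaphore's role concrete, here is a minimal self-contained sketch (worker and demo are illustrative names, not from the original code): at most three workers can be inside the async with sem block at any moment.

import asyncio

async def worker(i, sem):
    async with sem:              # at most 3 workers run this block concurrently
        print(f"worker {i} started")
        await asyncio.sleep(1)   # stand-in for a network request

async def demo():
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(worker(i, sem) for i in range(10)))

asyncio.run(demo())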
Getting the ts list is much like getting the URL: fetch the playlist with requests, split it, iterate over the lines, and collect the ts entries:
resp = requests.get(real_m3u8_url)
ts_list = list()
for line in resp.text.strip().split('\n'):
    if not line.startswith('#') and line.endswith('.ts'):
        ts_list.append(line)
print(ts_list)
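One practical note before launching the tasks: decrypt_one_ts writes into a temp/ directory that these snippets assume already exists, so create it up front:

import os

os.makedirs("temp", exist_ok=True)  # directory for the decrypted ts segments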
Then write the logic that creates the coroutine tasks; simply iterate over the ts list:
sem = asyncio.Semaphore(500)
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
for m3u8_ts_url in ts_list
]
await asyncio.wait(tasks)
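As an aside, asyncio.wait(tasks) returns (done, pending) sets and leaves result and exception handling to the caller; asyncio.gather is an equivalent alternative here that propagates exceptions and preserves result order:

await asyncio.gather(*tasks)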
At this point roughly 90% of the work is done; all that remains is merging the ts slices. Plain binary concatenation works because MPEG-TS is a packetized stream format, so the decrypted segments can simply be appended one after another:
with open("movie.mp4", 'ab+') as fw:
for ts_line in ts_list:
ts = ts_line.split('/')[-1]
with open(f"temp/{ts}", 'rb') as fr:
fw.write(fr.read())
print(f"{ts} 分片合並成功")
2. Complete code
import re
import os
import base64
import requests
import asyncio
import aiohttp
import aiofiles
from lxml import etree
from urllib.parse import urljoin
from Crypto.Cipher import AES
def base_request(url):
try:
resp = requests.get(url)
resp.encoding = 'utf-8'
return resp.text
except Exception as e:
print(f"request {url} failed, and error is {e}")
def get_source_m3u8_url(url_base):
et = etree.HTML(base_request(url_base))
try:
m3u8_play_url = et.xpath('//iframe[@id="mplay"]/@src')[0]
m3u8_redirect_url = urljoin(url_base, m3u8_play_url)
m3u8_resp = base_request(m3u8_redirect_url)
get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
m3u8_url = get_m3u8_re.search(m3u8_resp).group('m3u8_url')
real_m3u8_url = base_request(m3u8_url)
real_m3u8_url = real_m3u8_url.strip().split('\n')[-1]
return urljoin(m3u8_url, real_m3u8_url)
    except Exception as e:
        print(f"m3u8 redirect address not found, error: {e}")
def get_aes_key(m3u8_url):
for i in range(3):
try:
resp = base_request(m3u8_url)
get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
encrypt_key = get_key_re.search(resp).group('encrypt_key')
return requests.get(encrypt_key).content
        except:
            print(f"failed to fetch the key, retry attempt {i+1}")
def get_ts_list(m3u8_url):
resp = base_request(m3u8_url)
ts_list = list()
for line in resp.strip().split('\n'):
if not line.startswith('#') and line.endswith('.ts'):
ts_list.append(line)
return ts_list
async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    async with sem:
        for i in range(100):
            try:
                # a CBC cipher object is stateful, so build a fresh one per attempt
                aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts asynchronously
                        content = await resp.content.read()
                        # decrypt the ts
                        decrypt_ts_content = aes.decrypt(content)
                        # write it out asynchronously
                        await f.write(decrypt_ts_content)
                        print(f"{m3u8_ts_url} saved successfully")
                        break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed, reason: {e}")
def merge_ts_to_single(ts_list: list):
    with open("movie.mp4", 'ab+') as fw:
        for ts_line in ts_list:
            ts = ts_line.split('/')[-1]
            with open(f"temp/{ts}", 'rb') as fr:
                fw.write(fr.read())
                print(f"{ts} segment merged successfully")
    print("video download complete")
async def main():
    b64_2_url = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
    url_base = base64.b64decode(b64_2_url).decode('utf-8')
    m3u8_url = get_source_m3u8_url(url_base)
    print(f"fetched m3u8 address: {m3u8_url}")
    encrypt_key = get_aes_key(m3u8_url)
    print(f"fetched decryption key: {encrypt_key}")
    ts_list = get_ts_list(m3u8_url)
    print(f"{len(ts_list)} segments in total")
    # make sure the segment directory exists
    os.makedirs("temp", exist_ok=True)
    # set the semaphore
    sem = asyncio.Semaphore(500)
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
            for m3u8_ts_url in ts_list
        ]
        await asyncio.wait(tasks)
    merge_ts_to_single(ts_list)
if __name__ == '__main__':
asyncio.run(main())
The execution result is shown in the screenshot from the original post (omitted here).
Optimizing the coroutine-based image download code
The previous article showed how to download images concurrently with coroutines, but it could only crawl the images of a single page at a time. What if we have many pages? How should we handle that? Let's improve the code below.
Method 1: create multiple tasks with coroutines
import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
def get_img_urls(url):
resp = requests.get(url)
resp.encoding = 'utf-8'
et = etree.HTML(resp.text)
res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
return res
async def save_img(content):
if not os.path.exists("tmp"):
os.makedirs("tmp")
async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
await f.write(content)
async def download_img(img_url):
    # download a single image
async with aiohttp.ClientSession() as session:
async with session.get(img_url) as img_resp:
content = await img_resp.content.read()
await save_img(content)
async def get_content(url):
    async with aiohttp.ClientSession() as session:
        # parse the page source
        async with session.get(url) as resp:
            html = await resp.text('utf-8')
            et = etree.HTML(html)
            # collect the list of image urls
            res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
            ts = [asyncio.create_task(download_img(img_url=img_url)) for img_url in res]
            await asyncio.wait(ts)
async def main():
urls = [
"https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
"https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
"https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
"https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
"https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
]
tasks = [
asyncio.create_task(get_content(url))
for url in urls
]
await asyncio.wait(tasks)
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
end = time.time()
print(end - start)
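A possible refinement, not part of the original code: have get_content pass its ClientSession down to each task, so every image on a page reuses one connection pool instead of opening a fresh session per image. A sketch of the reworked downloader:

async def download_img(img_url, session):
    # reuse the caller's session rather than opening a new one per image
    async with session.get(img_url) as img_resp:
        content = await img_resp.content.read()
        await save_img(content)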
When running this, decode the base64 string bXAud2VpeGluLnFxLmNvbQ== to recover the real host.
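For example, a minimal way to rebuild one of the real links before requesting it (the variable names here are illustrative):

import base64

host = base64.b64decode('bXAud2VpeGluLnFxLmNvbQ==').decode('utf-8')
real_url = f"https://{host}/s/Xa0AYHXnS8dgDdisVCujJw"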
Method 2: multiprocessing combined with coroutines
import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
from concurrent.futures import ProcessPoolExecutor
def get_img_urls(url):
resp = requests.get(url)
resp.encoding = 'utf-8'
et = etree.HTML(resp.text)
res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
return res
async def save_img(content):
if not os.path.exists("tmp"):
os.makedirs("tmp")
async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
await f.write(content)
async def download_one_img(img_url):
    # download a single image
async with aiohttp.ClientSession() as session:
async with session.get(img_url) as img_resp:
content = await img_resp.content.read()
await save_img(content)
async def download_img(img_urls):
tasks = [
asyncio.create_task(download_one_img(img_url))
for img_url in img_urls
]
await asyncio.wait(tasks)
def run_download(url):
    # runs inside a worker process: collect the image urls for one page,
    # then drive the coroutine downloader with that process's own event loop
    img_urls = get_img_urls(url)
    if img_urls:
        asyncio.run(download_img(img_urls))

def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    with ProcessPoolExecutor(5) as pool:
        for url in urls:
            # a coroutine object cannot be pickled, so submit a plain function
            # and let the worker process create and run the coroutine itself
            pool.submit(run_download, url)

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(end - start)
As with Method 1, decode the base64 string bXAud2VpeGluLnFxLmNvbQ== before running in order to recover the real host.
Comparing the two approaches, multiprocessing combined with coroutines takes almost the same time as coroutines alone. Since the workload is I/O-bound, a single event loop already keeps the connections busy, and the extra processes mainly add startup and serialization overhead. So prefer plain coroutines here: at the same throughput they consume fewer resources.
Friends are welcome to follow my official account, so we can learn and improve together.