To avoid xxxxxx issues, the URLs in this post are base64-encoded; decode them before visiting.
This post uses two examples to introduce coroutines; through them we can get a clearer picture of how coroutines work. Before reading this article, it's recommended to go through the previous two articles first.
Concurrently scraping a video
1. Parse the page to get the video address
The base64-encoded URL is: 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw=' (decode it to get the actual page address).
Request it with the requests module and take a look at the page source:
import requests
import base64

url_base = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
# decode the base64 string to get the real page address
url = base64.b64decode(url_base).decode('utf-8')

resp = requests.get(url)
resp.encoding = 'utf-8'
print(resp.text)
Once we have the page source, search for the keyword 'm3u8'; only one place in the source contains it.
We can grab the m3u8 address directly with XPath:
from lxml import etree

et = etree.HTML(resp.text)
m3u8 = et.xpath('//iframe[@id="mplay"]/@src')[0]
print(m3u8)
However, the m3u8 address we got is missing the host, so we have to join it ourselves. URL joining can be done with urljoin from the urllib module; a look at its source shows that it simply splits the base URL internally, pulls out the domain, and joins it with the argument you pass in.
from urllib.parse import urljoin
m3u8_redirct = urljoin(url, m3u8)
print(m3u8_redirct)
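For reference, this is roughly how urljoin resolves absolute versus relative references; the URLs below are made-up examples, not the article's real addresses.

from urllib.parse import urljoin

base = "http://example.com/play/30286_1_1.html"  # hypothetical base page
# an absolute path replaces everything after the domain
print(urljoin(base, "/share/abc.html"))   # http://example.com/share/abc.html
# a relative path only replaces the last path segment
print(urljoin(base, "hls/index.m3u8"))    # http://example.com/play/hls/index.m3u8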
Visit that m3u8 address and fetch its source, then search for the keyword m3u8 again. This time the keyword sits inside a script tag, so we need the re module and a regular expression to capture it:
import re
resp2 = requests.get(m3u8_redirct)
resp2.encoding = 'utf-8'
res2 = resp2.text
get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
m3u8_url = get_m3u8_re.search(res2).group('m3u8_url')
print(m3u8_url)
At this point we finally have the first m3u8 address, but it's still not the real one; we have to request it again.
resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
print(resp3.text)
The response is:
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=1280x720
/20200824/7QgAzaOH/1000kb/hls/index.m3u8
We need to split this response on the newline character '\n'; the goal is to end up with the address below: /20200824/7QgAzaOH/1000kb/hls/index.m3u8
resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
real_m3u8_url = resp3.text.strip().split('\n')[-1]
print(real_m3u8_url)
However, the address we got is still missing the domain, so we have to join it once more:
real_m3u8_url = urljoin(m3u8_url, real_m3u8_url)
print(real_m3u8_url)
Only now do we have the real m3u8 address. Requesting it returns a playlist that lists every video segment, and with that list we can download all the slices.
resp4 = requests.get(real_m3u8_url)
resp4.encoding = 'utf-8'
print(resp4.text)
Analyzing the returned playlist, we can see that every ts file is encrypted, so the segments have to be decrypted after downloading. Decryption is done with AES from the Crypto module (typically provided by the pycryptodome package).
Looking further at the playlist, the key's location is already in there: the URI after '#EXT-X-KEY:METHOD=AES-128' points to the encryption key, so we simply request that URI and use the response as the key:
resp = requests.get(real_m3u8_url)
get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
encrypt_key = get_key_re.search(resp.text).group('encrypt_key')
key = requests.get(encrypt_key).content
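With the key in hand, decrypting a single downloaded slice looks roughly like this; the file path is hypothetical, and the all-zero IV matches what the async code below uses.

from Crypto.Cipher import AES

aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b'0000000000000000')
with open("temp/example.ts", "rb") as f:  # a downloaded, still-encrypted slice (hypothetical path)
    plain = aes.decrypt(f.read())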
Now we finally get to the async logic. The idea is simple: first write the code that downloads a single ts segment, then build a list of tasks from it, and finally run those tasks.
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES
async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts asynchronously
                        content = await resp.content.read()
                        # decrypt the ts
                        decrypt_ts_content = aes.decrypt(content)
                        # write asynchronously
                        await f.write(decrypt_ts_content)
                        print(f"{m3u8_ts_url} saved successfully")
                break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed: {e}")
This function takes the encryption key we just fetched, then the URL of a single ts segment, then a shared session, and finally a semaphore that limits concurrency so we don't fire off too many requests at once and get blocked.
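As a minimal sketch of how the semaphore caps concurrency, here is a toy example; the fetch coroutine and the URLs are made up for illustration and are not part of the scraper.

import asyncio
import aiohttp

async def fetch(url, session, sem):
    # only as many coroutines as the semaphore allows get past this line at a time
    async with sem:
        async with session.get(url) as resp:
            return await resp.content.read()

async def demo():
    sem = asyncio.Semaphore(3)  # at most 3 requests in flight
    urls = [f"http://example.com/{i}.ts" for i in range(10)]  # placeholder URLs
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(u, session, sem)) for u in urls]
        await asyncio.gather(*tasks)

asyncio.run(demo())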
Getting the ts list is much like getting the url: fetch the playlist with requests, then split it, iterate over the lines, and collect the ts entries:
resp = requests.get(real_m3u8_url)
resp.encoding = 'utf-8'
ts_list = list()
for line in resp.text.strip().split('\n'):
    if not line.startswith('#') and line.endswith('.ts'):
        ts_list.append(line)
print(ts_list)
Then write the logic that creates the coroutine tasks; just iterate over the ts list:
# this runs inside an async function, e.g. main() in the complete code below
sem = asyncio.Semaphore(500)
async with aiohttp.ClientSession() as session:
    tasks = [
        asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
        for m3u8_ts_url in ts_list
    ]
    await asyncio.wait(tasks)
At this point roughly 90% of the work is done; all that's left is merging the ts slices into one file:
with open("movie.mp4", 'ab+') as fw:
for ts_line in ts_list:
ts = ts_line.split('/')[-1]
with open(f"temp/{ts}", 'rb') as fr:
fw.write(fr.read())
print(f"{ts} 分片合并成功")
2. Complete code
import re
import base64
import requests
import asyncio
import aiohttp
import aiofiles
from lxml import etree
from urllib.parse import urljoin
from Crypto.Cipher import AES
def base_request(url):
    try:
        resp = requests.get(url)
        resp.encoding = 'utf-8'
        return resp.text
    except Exception as e:
        print(f"request {url} failed, and error is {e}")
def get_source_m3u8_url(url_base):
    et = etree.HTML(base_request(url_base))
    try:
        m3u8_play_url = et.xpath('//iframe[@id="mplay"]/@src')[0]
        m3u8_redirect_url = urljoin(url_base, m3u8_play_url)
        m3u8_resp = base_request(m3u8_redirect_url)
        get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
        m3u8_url = get_m3u8_re.search(m3u8_resp).group('m3u8_url')
        real_m3u8_url = base_request(m3u8_url)
        real_m3u8_url = real_m3u8_url.strip().split('\n')[-1]
        return urljoin(m3u8_url, real_m3u8_url)
    except Exception as e:
        print(f"m3u8 redirect address not found, error: {e}")
def get_aes_key(m3u8_url):
    for i in range(3):
        try:
            resp = base_request(m3u8_url)
            get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
            encrypt_key = get_key_re.search(resp).group('encrypt_key')
            return requests.get(encrypt_key).content
        except:
            print(f"failed to get the key, retrying ({i + 1})")
def get_ts_list(m3u8_url):
    resp = base_request(m3u8_url)
    ts_list = list()
    for line in resp.strip().split('\n'):
        if not line.startswith('#') and line.endswith('.ts'):
            ts_list.append(line)
    return ts_list
async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts asynchronously
                        content = await resp.content.read()
                        # decrypt the ts
                        decrypt_ts_content = aes.decrypt(content)
                        # write asynchronously
                        await f.write(decrypt_ts_content)
                        print(f"{m3u8_ts_url} saved successfully")
                break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed: {e}")
def merge_ts_to_single(ts_list: list):
    with open("movie.mp4", 'ab+') as fw:
        for ts_line in ts_list:
            ts = ts_line.split('/')[-1]
            with open(f"temp/{ts}", 'rb') as fr:
                fw.write(fr.read())
                print(f"{ts} slice merged")
    print("video download complete")
async def main():
    b64_2_url = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
    url_base = base64.b64decode(b64_2_url).decode('utf-8')
    m3u8_url = get_source_m3u8_url(url_base)
    print(f"m3u8 address: {m3u8_url}")
    encrypt_key = get_aes_key(m3u8_url)
    print(f"key used for decryption: {encrypt_key}")
    ts_list = get_ts_list(m3u8_url)
    print(f"there are {len(ts_list)} slices in total")
    # set up the semaphore
    sem = asyncio.Semaphore(500)
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
            for m3u8_ts_url in ts_list
        ]
        await asyncio.wait(tasks)
    merge_ts_to_single(ts_list)
if __name__ == '__main__':
    asyncio.run(main())
The execution result is as follows (output omitted here).
Optimizing the coroutine-based image downloader
The previous article showed how to download images concurrently with coroutines, but it could only scrape the images on a single page. What if we have many pages? How do we handle that? Let's improve it.
Method 1: create multiple tasks with coroutines
import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res

async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)
async def download_img(img_url):
    # download a single image
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)

async def get_content(url):
    async with aiohttp.ClientSession() as session:
        # fetch the page source
        async with session.get(url) as resp:
            html = await resp.text('utf-8')
            et = etree.HTML(html)
            # get the list of image urls
            res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
            ts = [asyncio.create_task(download_img(img_url=img_url)) for img_url in res]
            await asyncio.wait(ts)
async def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    tasks = [
        asyncio.create_task(get_content(url))
        for url in urls
    ]
    await asyncio.wait(tasks)

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(end - start)
When running it, decode the base64 string bXAud2VpeGluLnFxLmNvbQ== and substitute the result back into the URLs.
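One way to do that substitution at runtime is sketched below; it assumes urls is the list from the script above, and the variable names are just for illustration.

import base64

encoded_host = "bXAud2VpeGluLnFxLmNvbQ=="
host = base64.b64decode(encoded_host).decode("utf-8")
# swap the placeholder back in for the real host in each url
urls = [u.replace(encoded_host, host) for u in urls]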
Method 2: multiprocessing combined with coroutines
import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
from concurrent.futures import ProcessPoolExecutor
def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res

async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)
async def download_one_img(img_url):
    # download a single image
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)

async def download_img(img_urls):
    # download every image on one page concurrently
    tasks = [
        asyncio.create_task(download_one_img(img_url))
        for img_url in img_urls
    ]
    await asyncio.wait(tasks)
def scrape_one_page(url):
    # runs inside a worker process: parse the page, then download its images with coroutines
    img_urls = get_img_urls(url)
    asyncio.run(download_img(img_urls))

def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    # a coroutine object cannot be pickled, so each worker process
    # builds and runs its own event loop via asyncio.run
    with ProcessPoolExecutor(5) as pool:
        for url in urls:
            pool.submit(scrape_one_page, url)

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(end - start)
As with Method 1, decode bXAud2VpeGluLnFxLmNvbQ== and substitute it back into the URLs before running (see the snippet after Method 1).
Comparing the two, multiprocessing combined with coroutines and plain coroutines take almost the same time to download, so here we prefer coroutines: at the same level of efficiency, coroutines consume fewer resources.
You're welcome to follow my WeChat official account so we can learn and improve together.