python爬虫之使用协程爬取小电影


为了防止xxxxxx问题,这里对url进行base64处理一下,如果要访问需要encode下

本文是结合了两个案例,来介绍协程,通过这俩案例,我们能对协程有一个更加清晰认识,在阅读本篇文章前,建议先看之前的两篇文章。

并发爬取视频

1. 解析网页获取视频地址

base64 decode之后的url地址为:"aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw="
使用requeests模块请求,看下网页源代码

import requests
import base64

url_base = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='

url = base64.b64decode(url_base).decode('utf-8')
resp = requests.get(url)
resp.encoding = 'utf-8'
print(resp.text)

拿到网页源代码之后,直接搜索关键字'm3u8',可以看到源代码中只有一个地方包含该关键字

image-20211101185437971

直接使用xpath拿m3u8

et = etree.HTML(html)
m3u8 = et.xpath('//iframe[@id="mplay"]/@src')[0]
print(m3u8)

但是拿的m3u8地址少了host,所以我们得拼一下,拼接url可以使用urllib模块中的urljoin,看了下源码,其实就是内部做了切分,最终拿出domain,然后跟传入的参数进行拼接。

from urllib.parse import urljoin

m3u8_redirct = urljoin(url, m3u8)
print(m3u8_redirct)

访问该m3u8,继续拿源代码,继续搜索关键字m3u8,发现该关键词在script标签中,这样的话,就得需要re模块,通过正则来捕获出来了

image-20211101190413564

import re

resp2 = requests.get(m3u8_redirct)
resp2.encoding = 'utf-8'
res2 = resp2.text
get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
m3u8_url = get_m3u8_re.search(res2).group('m3u8_url')
print(m3u8_url)

到这里,才拿到了第一个m3u8地址,但不是真正的m3u8地址,需要再次请求

resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
print(resp3.text)

得到请求结果为:

#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=1280x720
/20200824/7QgAzaOH/1000kb/hls/index.m3u8

需要对该返回结果进行切割,以'\n'换行符为分割,最终目的就是为了得到下面的地址: /20200824/7QgAzaOH/1000kb/hls/index.m3u8

resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
real_m3u8_url = resp3.text.strip().split('\n')[-1]
print(real_m3u8_url)

但是,拿到的地址,还是少了domain,所以还要在进行拼接

real_m3u8_url = urljoin(m3u8_url, real_m3u8_url)
print(real_m3u8_url)

到这里,我们才真正的拿到真正要请求的m3u8地址,通过请求该地址,可以获取一个映射,通过该映射,我们可以下载到所有的视频切片。

resp4 = requests.get(real_m3u8_url)
resp4.encoding = 'utf-8'
print(resp4.text)

返回结果为:

image-20211101191759079

分析该返回结果,可以看到所有的ts文件都已经被加密了,我们需要进行解密,然后在进行下载,解密工具是用Crypto这个模块中的AES来完成。

进一步分析该内容,我们可以看到加密的key已经放到了里面,#EXT-X-KEY:METHOD=AES-128这个后面的URI就是加密用的key,所以我们直接请求该uri,然后拿到结果即可

resp = requests.get(real_m3u8_url)
get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
encrypt_key = get_key_re.search(resp).group('encrypt_key')
key = requests.get(encrypt_key).content

到这里,终于要写异步的逻辑了,逻辑很简单,就是先写单独下载ts的代码,然后在创建一个tasks,最后启动该任务

import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES

async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # 异步读ts
                        content = await resp.content.read()
                    # 解密ts
                    decrypt_ts_content = aes.decrypt(content)
                    # 异步写
                    await f.write(decrypt_ts_content)
                    print(f"{m3u8_ts_url} 保存成功")
                    break
            except Exception as e:
                print(f"{m3u8_ts_url} 下载失败, 失败原因为: {e}")

这里需要传入加密用的key,也就是刚才拿到的key,第二个参数是单个ts地址,第三个参数是一个session, 最后一个参数是信号量,用于控制并发量,防止一下子请求太多,被屏蔽。

拿ts列表跟拿url差不多, 先通过requests拿到所有内容,然后分割,遍历,拿到ts列表

resp = requests.get(real_m3u8_url)
ts_list = list()
for line in resp.strip().split('\n'):
    if not line.startswith('#') and line.endswith('.ts'):
        ts_list.append(line)
print(ts_list)

然后写创建协程任务的逻辑,直接遍历ts列表即可

sem = asyncio.Semaphore(500)
async with aiohttp.ClientSession() as session:
    tasks = [
        asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
        for m3u8_ts_url in ts_list
    ]
    await asyncio.wait(tasks)

到这里,基本上就完成了90%的工作了,剩下的就是把所有的ts切片进行合并就可以了

with open("movie.mp4", 'ab+') as fw:
    for ts_line in ts_list:
        ts = ts_line.split('/')[-1]
        with open(f"temp/{ts}", 'rb') as fr:
            fw.write(fr.read())
            print(f"{ts} 分片合并成功")

2. 完整代码

import re
import base64
import requests
import asyncio
import aiohttp
import aiofiles
from lxml import etree
from urllib.parse import urljoin
from Crypto.Cipher import AES


def base_request(url):
    try:
        resp = requests.get(url)
        resp.encoding = 'utf-8'
        return resp.text
    except Exception as e:
        print(f"request {url} failed, and error is {e}")


def get_source_m3u8_url(url_base):
    et = etree.HTML(base_request(url_base))
    try:
        m3u8_play_url = et.xpath('//iframe[@id="mplay"]/@src')[0]
        m3u8_redirect_url = urljoin(url_base, m3u8_play_url)
        m3u8_resp = base_request(m3u8_redirect_url)
        get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
        m3u8_url = get_m3u8_re.search(m3u8_resp).group('m3u8_url')
        real_m3u8_url = base_request(m3u8_url)
        real_m3u8_url = real_m3u8_url.strip().split('\n')[-1]
        return urljoin(m3u8_url, real_m3u8_url)
    except Exception as e:
        print(f"没有找到m3u8跳转地址,报错信息如下: {e}")


def get_aes_key(m3u8_url):
    for i in range(3):
        try:
            resp = base_request(m3u8_url)
            get_key_re = re.compile(r'#EXT-X-KEY:.*URI="(?P<encrypt_key>.*?)"', re.S)
            encrypt_key = get_key_re.search(resp).group('encrypt_key')
            return requests.get(encrypt_key).content
        except:
            print(f"获取key错误, 正在尝试第{i+1}次重试")


def get_ts_list(m3u8_url):
    resp = base_request(m3u8_url)
    ts_list = list()
    for line in resp.strip().split('\n'):
        if not line.startswith('#') and line.endswith('.ts'):
            ts_list.append(line)
    return ts_list


async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # 异步读ts
                        content = await resp.content.read()
                    # 解密ts
                    decrypt_ts_content = aes.decrypt(content)
                    # 异步写
                    await f.write(decrypt_ts_content)
                    print(f"{m3u8_ts_url} 保存成功")
                    break
            except Exception as e:
                print(f"{m3u8_ts_url} 下载失败, 失败原因为: {e}")


def merge_ts_to_single(ts_list: list):
    with open("movie.mp4", 'ab+') as fw:
        for ts_line in ts_list:
            ts = ts_line.split('/')[-1]
            with open(f"temp/{ts}", 'rb') as fr:
                fw.write(fr.read())
                print(f"{ts} 分片合并成功")
    print("视频下载完成")


async def main():
    b64_2_url = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
    url_base = base64.b64decode(b64_2_url).decode('utf-8')
    m3u8_url = get_source_m3u8_url(url_base)
    print(f"获取的m3u8地址: {m3u8_url}")
    encrypt_key = get_aes_key(m3u8_url)
    print(f"获取用于解密的key: {encrypt_key}")
    ts_list = get_ts_list(m3u8_url)
    print(f"一共有{len(ts_list)}个切片")
    # 设置信号量
    sem = asyncio.Semaphore(500)
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
            for m3u8_ts_url in ts_list
        ]
        await asyncio.wait(tasks)
    merge_ts_to_single(ts_list)


if __name__ == '__main__':
    asyncio.run(main())

执行的结果如下:

image-20211101132239323

image-20211101132259372

优化使用协程下载图片的代码

上篇文章中介绍了,通过协程来并发的去下载图片,但是每次只能爬取一个网页下的图片,假如我们有很多的页面呢?那该如何去做呢?下面来改进下

方法一、使用协程创建多个任务

import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree


def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res


async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)


async def dowbnload_img(img_url):
    # 遍历图片url列表,下载每一张图片
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)


async def get_content(url):
    async with aiohttp.ClientSession() as session:
        # 解析页面源代码
        async with session.get(url) as resp:
            html = await resp.text('utf-8')
            et = etree.HTML(html)
            # 获取图片url列表
            res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
            ts = [asyncio.create_task(dowbnload_img(img_url=img_url)) for img_url in res]
            await asyncio.wait(ts)


async def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    tasks = [
        asyncio.create_task(get_content(url))
        for url in urls
    ]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(end - start)

运行时,请将bXAud2VpeGluLnFxLmNvbQ== 这串base64进行decode

方法二、多进程结合协程

import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
from concurrent.futures import ProcessPoolExecutor


def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res


async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)


async def download_one_img(img_url):
    # 遍历图片url列表,下载每一张图片
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)


async def download_img(img_urls):
    tasks = [
        asyncio.create_task(download_one_img(img_url))
        for img_url in img_urls
    ]
    await asyncio.wait(tasks)


async def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    with ProcessPoolExecutor(5) as pool:
        for url in urls:
            pool.submit(asyncio.run, download_img(url))


if __name__ == '__main__':
    start = time.time()
    main()
    # asyncio.run(main())
    end = time.time()
    print(end - start)

运行时,请将bXAud2VpeGluLnFxLmNvbQ== 这串base64进行decode

经过对比,多进程结合协程,和使用协程进行下载时,所用时间几乎一致,所以这里我们尽量选择使用协程,因为同样效率下,协程更加节省资源。

欢迎各位朋友关注我的公众号,来一起学习进步哦
images


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM