Python crawler: downloading a movie with coroutines


To avoid xxxxxx problems, the URLs in this article are base64-encoded; decode them first if you want to visit them.

This article combines two examples to introduce coroutines. Through these two cases we can get a much clearer picture of how coroutines work. Before reading on, it is recommended to read the previous two articles first.

Crawling a video concurrently

1. Parse the page to get the video address

The base64-encoded URL is: "aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw="
Request it with the requests module and look at the page source:

import requests
import base64

url_base = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='

url = base64.b64decode(url_base).decode('utf-8')
resp = requests.get(url)
resp.encoding = 'utf-8'
print(resp.text)

With the page source in hand, search for the keyword 'm3u8' directly; only one place in the source contains it.


Extract the m3u8 address directly with XPath:

from lxml import etree

et = etree.HTML(resp.text)
m3u8 = et.xpath('//iframe[@id="mplay"]/@src')[0]
print(m3u8)

But the m3u8 address we got is missing the host, so we have to join it ourselves. urljoin from the urllib.parse module does exactly that; a glance at its source shows it splits the base URL, keeps the scheme and domain, and resolves the passed-in path against them.

from urllib.parse import urljoin

m3u8_redirect = urljoin(url, m3u8)
print(m3u8_redirect)
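
A quick illustration of urljoin's resolution rules (example.com is just a placeholder host):

from urllib.parse import urljoin

# an absolute path replaces everything after the host
print(urljoin('http://example.com/play/30286_1_1.html', '/share/12345.html'))
# -> http://example.com/share/12345.html

# a relative path is resolved against the base url's directory
print(urljoin('http://example.com/play/30286_1_1.html', 'next.html'))
# -> http://example.com/play/next.html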

Request that address, look at the source again, and search for 'm3u8' once more. This time the keyword sits inside a script tag, so we need the re module and a regular expression to capture it.


import re

resp2 = requests.get(m3u8_redirect)
resp2.encoding = 'utf-8'
res2 = resp2.text
get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
m3u8_url = get_m3u8_re.search(res2).group('m3u8_url')
print(m3u8_url)

Only now do we have the first m3u8 address, and it is still not the real one; we have to request it again.

resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
print(resp3.text)

The response is:

#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1000000,RESOLUTION=1280x720
/20200824/7QgAzaOH/1000kb/hls/index.m3u8

We need to split this response on the '\n' newline character; the goal is the last line: /20200824/7QgAzaOH/1000kb/hls/index.m3u8

resp3 = requests.get(m3u8_url)
resp3.encoding = 'utf-8'
real_m3u8_url = resp3.text.strip().split('\n')[-1]
print(real_m3u8_url)

However, this address is also missing the domain, so join it once more:

real_m3u8_url = urljoin(m3u8_url, real_m3u8_url)
print(real_m3u8_url)

At this point we finally have the real m3u8 address. Requesting it returns the media playlist, which lists every ts segment of the video, so we can download all of them.

resp4 = requests.get(real_m3u8_url)
resp4.encoding = 'utf-8'
print(resp4.text)

The returned result is the media playlist itself.

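Its exact content varies by site, but an AES-encrypted HLS media playlist generally looks like this (the key URI and segment paths are illustrative placeholders, not the real values):

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:4
#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.key"
#EXTINF:4.0,
https://example.com/1000kb/hls/segment000.ts
#EXTINF:4.0,
https://example.com/1000kb/hls/segment001.ts
#EXT-X-ENDLIST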

Analyzing this playlist, we can see that every ts file is encrypted; we need to download each segment and then decrypt it. Decryption is done with AES from the Crypto module (provided by the pycryptodome package).

Looking further at this content, the key is already referenced inside it: the URI after #EXT-X-KEY:METHOD=AES-128 points to the encryption key, so we just request that URI and take the response bytes as the key.

resp = requests.get(real_m3u8_url)
get_key_re = re.compile(r'#EXT-X-KEY:.*?URI="(?P<encrypt_key>.*?)"', re.S)
encrypt_key = get_key_re.search(resp.text).group('encrypt_key')
key = requests.get(encrypt_key).content
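
As a minimal sketch of the decryption step itself: with the key bytes in hand, one segment can be decrypted like this (pycryptodome's AES; the sixteen ASCII-'0' IV is the assumption the downloader below also makes, since we never extract an IV attribute from the playlist):

from Crypto.Cipher import AES

def decrypt_ts(key: bytes, encrypted: bytes) -> bytes:
    # AES-128-CBC with a hardcoded 16-byte IV of ASCII zeros (assumed)
    aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    return aes.decrypt(encrypted)

# usage: plain = decrypt_ts(key, requests.get(ts_url).content)

A fresh cipher object per segment matters here: CBC mode is stateful, so reusing one cipher across unrelated segments would corrupt the output.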

Here, at last, comes the async logic. It is simple: first write the code that downloads a single ts segment, then create a list of tasks, and finally start them.

import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES

async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts bytes asynchronously
                        content = await resp.content.read()
                    # decrypt the ts
                    decrypt_ts_content = aes.decrypt(content)
                    # write asynchronously
                    await f.write(decrypt_ts_content)
                    print(f"{m3u8_ts_url} saved")
                    break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed: {e}")

We pass in the encryption key we just fetched; the second parameter is a single ts URL, the third is a shared session, and the last is a semaphore that caps the concurrency so we don't fire too many requests at once and get blocked.
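
A minimal, self-contained sketch of what the semaphore buys us: at most N coroutines can be inside the async with sem block at the same time, and the rest wait their turn.

import asyncio

async def worker(i, sem):
    async with sem:
        # at most 3 workers run this block concurrently
        await asyncio.sleep(0.1)
        print(f"worker {i} done")

async def demo():
    sem = asyncio.Semaphore(3)
    await asyncio.gather(*(worker(i, sem) for i in range(10)))

asyncio.run(demo())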

Getting the ts list is much like getting the URL: fetch the playlist with requests, split it, iterate, and collect the ts entries.

resp = requests.get(real_m3u8_url)
ts_list = list()
for line in resp.text.strip().split('\n'):
    if not line.startswith('#') and line.endswith('.ts'):
        ts_list.append(line)
print(ts_list)

Then write the logic that creates the coroutine tasks. Note this snippet must run inside an async function (in the complete code below it lives in main()); it simply iterates over the ts list:

sem = asyncio.Semaphore(500)
async with aiohttp.ClientSession() as session:
    tasks = [
        asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
        for m3u8_ts_url in ts_list
    ]
    await asyncio.wait(tasks)

At this point roughly 90% of the work is done; all that remains is merging the ts segments, in order, into a single file:

with open("movie.mp4", 'ab+') as fw:
    for ts_line in ts_list:
        ts = ts_line.split('/')[-1]
        with open(f"temp/{ts}", 'rb') as fr:
            fw.write(fr.read())
            print(f"{ts} 分片合並成功")

2. Complete code

import os
import re
import base64
import requests
import asyncio
import aiohttp
import aiofiles
from lxml import etree
from urllib.parse import urljoin
from Crypto.Cipher import AES


def base_request(url):
    try:
        resp = requests.get(url)
        resp.encoding = 'utf-8'
        return resp.text
    except Exception as e:
        print(f"request {url} failed, and error is {e}")


def get_source_m3u8_url(url_base):
    et = etree.HTML(base_request(url_base))
    try:
        m3u8_play_url = et.xpath('//iframe[@id="mplay"]/@src')[0]
        m3u8_redirect_url = urljoin(url_base, m3u8_play_url)
        m3u8_resp = base_request(m3u8_redirect_url)
        get_m3u8_re = re.compile(r'url: "(?P<m3u8_url>.*?)",', re.S)
        m3u8_url = get_m3u8_re.search(m3u8_resp).group('m3u8_url')
        real_m3u8_url = base_request(m3u8_url)
        real_m3u8_url = real_m3u8_url.strip().split('\n')[-1]
        return urljoin(m3u8_url, real_m3u8_url)
    except Exception as e:
        print(f"沒有找到m3u8跳轉地址,報錯信息如下: {e}")


def get_aes_key(m3u8_url):
    for i in range(3):
        try:
            resp = base_request(m3u8_url)
            get_key_re = re.compile(r'#EXT-X-KEY:.*?URI="(?P<encrypt_key>.*?)"', re.S)
            encrypt_key = get_key_re.search(resp).group('encrypt_key')
            return requests.get(encrypt_key).content
        except Exception:
            print(f"failed to get the key, retry attempt {i + 1}")


def get_ts_list(m3u8_url):
    resp = base_request(m3u8_url)
    ts_list = list()
    for line in resp.strip().split('\n'):
        if not line.startswith('#') and line.endswith('.ts'):
            ts_list.append(line)
    return ts_list


async def decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem):
    aes = AES.new(key=encrypt_key, mode=AES.MODE_CBC, IV=b'0000000000000000')
    async with sem:
        for i in range(100):
            try:
                f_ts_name = m3u8_ts_url.split('/')[-1]
                async with aiofiles.open(f"temp/{f_ts_name}", 'wb') as f:
                    async with session.get(m3u8_ts_url) as resp:
                        # read the ts bytes asynchronously
                        content = await resp.content.read()
                    # decrypt the ts
                    decrypt_ts_content = aes.decrypt(content)
                    # write asynchronously
                    await f.write(decrypt_ts_content)
                    print(f"{m3u8_ts_url} saved")
                    break
            except Exception as e:
                print(f"{m3u8_ts_url} download failed: {e}")


def merge_ts_to_single(ts_list: list):
    with open("movie.mp4", 'ab+') as fw:
        for ts_line in ts_list:
            ts = ts_line.split('/')[-1]
            with open(f"temp/{ts}", 'rb') as fr:
                fw.write(fr.read())
                print(f"{ts} 分片合並成功")
    print("視頻下載完成")


async def main():
    b64_2_url = 'aHR0cDovL3d3dy53YmR5LnR2L3BsYXkvMzAyODZfMV8xLmh0bWw='
    url_base = base64.b64decode(b64_2_url).decode('utf-8')
    m3u8_url = get_source_m3u8_url(url_base)
    print(f"m3u8 address: {m3u8_url}")
    encrypt_key = get_aes_key(m3u8_url)
    print(f"decryption key: {encrypt_key}")
    ts_list = get_ts_list(m3u8_url)
    print(f"{len(ts_list)} segments in total")
    # make sure the directory for the ts segments exists
    os.makedirs("temp", exist_ok=True)
    # semaphore to cap concurrency
    sem = asyncio.Semaphore(500)
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(decrypt_one_ts(encrypt_key, m3u8_ts_url, session, sem))
            for m3u8_ts_url in ts_list
        ]
        await asyncio.wait(tasks)
    merge_ts_to_single(ts_list)


if __name__ == '__main__':
    asyncio.run(main())

Running the script prints each segment as it is saved, then as it is merged, and finally produces movie.mp4.

Optimizing the coroutine-based image downloader

The previous article showed how to download images concurrently with coroutines, but it could only crawl the images of a single page at a time. What if we have many pages? How should that be done? Two improvements follow.

Method 1: create multiple tasks with coroutines

import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree


def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res


async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)


async def download_img(img_url):
    # download one image from its url
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)


async def get_content(url):
    async with aiohttp.ClientSession() as session:
        # fetch the page source
        async with session.get(url) as resp:
            html = await resp.text('utf-8')
            et = etree.HTML(html)
            # get the list of image urls
            res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
            ts = [asyncio.create_task(download_img(img_url=img_url)) for img_url in res]
            await asyncio.wait(ts)


async def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    tasks = [
        asyncio.create_task(get_content(url))
        for url in urls
    ]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(end - start)

When running this, decode the base64 string bXAud2VpeGluLnFxLmNvbQ== to recover the real host in the urls list.
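
For example, a one-off decode (assuming the placeholder sits in the host position of each url):

import base64

host = base64.b64decode('bXAud2VpeGluLnFxLmNvbQ==').decode('utf-8')
url = f"https://{host}/s/Xa0AYHXnS8dgDdisVCujJw"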

Method 2: multiprocessing combined with coroutines

import requests
import asyncio
import aiohttp
import aiofiles
import time
import os
import base64
from lxml import etree
from concurrent.futures import ProcessPoolExecutor


def get_img_urls(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    et = etree.HTML(resp.text)
    res = et.xpath('//img[@class="rich_pages wxw-img js_insertlocalimg"]/@data-src')
    return res


async def save_img(content):
    if not os.path.exists("tmp"):
        os.makedirs("tmp")
    async with aiofiles.open(f"tmp/{time.time()}.jpg", "wb") as f:
        await f.write(content)


async def download_one_img(img_url):
    # download one image from its url
    async with aiohttp.ClientSession() as session:
        async with session.get(img_url) as img_resp:
            content = await img_resp.content.read()
            await save_img(content)


async def download_img(img_urls):
    tasks = [
        asyncio.create_task(download_one_img(img_url))
        for img_url in img_urls
    ]
    await asyncio.wait(tasks)


def run_download(img_urls):
    # each worker process runs its own event loop
    asyncio.run(download_img(img_urls))


def main():
    urls = [
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/Xa0AYHXnS8dgDdisVCujJw",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/v-DlbfSpyyXntCzHUUjgyQ",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/RVTLyFDXyJScxk627t4xRg",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/31AhdeS-vlZL6i-l1j2Gow",
        "https://bXAud2VpeGluLnFxLmNvbQ==/s/aCGsv64HzTxQz47wwoi2QQ"
    ]
    with ProcessPoolExecutor(5) as pool:
        for url in urls:
            # a coroutine object cannot be pickled, so submit a plain
            # function; get the page's image urls first, then hand the
            # actual downloading to a worker process
            pool.submit(run_download, get_img_urls(url))


if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(end - start)

As above, decode the base64 string bXAud2VpeGluLnFxLmNvbQ== before running.

Comparing the two, multiprocessing combined with coroutines takes almost the same time as coroutines alone: the workload is I/O-bound, so extra processes add overhead without adding throughput. So prefer plain coroutines; at the same efficiency, coroutines use far fewer resources.

You are welcome to follow my official account so we can learn and improve together.

