Python網絡爬蟲 第四章 多線程+異步協程


一、多線程抓取北京新發地菜價

多線程、多進程和線程池等的概念,我單獨成章了,算到Python基礎知識里面,https://www.cnblogs.com/wkfvawl/p/14729542.html

這里就直接開啟練習,抓取菜價其實在第二章已經講過了,那時候用的是bs4解析的網頁,這里使用xpath配合多線程。

注意到新發地網站菜價表格網頁的url是按照序號遞增的,像第一頁是

http://www.xinfadi.com.cn/marketanalysis/0/list/1.shtml

第二頁是

http://www.xinfadi.com.cn/marketanalysis/0/list/2.shtml

這樣,只需要遍歷構造url即可得到所有需要的網頁鏈接,但如果是單線程一個個的執行必然效率會很低,那就可以試一試多線程。

使用谷歌瀏覽器F12的功能,直接獲取到表格的xpath。

# 1. 如何提取單個頁面的數據
# 2. 上線程池,多個頁面同時抓取
import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor

f = open("data.csv", mode="w", encoding="utf-8")
csvwriter = csv.writer(f)


def download_one_page(url):
    # 拿到頁面源代碼
    resp = requests.get(url)
    html = etree.HTML(resp.text)
    table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
    # 去掉表頭 下面兩種方法都想
    # trs = table.xpath("./tr")[1:] # 從第1個開始 去掉第0個表頭
    trs = table.xpath("./tr[position()>1]") # 位置大於1
    # 拿到每個tr
    for tr in trs:
        txt = tr.xpath("./td/text()") # tr中找td td中找文本
        # 對數據做簡單的處理: \\  / 去掉
        txt = (item.replace("\\", "").replace("/", "") for item in txt)
        # 把數據存放在文件中
        csvwriter.writerow(txt)
    print(url, "提取完畢!")


if __name__ == '__main__':
    # for i in range(1, 14870):  # 效率及其低下
    #     download_one_page(f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")

    # 創建線程池 50個線程
    with ThreadPoolExecutor(50) as t:
        for i in range(1, 200):  # 199 * 20 = 3980
            # 把下載任務提交給線程池
            t.submit(download_one_page, f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")

    print("全部下載完畢!")

二、協程

協程是並發編程里面很重要的概念,感覺如果要真正弄明白,可能需要完完整整寫一章博客,這里就先簡單介紹一些基本概念和應用。

協程能夠更加⾼效的利⽤CPU,其實, 我們能夠⾼效的利⽤多線程來完成爬⾍其實已經很6了。但是,從某種⻆度講, 線程的執⾏效率真的就⽆敵了么? 我們真的充分的利⽤CPU資源了么? ⾮也~ ⽐如, 我們來看下⾯這個例⼦。我們單獨的⽤⼀個線程來完成某⼀個操作,看看它的效率是否真的能把CPU完全利⽤起來。

import time
def func():
 print("我愛黎明")
 time.sleep(3)
 print("我真的愛黎明")
func()

各位請看,在該程序中, 我們的func()實際在執⾏的時候⾄少需要3秒的時間來完成操作,中間的三秒鍾需要讓我當前的線程處於阻塞狀態。阻塞狀態的線程 CPU是不會來執⾏的,那么此時cpu很可能會切換到其他程序上去執⾏。此時, 對於你來說, CPU其實並沒有為你⼯作(在這三秒內), 那么我們能不能通過某種⼿段, 讓CPU⼀直為我⽽⼯作,盡量的不要去管其他⼈。

我們要知道CPU⼀般拋開執⾏周期不談,如果⼀個線程遇到了IO操作, CPU就會⾃動的切換到其他線程進⾏執⾏. 那么, 如果我想辦法讓我的線程遇到了IO操作就掛起, 留下的都是運算操作. 那CPU是不是就會⻓時間的來照顧我~.
以此為⽬的, 偉⼤的程序員就發明了⼀個新的執⾏過程. 當線程中遇到了IO操作的時候, 將線程中的任務進⾏切換, 切換成⾮ IO操作. 等原來的IO執⾏完了. 再恢復回原來的任務中。

這里來看一個協程程序

import asyncio
import time

async def func1():
    print("你好啊, 我叫test1")
    time.sleep(3)  # 當程序出現了同步操作的時候. 異步就中斷了
    print("你好啊, 我叫test1")


async def func2():
    print("你好啊, 我叫test2")
    time.sleep(2)
    print("你好啊, 我叫test2")


async def func3():
    print("你好啊, 我叫test3")
    time.sleep(4)
    print("你好啊, 我叫test3")


if __name__ == '__main__':
    f1 = func1()
    f2 = func2()
    f3 = func3()
    # 任務列表
    tasks = [
        f1, f2, f3
    ]
    t1 = time.time()
    # 一次性啟動多個任務(協程)
    asyncio.run(asyncio.wait(tasks))
    t2 = time.time()
    print(t2 - t1)

 

 

 運行的結果並沒有如同協程定義那樣,產生異步效果,反而是同步的?這是因為里面的time.sleep()是同步操作,導致異步中斷了,正確的寫法應該是這樣:

import asyncio
import time

async def func1():
    print("你好啊, 我叫test1")
    await asyncio.sleep(3)  # 異步操作的代碼 await掛起
    print("你好啊, 我叫test1")

async def func2():
    print("你好啊, 我叫test2")
    await asyncio.sleep(2)
    print("你好啊, 我叫test2")

async def func3():
    print("你好啊, 我叫test3")
    await asyncio.sleep(4)
    print("你好啊, 我叫test3")

async def main():
    # 第一種寫法
    # f1 = func1()
    # await f1  # 一般await掛起操作放在協程對象前面
    # 第二種寫法(推薦)
    # tasks = [
    #     func1(),
    #     func2(),
    #     func3()
    # ]
    tasks = [
        asyncio.create_task(func1()),  # py3.8以后加上asyncio.create_task()
        asyncio.create_task(func2()),
        asyncio.create_task(func3())
    ]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    t1 = time.time()
    # 一次性啟動多個任務(協程)
    asyncio.run(main())
    t2 = time.time()
    print(t2 - t1)

 從程序運行時間上來看利用異步協程直接從9秒減少到了4秒。這里需要asyncio的支持。

關於asyncio的介紹參考https://www.liaoxuefeng.com/wiki/1016959663602400/1017970488768640

await關鍵詞。異步io的關鍵在於,await io操作,此時,當前攜程就會被掛起,時間循環轉而執行其他攜程,但是要注意前面這句話,並不是說所有攜程里的await都會導致當前攜程的掛起,要看await后面跟的是什么,如果跟的是我們定義的攜程,則會執行這個攜程,如果是asyncio模塊制作者定義的固有攜程,比如模擬io操作的asyncio.sleep,以及io操作,比如網絡io:asyncio.open_connection這些,才會掛起當前攜程。

三、aiohttp模塊應用

前面我們使用asyncio來實現了異步協程,那我們該如何將異步協程應用到爬蟲上呢?其實爬蟲在連接到要爬取的網頁上的過程,也是一個類似IO的過程,這里介紹一下aiohttp,是一個用於asyncio和Python的異步HTTP客戶端/服務器。

以第二章講過的唯美壁紙網站為例。之前同步時候用的requests ,換成了異步操作的aiohttp。

import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]

async def aiodownload(url):
    # 發送請求.
    # 得到圖片內容
    # 保存到文件
    name = url.rsplit("/", 1)[1]  # 從右邊切, 切一次. 得到[1]位置的內容
    # 加with 上下文管理器
    # s = aiohttp.ClientSession() <==> requests.session()
    async with aiohttp.ClientSession() as session:  # requests
        async with session.get(url) as resp:  # resp = requests.get()
            # 請求回來了. 寫入文件
            # 可以自己去學習一個模塊, aiofiles
            with open(name, mode="wb") as f:  # 創建文件
                f.write(await resp.content.read())  # 讀取內容是異步的. 需要await掛起, resp.text()

    print(name, "搞定")

async def main():
    # tasks列表
    tasks = []
    for url in urls:
        tasks.append(aiodownload(url))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())

這個程序還有待改進空間的,創建文件寫文件也是一個IO操作,也是可以異步的,要引入aiofiles這個后面會講。

四、利用協程下載小說

這次我們下載百度小說上的《西游記》。http://dushu.baidu.com/pc/detail?gid=4306063500

 

 

 F12抓包,找到了每一章節的名稱和cid

http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}

經歷了之前的實踐,是不是感覺這次的url優點奇怪?date后面是一個json?

接着為了獲取每個章節里面的內容,點開一章,發現內容存在於http://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|11348571","need_bookinfo":1}中

通過更換cid我們就能很輕松的獲取到其他章節的內容了。

 

 

 在編寫程序之前,先要清楚我們需要做什么工作?

其實這是一個同步異步相結合的工作

  • 1. 同步操作: 訪問getCatalog 拿到所有章節的cid和名稱
  • 2. 異步操作: 訪問getChapterContent 下載所有的文章內容
import requests
import asyncio
import aiohttp
import aiofiles
import json

async def aiodownload(cid, b_id, title):
    data = {
        "book_id": b_id,
        "cid": f"{b_id}|{cid}",
        "need_bookinfo": 1
    }
    # 轉成json
    data = json.dumps(data)
    url = f"http://dushu.baidu.com/api/pc/getChapterContent?data={data}"

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            dic = await resp.json()

            async with aiofiles.open(title, mode="w", encoding="utf-8") as f:
                await f.write(dic['data']['novel']['content'])  # 把小說內容寫出


async def getCatalog(url):
    resp = requests.get(url)
    # 取json
    dic = resp.json()
    tasks = []
    for item in dic['data']['novel']['items']:  # item就是對應每一個章節的名稱和cid
        title = './novel/' + item['title'] + '.txt'
        cid = item['cid']
        # 准備異步任務
        tasks.append(aiodownload(cid, b_id, title))
    await asyncio.wait(tasks)


if __name__ == '__main__':
    b_id = "4306063500"
    url = 'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"' + b_id + '"}'
    asyncio.run(getCatalog(url))

爬蟲程序運行速度極快!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM