引入
很多同學對於異步這個概念只是停留在了“聽說很NB”的認知層面上,很少有人能夠在項目中真正的使用異步實現高性能的相關操作。本節課,咱們就一起來學習一下,爬蟲中如何使用異步實現高性能的數據爬取操作。
背景
其實爬蟲的本質就是client發請求批量獲取server的響應數據,如果我們有多個url待爬取,只用一個線程且采用串行的方式執行,那只能等待爬取一個結束后才能繼續下一個,效率會非常低。需要強調的是:對於單線程下串行N個任務,並不完全等同於低效,如果這N個任務都是純計算的任務,那么該線程對cpu的利用率仍然會很高,之所以單線程下串行多個爬蟲任務低效,是因為爬蟲任務是明顯的IO密集型(阻塞)程序。那么該如何提高爬取性能呢?
分析處理
同步調用:即提交一個任務后就在原地等待任務結束,等到拿到任務的結果后再繼續下一行代碼,效率低下:
import requests def parse_page(res): print('解析 %s' %(len(res))) def get_page(url): print('下載 %s' %url) response=requests.get(url) if response.status_code == 200: return response.text urls = [ 'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar', 'http://zjlt.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10229.rar', 'http://xmdx.sc.chinaz.net/Files/DownLoad/jianli/201904/jianli10231.rar' ] for url in urls: res=get_page(url) #調用一個任務,就在原地等待任務結束拿到結果后才繼續往后執行 parse_page(res)
解決同步調用方案之多線程/多進程(不建議使用)
好處:在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),這樣任何一個連接的阻塞都不會影響其他的連接。
弊端:開啟多進程或都線程的方式,我們是無法無限制地開啟多進程或多線程的:在遇到要同時處理成百上千個的連接請求時,則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,而且線程與進程本身也更容易進入假死狀態。
解決同步調用方案之線程/進程池(適當使用)
好處:很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務。可以很好的降低系統開銷。
弊端:“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁創建和銷毀線程帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。
案例:
對比同步和使用線程池的執行效率
#同步執行 import time def sayhello(str): print("Hello ",str) time.sleep(2) name_list =['xiaozi','aa','bb','cc'] start_time = time.time() for i in range(len(name_list)): sayhello(name_list[i]) print('%d second'% (time.time()-start_time))
執行結果:
Hello xiaozi
Hello aa
Hello bb
Hello cc
8 second
# 異步基於線程池 import time from multiprocessing.dummy import Pool def say_hello(str): print("Hello ", str) time.sleep(2) start = time.time() name_list = ['xiaozi', 'aa', 'bb', 'cc'] # 實例化線程池對象,開啟了4個線程 pool = Pool(4) pool.map(say_hello, name_list) pool.close() pool.join() end = time.time() print(end - start)
執行結果:
Hello xiaozi
Hello aa
Hello bb
Hello cc
2.0805933475494385
基於multiprocessing.dummy線程池爬取梨視頻的視頻信息
import re import random import requests from lxml import etree from fake_useragent import UserAgent # 安裝fake-useragent庫:pip install fake-useragent # 導入線程池模塊 from multiprocessing.dummy import Pool # 實例化線程池對象 pool = Pool() url = 'http://www.pearvideo.com/category_1' # 隨機產生UA ua = UserAgent().random headers = { 'User-Agent': ua } # 獲取首頁頁面數據 page_text = requests.get(url=url, headers=headers).text # 對獲取的首頁頁面數據中的相關視頻詳情鏈接進行解析 tree = etree.HTML(page_text) li_list = tree.xpath('//div[@id="listvideoList"]/ul/li') detail_urls = [] # 存儲二級頁面的url for li in li_list: detail_url = 'http://www.pearvideo.com/' + li.xpath('./div/a/@href')[0] title = li.xpath('.//div[@class="vervideo-title"]/text()')[0] detail_urls.append(detail_url) vedio_urls = [] # 存儲視頻的url for url in detail_urls: page_text = requests.get(url=url, headers=headers).text vedio_url = re.findall('srcUrl="(.*?)"', page_text, re.S)[0] vedio_urls.append(vedio_url) # 使用線程池進行視頻數據下載 func_request = lambda link: requests.get(url=link, headers=headers).content video_data_list = pool.map(func_request, vedio_urls) # 使用線程池進行視頻數據保存 func_saveData = lambda data: save(data) pool.map(func_saveData, video_data_list) def save(data): file_name = str(random.randint(1, 10000)) + '.mp4' with open(file_name, 'wb') as fp: fp.write(data) print(file_name + '已存儲') pool.close() pool.join()
總結:對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。
異步IO
上一小節中無論哪種解決方案其實沒有解決一個性能相關的問題:IO阻塞,無論是多進程還是多線程,在遇到IO阻塞時都會被操作系統強行剝奪走CPU的執行權限,程序的執行效率因此就降低了下來。
解決這一問題的關鍵在於,我們自己從應用程序級別檢測IO阻塞然后切換到我們自己程序的其他任務執行,這樣把我們程序的IO降到最低,我們的程序處於就緒態就會增多,以此來迷惑操作系統,操作系統便以為我們的程序是IO比較少的程序,從而會盡可能多的分配CPU給我們,這樣也就達到了提升程序執行效率的目的。
在python3.4之后新增了asyncio模塊,可以幫我們檢測IO阻塞,然后實現異步IO。注意:asyncio只能發tcp級別的請求,不能發http協議。
什么是異步IO
所謂「異步 IO」,就是你發起一個 IO阻塞 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。
實現異步IO的方式
單線程+異步協程實現異步IO操作
異步協程用法
從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器對象為基礎的,在 Python 3.5 則增加了 async/await,使得協程的實現更加方便。首先我們需要了解下面幾個概念:
event_loop:事件循環,相當於一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足某些條件的時候,函數就會被循環執行。程序是按照設定的順序從頭執行到尾,運行的次數也是完全按照設定。當在編寫異步程序時,必然其
中有部分程序的運行耗時是比較久的,需要先讓出當前程序的控制權,讓其在背后運行,讓另一部分的程序先運行起來。當背后運行的程序完成后,也需要及時通知主程序已經完成任務可以進行下一步操作,但這個過程所需的時間是不確定的,
需要主程序不斷的監聽狀態,一旦收到了任務完成的消息,就開始進行下一步。loop就是這個持續不斷的監視器。
coroutine:中文翻譯叫協程,在 Python 中常指代為協程對象類型,我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。
task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。
future:代表將來執行或還沒有執行的任務,實際上和 task 沒有本質區別。
另外我們還需要了解 async/await 關鍵字,它是從 Python 3.5 才出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。
定義一個協程:
import asyncio
async def execute(x): print('Number:', x) # 獲取了一個協程對象 coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') # 創建一個事件循環對象 loop = asyncio.get_event_loop() # 將協程對象注冊到事件循環中,然后啟動事件循環對象 loop.run_until_complete(coroutine) print('After calling loop')
運行結果:
Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1 After calling loop
首先我們引入了 asyncio 這個包,這樣我們才可以使用 async 和 await,然后我們使用 async 定義了一個 execute() 方法,方法接收一個數字參數,方法執行之后會打印這個數字。
隨后我們直接調用了這個方法,然而這個方法並沒有執行,而是返回了一個 coroutine 協程對象。隨后我們使用 get_event_loop() 方法創建了一個事件循環 loop,並調用了 loop 對象的 run_until_complete() 方法將協程注冊到事件循環 loop 中,然后啟動。最后我們才看到了 execute() 方法打印了輸出結果。
可見,async 定義的方法就會變成一個無法直接執行的 coroutine 對象,必須將其注冊到事件循環中才可以執行。
task的使用:
上文我們還提到了 task,它是對 coroutine 對象的進一步封裝,它里面相比 coroutine 對象多了運行狀態,比如 running、finished 等,我們可以用這些狀態來獲取協程對象的執行情況。
在上面的例子中,當我們將 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 對象,我們也可以顯式地進行聲明,如下所示:
# task的使用 import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() # 對協程進行進一步的封裝,封裝到了task對象中 task = loop.create_task(coroutine) print('Task:', task) # 將task對象注冊到事件循環中 loop.run_until_complete(task) print('Task:', task) print('After calling loop')
運行結果:
Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
ensure_future()的使用
另外定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future() 方法,返回結果也是 task 對象,這樣的話我們就可以不借助於 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對象,寫法如下:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') # 創建一個future對象 task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() # 將future對象注冊到事件循環中 loop.run_until_complete(task) print('Task:', task) print('After calling loop')
綁定回調
也可以為某個 task 綁定一個回調方法,來看下面的例子:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url).status_code return status def callback(task): print('Status:', task.result()) coroutine = request() task = asyncio.ensure_future(coroutine) # 給任務對象綁定一個回調函數 task.add_done_callback(callback) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task)
運行結果:
Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]> Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>
在這里我們定義了一個 request() 方法,請求了百度,返回狀態碼,但是這個方法里面我們沒有任何 print() 語句。隨后我們定義了一個 callback() 方法,這個方法接收一個參數,是 task 對象,然后調用 print() 方法打印了 task 對象的結果。這樣我們就定義好了一個 coroutine 對象和一個回調方法,我們現在希望的效果是,當 coroutine 對象執行完畢之后,就去執行聲明的 callback() 方法。
那么它們二者怎樣關聯起來呢?很簡單,只需要調用 add_done_callback() 方法即可,我們將 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之后就可以調用 callback() 方法了,同時 task 對象還會作為參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就可以獲取返回結果了。
多任務異步協程
如果我們想執行多次請求應該怎么辦呢?我們可以定義一個 task 列表,然后使用 asyncio 的 wait() 方法即可執行。
import asyncio import time async def request(url): print('正在下載',url) # 由於time為非異步模塊,在此處如果存在非異步操作代碼,則會徹底讓asyncio失去異步的效果 time.sleep(2) print('下載完畢',url) start = time.time() urls = [ 'www.baidu.com', 'www.sogou.com', 'www.goubanjia.com' ] tasks = [] # 任務列表:存放多個任務對象 for url in urls: c = request(url) task = asyncio.ensure_future(c) tasks.append(task) loop = asyncio.get_event_loop() # 將多個任務對象對應的列表注冊到事件循環中,注意這里必須要再封裝一層asyncio.wait loop.run_until_complete(asyncio.wait(tasks)) print(time.time()-start)
這里我們使用一個 for 循環創建了五個 task,組成了一個列表,然后把這個列表首先傳遞給了 asyncio 的 wait() 方法,然后再將其注冊到時間循環中,就可以發起五個任務了。最后我們再將任務的運行結果輸出出來,運行結果如下:
正在下載 www.baidu.com
下載完畢 www.baidu.com
正在下載 www.sogou.com
下載完畢 www.sogou.com
正在下載 www.goubanjia.com
下載完畢 www.goubanjia.com
6.009262800216675
上述案例中確實已經實現了多任務協程,但是說好的異步呢?注意:在實現異步環節的編碼中不可以出現非異步模塊的代碼,否則就無法實現真正的異步了。上述案例中的time.sleep就是非異步模塊中的代碼。因此改寫成:
import asyncio import time async def request(url): print('正在下載',url) # 在異步協程中如果出現了同步模塊相關的代碼,那么就無法實現異步。 # time.sleep(2) # 當在asyncio中遇到阻塞操作必須進行手動掛起, 使用await關鍵字來實現 await asyncio.sleep(2) print('下載完畢',url) start = time.time() urls = [ 'www.baidu.com', 'www.sogou.com', 'www.goubanjia.com' ] #任務列表:存放多個任務對象 stasks = [] for url in urls: c = request(url) task = asyncio.ensure_future(c) stasks.append(task) loop = asyncio.get_event_loop() #需要將任務列表封裝到wait中 loop.run_until_complete(asyncio.wait(stasks)) print(time.time()-start)
執行結果:
正在下載 www.baidu.com
正在下載 www.sogou.com
正在下載 www.goubanjia.com
下載完畢 www.baidu.com
下載完畢 www.sogou.com
下載完畢 www.goubanjia.com
2.0058767795562744
實現了真正的多任務異步協程!
將多任務異步操作應用到爬蟲中
接下來,咱們就可以嘗試的將多任務異步協程應用到爬蟲中進行試驗,看是否能夠實現多任務異步爬蟲?為了表現出協程的優勢,我們需要先創建一個合適的實驗環境,最好的方法就是模擬一個需要等待一定時間才可以獲取返回結果的網頁,在本地模擬一個慢速服務器,這里我們選用 Flask。
服務器代碼如下:
from flask import Flask import time app = Flask(__name__) @app.route('/tiger') def index_tiger(): time.sleep(2) return 'Hello tiger' @app.route('/jay') def index_jay(): time.sleep(2) return 'Hello jay' @app.route('/tom') def index_tom(): time.sleep(2) return 'Hello tom' if __name__ == '__main__': app.run(threaded=True)
接下來,將多任務異步協程操作應用在爬蟲上:
import requests import asyncio import time start = time.time() urls = [ 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] async def get_page(url): print('正在下載', url) # 之所有沒有實現異步操作,是因為requests模塊是一個非異步的模塊 response = requests.get(url=url) print('下載完畢:', response.text) tasks = [] for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('總耗時:', end-start)
運行結果如下:
正在下載 http://127.0.0.1:5000/tiger
下載完畢: Hello tiger
正在下載 http://127.0.0.1:5000/jay 下載完畢: Hello jay 正在下載 http://127.0.0.1:5000/tom 下載完畢: Hello tom 總耗時: 6.045619249343872
可以發現和正常的請求並沒有什么兩樣,依然還是順次執行的,耗時 6 秒,平均一個請求耗時 2 秒,說好的異步處理呢?
原因在於requests模塊是非異步模塊,要想實現真正的異步必須使用基於異步的網絡請求模塊所以這里就需要 aiohttp 派上用場了。
aiohttp
簡介
aiohttp
可以實現單線程並發IO操作。
環境安裝
pip install aiohttp
aiohttp使用
發起請求
async def fetch(): async with aiohttp.ClientSession() as session: async with session.get('https://www.baidu.com') as resposne: print(await resposne.text()) loop = asyncio.get_event_loop() tasks = [fetch(),] loop.run_until_complete(asyncio.wait(tasks))
添加請求參數
params = {'key': 'value', 'page': 10} async def fetch(): async with aiohttp.ClientSession() as session: async with session.get('https://www.baidu.com/s',params=params) as resposne: print(await resposne.url) loop = asyncio.get_event_loop() tasks = [fetch(),] loop.run_until_complete(asyncio.wait(tasks))
UA偽裝
url = 'http://httpbin.org/user-agent' headers = {'User-Agent': 'test_user_agent'} async def fetch(): async with aiohttp.ClientSession() as session: async with session.get(url,headers=headers) as resposne: print(await resposne.text()) loop = asyncio.get_event_loop() tasks = [fetch(),] loop.run_until_complete(asyncio.wait(tasks))
自定義cookies
url = 'http://httpbin.org/cookies'
cookies = {'cookies_name': 'test_cookies'} async def fetch(): async with aiohttp.ClientSession() as session: async with session.get(url,cookies=cookies) as resposne: print(await resposne.text()) loop = asyncio.get_event_loop() tasks = [fetch(),] loop.run_until_complete(asyncio.wait(tasks))
post請求參數
url = 'http://httpbin.org'
payload = {'username': 'zhang', 'password': '123456'} async def fetch(): async with aiohttp.ClientSession() as session: async with session.post(url, data=payload) as resposne: print(await resposne.text()) loop = asyncio.get_event_loop() tasks = [fetch(), ] loop.run_until_complete(asyncio.wait(tasks))
設置代理
url = "http://python.org"
async def fetch(): async with aiohttp.ClientSession() as session: async with session.get(url, proxy="http://some.proxy.com") as resposne: print(resposne.status) loop = asyncio.get_event_loop() tasks = [fetch(), ] loop.run_until_complete(asyncio.wait(tasks))
異步IO處理
# 環境安裝:pip install aiohttp # 使用該模塊中的ClientSession import requests import asyncio import time import aiohttp start = time.time() urls = [ 'http://127.0.0.1:5000/tiger','http://127.0.0.1:5000/jay','http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', ] async def get_page(url): async with aiohttp.ClientSession() as session: #get()、post(): #headers,params/data,proxy='http://ip:port' async with await session.get(url) as response: #text()返回字符串形式的響應數據 #read()返回的二進制形式的響應數據 #json()返回的就是json對象 #注意:獲取響應數據操作之前一定要使用await進行手動掛起 page_text = await response.text() print(page_text) tasks = [] for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('總耗時:',end-start)

# 使用aiohttp替代requests模塊 import time import asyncio import aiohttp async def get_page(url): async with aiohttp.ClientSession() as session: # 只要有耗時就會有阻塞,就得使用await進行掛起操作 async with await session.get(url=url) as response: page_text = await response.text() # 二進制read()/json() print('響應數據', page_text) start = time.time() urls = [ 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', ] loop = asyncio.get_event_loop() tasks = [] for url in urls: cone = get_page(url) task = asyncio.ensure_future(cone) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時: ', time.time()-start)
在這里我們將請求庫由 requests 改成了 aiohttp,通過 aiohttp 的 ClientSession 類的 get() 方法進行請求,結果如下:
Hello tom
Hello jay
Hello tiger
Hello tiger
Hello jay
Hello tiger
Hello tom
Hello jay
Hello jay
Hello tom
Hello tom
Hello tiger
總耗時: 2.037203073501587
成功了!我們發現這次請求的耗時由 6秒變成了 2 秒,耗時直接變成了原來的 1/3。
代碼里面我們使用了 await,后面跟了 get() 方法,在執行這五個協程的時候,如果遇到了 await,那么就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。
開始運行時,時間循環會運行第一個 task,針對第一個 task 來說,當執行到第一個 await 跟着的 get() 方法時,它被掛起,但這個 get() 方法第一步的執行是非阻塞的,掛起之后立馬被喚醒,所以立即又進入執行,創建了 ClientSession 對象,接着遇到了第二個 await,調用了 session.get() 請求方法,然后就被掛起了,由於請求需要耗時很久,所以一直沒有被喚醒,好第一個 task 被掛起了,那接下來該怎么辦呢?事件循環會尋找當前未被掛起的協程繼續執行,於是就轉而執行第二個 task 了,也是一樣的流程操作,直到執行了第五個 task 的 session.get() 方法之后,全部的 task 都被掛起了。所有 task 都已經處於掛起狀態,那咋辦?只好等待了。3 秒之后,幾個請求幾乎同時都有了響應,然后幾個 task 也被喚醒接着執行,輸出請求結果,最后耗時,3 秒!
怎么樣?這就是異步操作的便捷之處,當遇到阻塞式操作時,任務被掛起,程序接着去執行其他的任務,而不是傻傻地等着,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。
可見,使用了異步協程之后,我們幾乎可以在相同的時間內實現成百上千倍次的網絡請求,把這個運用在爬蟲中,速度提升可謂是非常可觀了。
如何實現數據解析--任務的綁定回調機制
import time import asyncio import aiohttp # 回調函數: 主要用來解析響應數據 def callback(task): print('This is callback') # 獲取響應數據 page_text = task.result() print("接下來就可以在回調函數中實現數據解析") async def get_page(url): async with aiohttp.ClientSession() as session: # 只要有耗時就會有阻塞,就得使用await進行掛起操作 async with await session.get(url=url) as response: page_text = await response.text() # 二進制read()/json() print('響應數據', page_text) return page_text start = time.time() urls = [ 'http://127.0.0.1:5000/tiger', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', ] loop = asyncio.get_event_loop() tasks = [] for url in urls: cone = get_page(url) task = asyncio.ensure_future(cone) # 給任務對象綁定回調函數用於解析響應數據 task.add_done_callback(callback) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時: ', time.time()-start)