線程池(適當使用)
import re import requests from lxml import etree from multiprocessing.dummy import Pool import random headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36' } def request_video(url): return requests.get(url=url,headers=headers).content def saveVideo(data): name = str(random.randint(0,9999))+'.mp4' with open(name,'wb') as fp: fp.write(data) print(name,'下載存儲成功!!!') url = 'https://www.pearvideo.com/category_1' page_text = requests.get(url=url,headers=headers).text tree = etree.HTML(page_text) li_list = tree.xpath('//ul[@id="listvideoListUl"]/li') #實例化一個線程池對象 pool = Pool(4) video_url_list = [] #所有的視頻連接 for li in li_list: detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0] detail_page_text = requests.get(url=detail_url,headers=headers).text ex = 'srcUrl="(.*?)",vdoUrl=' video_url = re.findall(ex,detail_page_text,re.S)[0] video_url_list.append(video_url) #異步的獲取4個視頻的二進制數據 video_data_list = pool.map(request_video,video_url_list) #進行視頻的持久化存儲 pool.map(saveVideo,video_data_list)
單線程+異步協程(推薦)
-
event_loop:事件循環,相當於一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足某些條件的時候,函數就會被循環執行。程序是按照設定的順序從頭執行到尾,運行的次數也是完全按照設定。當在編寫異步程序時,必然其中有部分程序的運行耗時是比較久的,需要先讓出當前程序的控制權,讓其在背后運行,讓另一部分的程序先運行起來。當背后運行的程序完成后,也需要及時通知主程序已經完成任務可以進行下一步操作,但這個過程所需的時間是不確定的,需要主程序不斷的監聽狀態,一旦收到了任務完成的消息,就開始進行下一步。loop就是這個持續不斷的監視器。
-
coroutine:中文翻譯叫協程,在 Python 中常指代為協程對象類型,我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。
-
task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。
-
future:代表將來執行或還沒有執行的任務,實際上和 task 沒有本質區別。
-
另外我們還需要了解 async/await 關鍵字,它是從 Python 3.5 才出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。
基本使用
#基本使用 import asyncio async def hello(name): print('hello to :',name) #獲取了一個協程對象 c = hello('bobo') #創建一個事件循環對象 loop = asyncio.get_event_loop() #將協程對象注冊到事件循環中,然后啟動事件循環對象 loop.run_until_complete(c)
task的使用
#task的使用 import asyncio async def hello(name): print('hello to :',name) #獲取了一個協程對象 c = hello('bobo') #創建一個事件循環對象 loop = asyncio.get_event_loop() #就協程進行進一步的封裝,封裝到了task對象中 task = loop.create_task(c) print(task) loop.run_until_complete(task) print(task)
future
#future import asyncio async def hello(name): print('hello to :',name)
c = hello('bobo') task = asyncio.ensure_future(c) loop.run_until_complete(task)
綁定回調(task)
import asyncio def callback(task): print('i am callback:',task.result()) async def hello(name): print('hello to :',name) return name c = hello('bobo') task = asyncio.ensure_future(c) #給任務對象綁定一個回調函數 task.add_done_callback(callback) loop.run_until_complete(task)
多任務異步協程
import requests async def get_page(url): print('正在下載:',url) #之所以沒有實現異步操作,原因是因為requests模塊是一個非異步的模塊 response = requests.get(url=url) print('響應數據:',response.text) print('下載成功:',url) start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
asyncio.sleep()
import asyncio async def request(url): print('正在下載:',url) # sleep(2) #非異步模塊的代碼:在此處如果存在非異步操作代碼,則會徹底讓asyncio失去異步的效果 await asyncio.sleep(2) print('下載成功:',url) urls = [ 'www.baidu.com', 'www.taobao.com', 'www.sogou.com' ] start = time.time() loop = asyncio.get_event_loop() tasks = [] #任務列表,放置多個任務對象 for url in urls: c = request(url) task = asyncio.ensure_future(c) tasks.append(task) #將多個任務對象對應的列表注冊到事件循環中 loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
多任務異步操作應用到爬蟲中
- 環境安裝:pip install aiohttp 支持異步的網絡請求的模塊
import aiohttp import asyncio async def get_page(url): async with aiohttp.ClientSession() as session: async with await session.get(url=url) as response: page_text = await response.text() #read() json() print(page_text) start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)
實現數據解析---任務的綁定回調機制
import aiohttp import asyncio #回調函數:解析響應數據 def callback(task): print('this is callback()') #獲取響應數據 page_text = task.result() print('在回調函數中,實現數據解析') async def get_page(url): async with aiohttp.ClientSession() as session: async with await session.get(url=url) as response: page_text = await response.text() #read() json() # print(page_text) return page_text start = time.time() urls = [ 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom', 'http://127.0.0.1:5000/bobo', 'http://127.0.0.1:5000/jay', 'http://127.0.0.1:5000/tom' ] tasks = [] loop = asyncio.get_event_loop() for url in urls: c = get_page(url) task = asyncio.ensure_future(c) #給任務對象綁定回調函數用於解析響應數據 task.add_done_callback(callback) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print('總耗時:',time.time()-start)