高性能的異步爬蟲


線程池(適當使用)

import re
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import random
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
}

def request_video(url):
    return requests.get(url=url,headers=headers).content


def saveVideo(data):
    name = str(random.randint(0,9999))+'.mp4'
    with open(name,'wb') as fp:
        fp.write(data)
        print(name,'下載存儲成功!!!')

url = 'https://www.pearvideo.com/category_1'
page_text = requests.get(url=url,headers=headers).text

tree = etree.HTML(page_text)
li_list = tree.xpath('//ul[@id="listvideoListUl"]/li')
#實例化一個線程池對象
pool = Pool(4)
video_url_list = [] #所有的視頻連接
for li in li_list:
    detail_url = 'https://www.pearvideo.com/'+li.xpath('./div/a/@href')[0]
    detail_page_text = requests.get(url=detail_url,headers=headers).text
    ex = 'srcUrl="(.*?)",vdoUrl='
    video_url = re.findall(ex,detail_page_text,re.S)[0]
    video_url_list.append(video_url)
#異步的獲取4個視頻的二進制數據
video_data_list = pool.map(request_video,video_url_list)

#進行視頻的持久化存儲
pool.map(saveVideo,video_data_list)

單線程+異步協程(推薦)

  • event_loop:事件循環,相當於一個無限循環,我們可以把一些函數注冊到這個事件循環上,當滿足某些條件的時候,函數就會被循環執行。程序是按照設定的順序從頭執行到尾,運行的次數也是完全按照設定。當在編寫異步程序時,必然其中有部分程序的運行耗時是比較久的,需要先讓出當前程序的控制權,讓其在背后運行,讓另一部分的程序先運行起來。當背后運行的程序完成后,也需要及時通知主程序已經完成任務可以進行下一步操作,但這個過程所需的時間是不確定的,需要主程序不斷的監聽狀態,一旦收到了任務完成的消息,就開始進行下一步。loop就是這個持續不斷的監視器。

  • coroutine:中文翻譯叫協程,在 Python 中常指代為協程對象類型,我們可以將協程對象注冊到事件循環中,它會被事件循環調用。我們可以使用 async 關鍵字來定義一個方法,這個方法在調用時不會立即被執行,而是返回一個協程對象。

  • task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。

  • future:代表將來執行或還沒有執行的任務,實際上和 task 沒有本質區別。

  • 另外我們還需要了解 async/await 關鍵字,它是從 Python 3.5 才出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。

基本使用

#基本使用
import asyncio
async def hello(name):
    print('hello to :',name)
#獲取了一個協程對象
c = hello('bobo')

#創建一個事件循環對象
loop = asyncio.get_event_loop()

#將協程對象注冊到事件循環中,然后啟動事件循環對象
loop.run_until_complete(c)

task的使用

#task的使用
import asyncio
async def hello(name):
    print('hello to :',name)

#獲取了一個協程對象
c = hello('bobo')
#創建一個事件循環對象
loop = asyncio.get_event_loop()
#就協程進行進一步的封裝,封裝到了task對象中
task = loop.create_task(c)
print(task)
loop.run_until_complete(task)
print(task)

future

#future
import asyncio
async def hello(name):
    print('hello to :',name)
c
= hello('bobo') task = asyncio.ensure_future(c) loop.run_until_complete(task)

綁定回調(task)

import asyncio
def callback(task):
    print('i am callback:',task.result())

async def hello(name):
    print('hello to :',name)
    return name

c = hello('bobo')

task = asyncio.ensure_future(c)
#給任務對象綁定一個回調函數
task.add_done_callback(callback)
loop.run_until_complete(task)

多任務異步協程

import requests
async def get_page(url):
    print('正在下載:',url)
    #之所以沒有實現異步操作,原因是因為requests模塊是一個非異步的模塊
    response = requests.get(url=url)
    print('響應數據:',response.text)
    print('下載成功:',url)
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
loop = asyncio.get_event_loop()
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:',time.time()-start)

asyncio.sleep()

import asyncio
async def request(url):
    print('正在下載:',url)
#     sleep(2) #非異步模塊的代碼:在此處如果存在非異步操作代碼,則會徹底讓asyncio失去異步的效果
    await asyncio.sleep(2)
    print('下載成功:',url)
urls = [
    'www.baidu.com',
    'www.taobao.com',
    'www.sogou.com'
]
start = time.time()
loop = asyncio.get_event_loop()
tasks = [] #任務列表,放置多個任務對象
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
    
#將多個任務對象對應的列表注冊到事件循環中
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:',time.time()-start)

多任務異步操作應用到爬蟲中

  • 環境安裝:pip install aiohttp  支持異步的網絡請求的模塊
import aiohttp
import asyncio

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with await session.get(url=url) as response:
            page_text = await response.text() #read()  json()
            print(page_text)
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
loop = asyncio.get_event_loop()
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:',time.time()-start)

實現數據解析---任務的綁定回調機制

import aiohttp
import asyncio
#回調函數:解析響應數據
def callback(task):
    print('this is callback()')
    #獲取響應數據
    page_text = task.result()
    print('在回調函數中,實現數據解析')
    
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with await session.get(url=url) as response:
            page_text = await response.text() #read()  json()
#             print(page_text)
            return page_text
start = time.time()
urls = [
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom',
    'http://127.0.0.1:5000/bobo',
    'http://127.0.0.1:5000/jay',
    'http://127.0.0.1:5000/tom'
]
tasks = []
loop = asyncio.get_event_loop()
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    #給任務對象綁定回調函數用於解析響應數據
    task.add_done_callback(callback)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('總耗時:',time.time()-start)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM