Python實現基於協程的異步爬蟲


Python實現基於協程的異步爬蟲

一、課程介紹

1. 課程來源

本課程核心部分來自《500 lines or less》項目,作者是來自 MongoDB 的工程師 A. Jesse Jiryu Davis 與 Python 之父 Guido van Rossum。項目代碼使用 MIT 協議,項目文檔使用 http://creativecommons.org/licenses/by/3.0/legalcode 協議。

課程內容在原文檔基礎上做了稍許修改,增加了部分原理介紹,步驟的拆解分析及源代碼注釋。

2. 內容簡介

傳統計算機科學往往將大量精力放在如何追求更有效率的算法上。但如今大部分涉及網絡的程序,它們的時間開銷主要並不是在計算上,而是在維持多個Socket連接上。亦或是它們的事件循環處理的不夠高效導致了更多的時間開銷。對於這些程序來說,它們面臨的挑戰是如何更高效地等待大量的網絡事件並進行調度。目前流行的解決方式就是使用異步I/O。

本課程將探討幾種實現爬蟲的方法,從傳統的線程池到使用協程,每節課實現一個小爬蟲。另外學習協程的時候,我們會從原理入手,以ayncio協程庫為原型,實現一個簡單的異步編程模型。

本課程實現的爬蟲為爬一個整站的爬蟲,不會爬到站點外面去,且功能較簡單,主要目的在於學習原理,提供實現並發與異步的思路,並不適合直接改寫作為日常工具使用。

3. 課程知識點

本課程項目完成過程中,我們將學習:

  1. 線程池實現並發爬蟲
  2. 回調方法實現異步爬蟲
  3. 協程技術的介紹
  4. 一個基於協程的異步編程模型
  5. 協程實現異步爬蟲

二、實驗環境

本課程使用Python 3.4,所以本課程內運行py腳本都是使用python3命令。

打開終端,進入 Code 目錄,創建 crawler 文件夾, 並將其作為我們的工作目錄。

$ cd Code $ mkdir crawler && cd crawler 

環保起見,測試爬蟲的網站在本地搭建。

我們使用 Python 2.7 版本官方文檔作為測試爬蟲用的網站

wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip unzip python-doc.zip 

安裝serve,一個用起來很方便的靜態文件服務器:

sudo npm install -g serve 

啟動服務器:

serve python-doc 

如果訪問不了npm的資源,也可以用以下方式開啟服務器:

ruby -run -ehttpd python-doc -p 3000 

訪問localhost:3000查看網站:

此處輸入圖片的描述

三、實驗原理

什么是爬蟲?

網絡爬蟲(又被稱為網頁蜘蛛,網絡機器人,在FOAF社區中間,更經常的稱為網頁追逐者),是一種按照一定的規則,自動地抓取萬維網信息的程序或者腳本。

爬蟲的工作流程

網絡爬蟲基本的工作流程是從一個根URL開始,抓取頁面,解析頁面中所有的URL,將還沒有抓取過的URL放入工作隊列中,之后繼續抓取工作隊列中的URL,重復抓取、解析,將解析到的url放入工作隊列的步驟,直到工作隊列為空為止。

線程池、回調、協程

我們希望通過並發執行來加快爬蟲抓取頁面的速度。一般的實現方式有三種:

  1. 線程池方式:開一個線程池,每當爬蟲發現一個新鏈接,就將鏈接放入任務隊列中,線程池中的線程從任務隊列獲取一個鏈接,之后建立socket,完成抓取頁面、解析、將新連接放入工作隊列的步驟。
  2. 回調方式:程序會有一個主循環叫做事件循環,在事件循環中會不斷獲得事件,通過在事件上注冊解除回調函數來達到多任務並發執行的效果。缺點是一旦需要的回調操作變多,代碼就會非常散,變得難以維護。
  3. 協程方式:同樣通過事件循環執行程序,利用了Python 的生成器特性,生成器函數能夠中途停止並在之后恢復,那么原本不得不分開寫的回調函數就能夠寫在一個生成器函數中了,這也就實現了協程。

四、實驗一:線程池實現爬蟲

使用socket抓取頁面需要先建立連接,之后發送GET類型的HTTP報文,等待讀入,將讀到的所有內容存入響應緩存。

def fetch(url): sock = socket.socket() sock.connect(('localhost.com', 3000)) request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = parse_links(response) q.add(links) 

默認的socket連接與讀寫是阻塞式的,在等待讀入的這段時間的CPU占用是被完全浪費的。

多線程

默認這部分同學們都是學過的,所以就粗略記幾個重點,沒學過的同學可以直接參考廖雪峰的教程:廖雪峰的官方網站-Python多線程

導入線程庫:

import threading 

開啟一個線程的方法:

t = 你新建的線程
t.start()   #開始運行線程 t.join() #你的當前函數就阻塞在這一步直到線程運行完 

建立線程的兩種方式:

#第一種:通過函數創建線程 def 函數a(): pass t = threading.Thread(target=函數a,name=自己隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步后主程序中斷退出后子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher() 

線程同時操作一個全局變量時會產生線程競爭所以需要鎖:

lock = threading.Lock() lock.acquire() #獲得鎖 #..操作全局變量.. lock.release() #釋放鎖 

多線程同步-隊列

默認這部分同學們都是學過的,所以就粗略記幾個重點,沒學過的同學可以直接參考PyMOTW3-queue — Thread-safe FIFO Implementation中文翻譯版

多線程同步就是多個線程競爭一個全局變量時按順序讀寫,一般情況下要用鎖,但是使用標准庫里的Queue的時候它內部已經實現了鎖,不用程序員自己寫了。

導入隊列類:

from queue import Queue 

創建一個隊列:

q = Queue(maxsize=0) 

maxsize為隊列大小,為0默認隊列大小可無窮大。

隊列是先進先出的數據結構:

q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列得到一個item,隊列為空則阻塞 

還有相應的不等待的版本,這里略過。

隊列不為空,或者為空但是取得item的線程沒有告知任務完成時都是處於阻塞狀態

q.join() #阻塞直到所有任務完成 

線程告知任務完成使用task_done

q.task_done() #在線程內調用 

實現線程池

創建thread.py文件作為爬蟲程序的文件。

我們使用seen_urls來記錄已經解析到的url地址:

seen_urls = set(['/']) 

創建Fetcher類:

class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) #tasks為任務隊列 self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) #解析頁面上的所有鏈接 links = self.parse_links(url, response) lock.acquire() #得到新鏈接加入任務隊列與seen_urls中 for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() #通知任務隊列這個線程的任務完成了 self.tasks.task_done() 

使用正則庫與url解析庫來解析抓取的頁面,這里圖方便用了正則,同學也可以用Beautifulsoup等專門用來解析頁面的Python庫:

import urllib.parse import re 

Fetcher中實現parse_links解析頁面:

def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() #通過href屬性找到所有鏈接 urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: #可能找到的url是相對路徑,這時候就需要join一下,絕對路徑的話就還是會返回url normalized = urllib.parse.urljoin(fetched_url, url) #url的信息會被分段存在parts里 parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue #有的頁面會通過地址里的#frag后綴在頁面內跳轉,這里去掉frag的部分 defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links #得到報文的html正文 def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') 

實現線程池類與main的部分:

class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() #開4個線程 pool = ThreadPool(4) #從根地址開始抓取頁面 pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

運行效果

這里先貼出完整代碼:

from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

運行python3 thread.py命令查看效果(記得先開網站服務器):

此處輸入圖片的描述

使用標准庫中的線程池

線程池直接使用multiprocessing.pool中的ThreadPool

代碼更改如下:

from multiprocessing.pool import ThreadPool #...省略中間部分... #...去掉Fetcher初始化中的self.start() #...刪除自己實現的ThreadPool... if __name__ == '__main__': start = time.time() pool = ThreadPool() tasks = Queue() tasks.put("/") Workers = [Fetcher(tasks) for i in range(4)] pool.map_async(lambda w:w.run(), Workers) tasks.join() pool.close() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start)) 

使用ThreadPool時,它處理的對象可以不是線程對象,實際上Fetcher的線程部分ThreadPool根本用不到。因為它自己內部已開了幾個線程在等待任務輸入。這里偷個懶就只把self.start()去掉了。可以把Fetcher的線程部分全去掉,效果是一樣的。

ThreadPool活用了map函數,這里它將每一個Fetcher對象分配給線程池中的一個線程,線程調用了Fetcherrun函數。這里使用map_async是因為不希望它在那一步阻塞,我們希望在任務隊列join的地方阻塞,那么到隊列為空且任務全部處理完時程序就會繼續執行了。

運行python3 thread.py命令查看效果:

此處輸入圖片的描述

線程池實現的缺陷

我們希望爬蟲的性能能夠進一步提升,但是我們沒辦法開太多的線程,因為線程的內存開銷很大,每創建一個線程可能需要占用50k的內存。以及還有一點,網絡程序的時間開銷往往花在I/O上,socket I/O 阻塞時的那段時間是完全被浪費了的。那么要如何解決這個問題呢?

下節課你就知道啦,下節課見~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM