Python實現基於協程的異步爬蟲
一、課程介紹
1. 課程來源
本課程核心部分來自《500 lines or less》項目,作者是來自 MongoDB 的工程師 A. Jesse Jiryu Davis 與 Python 之父 Guido van Rossum。項目代碼使用 MIT 協議,項目文檔使用 http://creativecommons.org/licenses/by/3.0/legalcode 協議。
課程內容在原文檔基礎上做了稍許修改,增加了部分原理介紹,步驟的拆解分析及源代碼注釋。
2. 內容簡介
傳統計算機科學往往將大量精力放在如何追求更有效率的算法上。但如今大部分涉及網絡的程序,它們的時間開銷主要並不是在計算上,而是在維持多個Socket連接上。亦或是它們的事件循環處理的不夠高效導致了更多的時間開銷。對於這些程序來說,它們面臨的挑戰是如何更高效地等待大量的網絡事件並進行調度。目前流行的解決方式就是使用異步I/O。
本課程將探討幾種實現爬蟲的方法,從傳統的線程池到使用協程,每節課實現一個小爬蟲。另外學習協程的時候,我們會從原理入手,以ayncio協程庫為原型,實現一個簡單的異步編程模型。
本課程實現的爬蟲為爬一個整站的爬蟲,不會爬到站點外面去,且功能較簡單,主要目的在於學習原理,提供實現並發與異步的思路,並不適合直接改寫作為日常工具使用。
3. 課程知識點
本課程項目完成過程中,我們將學習:
- 線程池實現並發爬蟲
- 回調方法實現異步爬蟲
- 協程技術的介紹
- 一個基於協程的異步編程模型
- 協程實現異步爬蟲
二、實驗環境
本課程使用Python 3.4
,所以本課程內運行py
腳本都是使用python3
命令。
打開終端,進入 Code
目錄,創建 crawler
文件夾, 並將其作為我們的工作目錄。
$ cd Code $ mkdir crawler && cd crawler
環保起見,測試爬蟲的網站在本地搭建。
我們使用 Python 2.7 版本官方文檔作為測試爬蟲用的網站
wget http://labfile.oss.aliyuncs.com/courses/574/python-doc.zip unzip python-doc.zip
安裝serve
,一個用起來很方便的靜態文件服務器:
sudo npm install -g serve
啟動服務器:
serve python-doc
如果訪問不了npm
的資源,也可以用以下方式開啟服務器:
ruby -run -ehttpd python-doc -p 3000
訪問localhost:3000
查看網站:
三、實驗原理
什么是爬蟲?
網絡爬蟲(又被稱為網頁蜘蛛,網絡機器人,在FOAF社區中間,更經常的稱為網頁追逐者),是一種按照一定的規則,自動地抓取萬維網信息的程序或者腳本。
爬蟲的工作流程
網絡爬蟲基本的工作流程是從一個根URL開始,抓取頁面,解析頁面中所有的URL,將還沒有抓取過的URL放入工作隊列中,之后繼續抓取工作隊列中的URL,重復抓取、解析,將解析到的url放入工作隊列的步驟,直到工作隊列為空為止。
線程池、回調、協程
我們希望通過並發執行來加快爬蟲抓取頁面的速度。一般的實現方式有三種:
- 線程池方式:開一個線程池,每當爬蟲發現一個新鏈接,就將鏈接放入任務隊列中,線程池中的線程從任務隊列獲取一個鏈接,之后建立socket,完成抓取頁面、解析、將新連接放入工作隊列的步驟。
- 回調方式:程序會有一個主循環叫做事件循環,在事件循環中會不斷獲得事件,通過在事件上注冊解除回調函數來達到多任務並發執行的效果。缺點是一旦需要的回調操作變多,代碼就會非常散,變得難以維護。
- 協程方式:同樣通過事件循環執行程序,利用了
Python
的生成器特性,生成器函數能夠中途停止並在之后恢復,那么原本不得不分開寫的回調函數就能夠寫在一個生成器函數中了,這也就實現了協程。
四、實驗一:線程池實現爬蟲
使用socket
抓取頁面需要先建立連接,之后發送GET
類型的HTTP
報文,等待讀入,將讀到的所有內容存入響應緩存。
def fetch(url): sock = socket.socket() sock.connect(('localhost.com', 3000)) request = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(request.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = parse_links(response) q.add(links)
默認的socket
連接與讀寫是阻塞式的,在等待讀入的這段時間的CPU占用是被完全浪費的。
多線程
默認這部分同學們都是學過的,所以就粗略記幾個重點,沒學過的同學可以直接參考廖雪峰的教程:廖雪峰的官方網站-Python多線程
導入線程庫:
import threading
開啟一個線程的方法:
t = 你新建的線程
t.start() #開始運行線程 t.join() #你的當前函數就阻塞在這一步直到線程運行完
建立線程的兩種方式:
#第一種:通過函數創建線程 def 函數a(): pass t = threading.Thread(target=函數a,name=自己隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步后主程序中斷退出后子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher()
線程同時操作一個全局變量時會產生線程競爭所以需要鎖:
lock = threading.Lock() lock.acquire() #獲得鎖 #..操作全局變量.. lock.release() #釋放鎖
多線程同步-隊列
默認這部分同學們都是學過的,所以就粗略記幾個重點,沒學過的同學可以直接參考PyMOTW3-queue — Thread-safe FIFO Implementation:中文翻譯版
多線程同步就是多個線程競爭一個全局變量時按順序讀寫,一般情況下要用鎖,但是使用標准庫里的Queue
的時候它內部已經實現了鎖,不用程序員自己寫了。
導入隊列類:
from queue import Queue
創建一個隊列:
q = Queue(maxsize=0)
maxsize為隊列大小,為0默認隊列大小可無窮大。
隊列是先進先出的數據結構:
q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列得到一個item,隊列為空則阻塞
還有相應的不等待的版本,這里略過。
隊列不為空,或者為空但是取得item的線程沒有告知任務完成時都是處於阻塞狀態
q.join() #阻塞直到所有任務完成
線程告知任務完成使用task_done
q.task_done() #在線程內調用
實現線程池
創建thread.py
文件作為爬蟲程序的文件。
我們使用seen_urls
來記錄已經解析到的url
地址:
seen_urls = set(['/'])
創建Fetcher
類:
class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) #tasks為任務隊列 self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) #解析頁面上的所有鏈接 links = self.parse_links(url, response) lock.acquire() #得到新鏈接加入任務隊列與seen_urls中 for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() #通知任務隊列這個線程的任務完成了 self.tasks.task_done()
使用正則庫與url解析庫來解析抓取的頁面,這里圖方便用了正則,同學也可以用Beautifulsoup
等專門用來解析頁面的Python庫:
import urllib.parse import re
在Fetcher
中實現parse_links
解析頁面:
def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() #通過href屬性找到所有鏈接 urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: #可能找到的url是相對路徑,這時候就需要join一下,絕對路徑的話就還是會返回url normalized = urllib.parse.urljoin(fetched_url, url) #url的信息會被分段存在parts里 parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue #有的頁面會通過地址里的#frag后綴在頁面內跳轉,這里去掉frag的部分 defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links #得到報文的html正文 def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html')
實現線程池類與main
的部分:
class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() #開4個線程 pool = ThreadPool(4) #從根地址開始抓取頁面 pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
運行效果
這里先貼出完整代碼:
from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
運行python3 thread.py
命令查看效果(記得先開網站服務器):
使用標准庫中的線程池
線程池直接使用multiprocessing.pool
中的ThreadPool
:
代碼更改如下:
from multiprocessing.pool import ThreadPool #...省略中間部分... #...去掉Fetcher初始化中的self.start() #...刪除自己實現的ThreadPool... if __name__ == '__main__': start = time.time() pool = ThreadPool() tasks = Queue() tasks.put("/") Workers = [Fetcher(tasks) for i in range(4)] pool.map_async(lambda w:w.run(), Workers) tasks.join() pool.close() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
使用ThreadPool
時,它處理的對象可以不是線程對象,實際上Fetcher
的線程部分ThreadPool
根本用不到。因為它自己內部已開了幾個線程在等待任務輸入。這里偷個懶就只把self.start()
去掉了。可以把Fetcher
的線程部分全去掉,效果是一樣的。
ThreadPool
活用了map
函數,這里它將每一個Fetcher
對象分配給線程池中的一個線程,線程調用了Fetcher
的run
函數。這里使用map_async
是因為不希望它在那一步阻塞,我們希望在任務隊列join
的地方阻塞,那么到隊列為空且任務全部處理完時程序就會繼續執行了。
運行python3 thread.py
命令查看效果:
線程池實現的缺陷
我們希望爬蟲的性能能夠進一步提升,但是我們沒辦法開太多的線程,因為線程的內存開銷很大,每創建一個線程可能需要占用50k的內存。以及還有一點,網絡程序的時間開銷往往花在I/O上,socket I/O 阻塞時的那段時間是完全被浪費了的。那么要如何解決這個問題呢?
下節課你就知道啦,下節課見~