本文主要包括以下內容
- 線程池實現並發爬蟲
- 回調方法實現異步爬蟲
- 協程技術的介紹
- 一個基於協程的異步編程模型
- 協程實現異步爬蟲
線程池、回調、協程
我們希望通過並發執行來加快爬蟲抓取頁面的速度。一般的實現方式有三種:
- 線程池方式:開一個線程池,每當爬蟲發現一個新鏈接,就將鏈接放入任務隊列中,線程池中的線程從任務隊列獲取一個鏈接,之后建立socket,完成抓取頁面、解析、將新連接放入工作隊列的步驟。
- 回調方式:程序會有一個主循環叫做事件循環,在事件循環中會不斷獲得事件,通過在事件上注冊解除回調函數來達到多任務並發執行的效果。缺點是一旦需要的回調操作變多,代碼就會非常散,變得難以維護。
- 協程方式:同樣通過事件循環執行程序,利用了Python 的生成器特性,生成器函數能夠中途停止並在之后恢復,那么原本不得不分開寫的回調函數就能夠寫在一個生成器函數中了,這也就實現了協程。
線程池實現爬蟲
Python多線程建立線程的兩種方式
#第一種:通過函數創建線程 def 函數a(): pass t = threading.Thread(target=函數a,name=自己隨便取的線程名字) #第二種:繼承線程類 class Fetcher(threading.Thread): def __init__(self): Thread.__init__(self): #加這一步后主程序中斷退出后子線程也會跟着中斷退出 self.daemon = True def run(self): #線程運行的函數 pass t = Fetcher()
多線程同步-隊列
多線程同步就是多個線程競爭一個全局變量時按順序讀寫,一般情況下要用鎖,但是使用標准庫里的Queue的時候它內部已經實現了鎖,不用程序員自己寫了。
導入隊列類:
from queue import Queue 創建一個隊列: q = Queue(maxsize=0) maxsize為隊列大小,為0默認隊列大小可無窮大。 隊列是先進先出的數據結構: q.put(item) #往隊列添加一個item,隊列滿了則阻塞 q.get(item) #從隊列得到一個item,隊列為空則阻塞 還有相應的不等待的版本,這里略過。 隊列不為空,或者為空但是取得item的線程沒有告知任務完成時都是處於阻塞狀態 q.join() #阻塞直到所有任務完成 線程告知任務完成使用task_done q.task_done() #在線程內調用
完整代碼
from queue import Queue from threading import Thread, Lock import urllib.parse import socket import re import time seen_urls = set(['/']) lock = Lock() class Fetcher(Thread): def __init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start() def run(self): while True: url = self.tasks.get() print(url) sock = socket.socket() sock.connect(('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(url) sock.send(get.encode('ascii')) response = b'' chunk = sock.recv(4096) while chunk: response += chunk chunk = sock.recv(4096) links = self.parse_links(url, response) lock.acquire() for link in links.difference(seen_urls): self.tasks.put(link) seen_urls.update(links) lock.release() self.tasks.task_done() def parse_links(self, fetched_url, response): if not response: print('error: {}'.format(fetched_url)) return set() if not self._is_html(response): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body(response))) links = set() for url in urls: normalized = urllib.parse.urljoin(fetched_url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def body(self, response): body = response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _is_html(self, response): head, body = response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') class ThreadPool: def __init__(self, num_threads): self.tasks = Queue() for _ in range(num_threads): Fetcher(self.tasks) def add_task(self, url): self.tasks.put(url) def wait_completion(self): self.tasks.join() if __name__ == '__main__': start = time.time() pool = ThreadPool(4) pool.add_task("/") pool.wait_completion() print('{} URLs fetched in {:.1f} seconds'.format(len(seen_urls),time.time() - start))
事件驅動-回調函數實現爬蟲
非阻塞I/O
如果使用非阻塞I/O,程序就不會傻傻地等在那里(比如等連接、等讀取),而是會返回一個錯誤信息,雖然說是說錯誤信息,它其實就是叫你過一會再來的意思,編程的時候都不把它當錯誤看。
非阻塞I/O代碼如下:
sock = socket.socket()
sock.setblocking(False) try: sock.connect(('xkcd.com', 80)) except BlockingIOError: pass
單線程上的多I/O
有了非阻塞I/O這個特性,我們就能夠實現單線程上多個sockets的處理了,學過C語言網絡編程的同學應該都認識select這個函數吧?不認識也不要緊,select函數如果你不設置它的超時時間它就是默認一直阻塞的,只有當有I/O事件發生時它才會被激活,然后告訴你哪個socket上發生了什么事件(讀|寫|異常),在python中也有select,還有跟select功能相同但是更高效的poll,它們都是底層C函數的Python實現。
不過這里我們不使用select,而是用更簡單好用的DefaultSelector,是Python 3.4后才出現的一個模塊里的類,你只需要在非阻塞socket和事件上綁定回調函數就可以了。
代碼如下:
from selectors import DefaultSelector, EVENT_WRITE selector = DefaultSelector() sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass def connected(): selector.unregister(sock.fileno()) print('connected!') selector.register(sock.fileno(), EVENT_WRITE, connected)
這里看一下selector.register的原型
register(fileobj, events, data=None)
其中fileobj可以是文件描述符也可以是文件對象(通過fileno得到),events是位掩碼,指明發生的是什么事件,data 則是與指定文件(也就是我們的socket)與指定事件綁定在一起的數據。
如代碼所示,selector.register 在該socket的寫事件上綁定了回調函數connected(這里作為數據綁定)。在該socket上第一次發生的寫事件意味着連接的建立,connected函數在連接建立成功后再解除了該socket上所有綁定的數據。
事件驅動
看了以上selector的使用方式,我想你會發現它很適合寫成事件驅動的形式。
我們可以創建一個事件循環,在循環中不斷獲得I/O事件:
def loop(): while True: events = selector.select() #遍歷事件並調用相應的處理 for event_key, event_mask in events: callback = event_key.data callback()
完整代碼
from selectors import * import socket import re import urllib.parse import time urls_todo = set(['/']) seen_urls = set(['/']) #追加了一個可以看最高並發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False class Fetcher: def __init__(self, url): self.response = b'' self.url = url self.sock = None def fetch(self): global concurrency_achieved concurrency_achieved = max(concurrency_achieved, len(urls_todo)) self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): selector.unregister(key.fd) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) self.sock.send(get.encode('ascii')) selector.register(key.fd, EVENT_READ, self.read_response) def read_response(self, key, mask): global stopped chunk = self.sock.recv(4096) # 4k chunk size. if chunk: self.response += chunk else: selector.unregister(key.fd) # Done reading. links = self.parse_links() for link in links.difference(seen_urls): urls_todo.add(link) Fetcher(link).fetch() seen_urls.update(links) urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def parse_links(self): if not self.response: print('error: {}'.format(self.url)) return set() if not self._is_html(): return set() urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) links = set() for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) links.add(defragmented) return links def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') fetcher.fetch() while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback(event_key, event_mask) print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(seen_urls), time.time() - start, concurrency_achieved))
事件驅動-協程實現爬蟲
什么是協程?
協程其實是比起一般的子例程而言更寬泛的存在,子例程是協程的一種特例。
子例程的起始處是惟一的入口點,一旦退出即完成了子例程的執行,子例程的一個實例只會返回一次。
協程可以通過yield來調用其它協程。通過yield方式轉移執行權的協程之間不是調用者與被調用者的關系,而是彼此對稱、平等的。
協程的起始處是第一個入口點,在協程里,返回點之后是接下來的入口點。子例程的生命期遵循后進先出(最后一個被調用的子例程最先返回);相反,協程的生命期完全由他們的使用的需要決定。
還記得我們什么時候會用到yield嗎,就是在生成器(generator)里,在迭代的時候每次執行next(generator)生成器都會執行到下一次yield的位置並返回,可以說生成器就是例程。
生成器實現協程模型
雖然生成器擁有一個協程該有的特性,但光這樣是不夠的,做異步編程仍是困難的,我們需要先用生成器實現一個協程異步編程的簡單模型,它同時也是Python標准庫asyncio的簡化版,正如asyncio的實現,我們會用到生成器,Future類,以及yield from語句。
首先實現Future類, Future類可以認為是專門用來存儲將要發送給協程的信息的類。
class Future: def __init__(self): self.result = None self._callbacks = [] def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self)
Future對象最開始處在掛起狀態,當調用set_result時被激活,並運行注冊的回調函數,該回調函數多半是對協程發送信息讓協程繼續運行下去的函數。
我們改造一下之前從fetch到connected的函數,加入Future與yield。
這是之前回調實現的fetch:
class Fetcher: def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect(('localhost', 3000)) except BlockingIOError: pass selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) def connected(self, key, mask): print('connected!') # ...后面省略...
改造后,我們將連接建立后的部分也放到了fetch中。
class Fetcher: def fetch(self): sock = socket.socket() sock.setblocking(False) try: sock.connect(('localhost', 3000)) except BlockingIOError: pass f = Future() def on_connected(): #連接建立后通過set_result協程繼續從yield的地方往下運行 f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield f selector.unregister(sock.fileno()) print('connected!')
fetcher是一個生成器函數,我們創建一個Future實例,yield它來暫停fetch的運行直到連接建立f.set_result(None)的時候,生成器才繼續運行。那set_result時運行的回調函數是哪來的呢?這里引入Task類:
class Task: def __init__(self, coro): #協程 self.coro = coro #創建並初始化一個為None的Future對象 f = Future() f.set_result(None) #步進一次(發送一次信息) #在初始化的時候發送是為了協程到達第一個yield的位置,也是為了注冊下一次的步進 self.step(f) def step(self, future): try: #向協程發送消息並得到下一個從協程那yield到的Future對象 next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) fetcher = Fetcher('/') Task(fetcher.fetch()) loop()
流程大致是這樣的,首先Task初始化,向fetch生成器發送None信息(也可以想象成step調用了fetch,參數是None),fetch得以從開頭運行到第一個yield的地方並返回了一個Future對象給step的next_future,然后step就在這個得到的Future對象注冊了step。當連接建立時on_connected就會被調用,再一次向協程發送信息,協程就會繼續往下執行了。
使用yield from分解協程
一旦socket連接建立成功,我們發送HTTP GET請求到服務器並在之后讀取服務器響應。現在這些步驟不用再分散在不同的回調函數里了,我們可以將其放在同一個生成器函數中:
def fetch(self): # ... 省略連接的代碼 sock.send(request.encode('ascii')) while True: f = Future() def on_readable(): f.set_result(sock.recv(4096)) selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield f selector.unregister(sock.fileno()) if chunk: self.response += chunk else: # 完成讀取 break
但是這樣代碼也會越積越多,可不可以分解生成器函數的代碼呢,從協程中提取出子協程?Python 3 的yield from能幫助我們完成這部分工作。:
>>> def gen_fn():
... result = yield 1 ... print('result of yield: {}'.format(result)) ... result2 = yield 2 ... print('result of 2nd yield: {}'.format(result2)) ... return 'done' ...
yield from得到的子協程最后return的返回值
完整代碼
from selectors import * import socket import re import urllib.parse import time class Future: def __init__(self): self.result = None self._callbacks = [] def result(self): return self.result def add_done_callback(self, fn): self._callbacks.append(fn) def set_result(self, result): self.result = result for fn in self._callbacks: fn(self) def __iter__(self): yield self return self.result class Task: def __init__(self, coro): self.coro = coro f = Future() f.set_result(None) self.step(f) def step(self, future): try: next_future = self.coro.send(future.result) except StopIteration: return next_future.add_done_callback(self.step) urls_seen = set(['/']) urls_todo = set(['/']) #追加了一個可以看最高並發數的變量 concurrency_achieved = 0 selector = DefaultSelector() stopped = False def connect(sock, address): f = Future() sock.setblocking(False) try: sock.connect(address) except BlockingIOError: pass def on_connected(): f.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, on_connected) yield from f selector.unregister(sock.fileno()) def read(sock): f = Future() def on_readable(): f.set_result(sock.recv(4096)) # Read 4k at a time. selector.register(sock.fileno(), EVENT_READ, on_readable) chunk = yield from f selector.unregister(sock.fileno()) return chunk def read_all(sock): response = [] chunk = yield from read(sock) while chunk: response.append(chunk) chunk = yield from read(sock) return b''.join(response) class Fetcher: def __init__(self, url): self.response = b'' self.url = url def fetch(self): global concurrency_achieved, stopped concurrency_achieved = max(concurrency_achieved, len(urls_todo)) sock = socket.socket() yield from connect(sock, ('localhost', 3000)) get = 'GET {} HTTP/1.0\r\nHost: localhost\r\n\r\n'.format(self.url) sock.send(get.encode('ascii')) self.response = yield from read_all(sock) self._process_response() urls_todo.remove(self.url) if not urls_todo: stopped = True print(self.url) def body(self): body = self.response.split(b'\r\n\r\n', 1)[1] return body.decode('utf-8') def _process_response(self): if not self.response: print('error: {}'.format(self.url)) return if not self._is_html(): return urls = set(re.findall(r'''(?i)href=["']?([^\s"'<>]+)''', self.body())) for url in urls: normalized = urllib.parse.urljoin(self.url, url) parts = urllib.parse.urlparse(normalized) if parts.scheme not in ('', 'http', 'https'): continue host, port = urllib.parse.splitport(parts.netloc) if host and host.lower() not in ('localhost'): continue defragmented, frag = urllib.parse.urldefrag(parts.path) if defragmented not in urls_seen: urls_todo.add(defragmented) urls_seen.add(defragmented) Task(Fetcher(defragmented).fetch()) def _is_html(self): head, body = self.response.split(b'\r\n\r\n', 1) headers = dict(h.split(': ') for h in head.decode().split('\r\n')[1:]) return headers.get('Content-Type', '').startswith('text/html') start = time.time() fetcher = Fetcher('/') Task(fetcher.fetch()) while not stopped: events = selector.select() for event_key, event_mask in events: callback = event_key.data callback() print('{} URLs fetched in {:.1f} seconds, achieved concurrency = {}'.format( len(urls_seen), time.time() - start, concurrency_achieved))
總結
至此,我們在學習的過程中掌握了:
- 線程池實現並發爬蟲
- 回調方法實現異步爬蟲
- 協程技術的介紹
- 一個基於協程的異步編程模型
- 協程實現異步爬蟲
三種爬蟲的實現方式中線程池是最壞的選擇,因為它既占用內存,又有線程競爭的危險需要程序員自己編程解決,而且產生的I/O阻塞也浪費了CPU占用時間。再來看看回調方式,它是一種異步方法,所以I/O阻塞的問題解決了,而且它是單線程的不會產生競爭,問題好像都解決了。然而它引入了新的問題,它的問題在於以這種方式編寫的代碼不好維護,也不容易debug。看來協程才是最好的選擇,我們實現的協程異步編程模型使得一個單線程能夠很容易地改寫為協程。那是不是每一次做異步編程都要實現Task、Future呢?不是的,你可以直接使用asyncio官方標准協程庫,它已經幫你把Task、Future封裝好了,你根本不會感受到它們的存在,是不是很棒呢?如果你使用Python 3.5那更好,已經可以用原生的協程了,Python 3.5追加了async def,await等協程相關的關鍵詞。
參考鏈接:Python - Python實現基於協程的異步爬蟲 - 實驗樓
原文地址:http://blog.csdn.net/whuhan2013/article/details/52529477