相關博客:
在啃完《流暢的Python》之后,發現我之前實現的proxypool是有問題的:它雖然使用了asyncio
的,但卻不是並發的,依舊是順序的,所以運行的速度非常慢。在實現並發后,按照現有的5個規則爬取一次這5個代理網站目前用時不到3分鍾,而之前僅爬取西祠就需要1個小時。github上的代碼已更新。
並發訪問網站的例子
下面就是一個並發訪問proxypool
中實現的服務器的例子,以這個例子來說明如何實現並發。
import aiohttp
import asyncio
async def localserver(semaphore):
async with semaphore:
async with aiohttp.ClientSession() as session:
async with session.get('http://127.0.0.1:8088', timeout=5) as resp:
print('hello')
await asyncio.sleep(3) # 模擬網絡延遲
async def coro():
semaphore = asyncio.Semaphore(5) # 限制並發量為5
to_get = [localserver(semaphore) for _ in range(20)] # 同時建立20個協程
await asyncio.wait(to_get) # 等待所有協程結束
loop = asyncio.get_event_loop()
loop.run_until_complete(coro())
print(result)
loop.close()
運行上面的代碼,可以在終端看到每隔3秒就打印出5個"hello",下面是服務器的日志:
2017-06-01 14:45:35,375 DEBUG server started at http://127.0.0.1:8088...
2017-06-01 14:45:44,851 DEBUG 127.0.0.1:35698 GET requested index page
2017-06-01 14:45:44,853 DEBUG 127.0.0.1:35700 GET requested index page
2017-06-01 14:45:44,855 DEBUG 127.0.0.1:35702 GET requested index page
2017-06-01 14:45:44,858 DEBUG 127.0.0.1:35704 GET requested index page
2017-06-01 14:45:44,876 DEBUG 127.0.0.1:35706 GET requested index page
2017-06-01 14:45:47,864 DEBUG 127.0.0.1:35710 GET requested index page
......
2017-06-01 14:45:50,912 DEBUG 127.0.0.1:35732 GET requested index page
2017-06-01 14:45:53,887 DEBUG 127.0.0.1:35734 GET requested index page
2017-06-01 14:45:53,919 DEBUG 127.0.0.1:35736 GET requested index page
2017-06-01 14:45:53,924 DEBUG 127.0.0.1:35738 GET requested index page
2017-06-01 14:45:53,925 DEBUG 127.0.0.1:35740 GET requested index page
2017-06-01 14:45:53,929 DEBUG 127.0.0.1:35742 GET requested index page
可以在 14:45:44 時有5個幾乎同時到達的請求,之后間隔3秒會就會有5個並發請求到達,20個請求一共耗時9秒左右。
並發訪問網站一定要限流,這里是通過asyncio.Semaphore
將並發請求數量控制在5個。
通過上面的例子可以看出實現並發的關鍵就在於同時建立多個協程,然后通過asyncio.wait
方法等待它們結束,各個協程之間的調度交給事件循環完成。
改造 proxypool 以實現並發
主要修改的是proxy_crawler.py
和proxy_validator.py
2個模塊。
並發地爬取
因為每個網站的規則都不同,要實現並發爬取所有的代理網站,需要修改協程間傳遞的數據,為它們添加上各自對應的規則,這樣最終頁面解析函數就可以使用對應的規則來解析爬取到的頁面內容了,使用一個命名元組來包裝這2種數據:
Result = namedtuple('Result', 'content rule')
content
字段是url和爬取到的頁面,rule
字段則是對應的規則。
下面是支持並發的proxy_crawler
的啟動函數:
async def start(self):
to_crawl = [self._crawler(rule) for rule in self._rules] # 協程數等於規則數
await asyncio.wait(to_crawl)
現在可以並發地爬取所有的代理網站,而對於單個網站來說爬取過程依舊是順序的(爬取頁面的page_download
函數的基本邏輯沒變),因為爬取時沒有使用代理,並發訪問可能會被封IP。如果想要實現對單個代理網站的並發爬取,參考上面的例子也很容易實現。
並發地驗證
之前實現的proxypool
中最耗時的部分就是驗證了,如果代理無效,需要等待其超時才能判斷其無效,而免費的代理中絕大多數都是無效的,順序驗證就會非常耗時。
下面是支持並發的proxy_validator
的啟動函數:
async def start(self, proxies=None):
if proxies is not None:
to_validate = [self.validate_many(proxies) for _ in range(50)] # 建立 50 個協程,在爬取過程中驗證代理
else:
proxies = await self._get_proxies()# 從代理池中獲取若干代理,返回一個asyncio.Queue 對象
to_validate = [self.validate_one(proxies) for _ in range(proxies.qsize())] # 協程數等於隊列的長度,定期驗證代理池中的代理
await asyncio.wait(to_validate)
這部分相較之前的版本變化較大,除了為了支持並發而做的修改外,還進行了一點優化,重用了驗證代理的代碼,現在爬取代理時的驗證和對代理池中的代理的定期驗證都使用相同的驗證代碼。
防止日志阻塞事件循環
因為默認日志是輸出到文件的,而asyncio
包目前沒有提供異步文件系統API,為了不讓日志的I/O操作阻塞事件循環,通過調用run_in_executor
方法,把日志操作發給asyncio
的事件循環背后維護着的ThreadPoolExecutor
對象執行。
我定義了一個logger
的代理,由於logger被托管到另一個線程中執行,會丟失當前的上下文信息,如果需要記錄,可以使用traceback
庫獲取它們並作為日志的msg
,exc_info
和 stack_info
都設置為False
,這樣就不需要修改現有的代碼了:
import logging
import logging.config
import yaml
from pathlib import Path
from functools import wraps
PROJECT_ROOT = Path(__file__).parent
def _log_async(func):
"""Send func to be executed by ThreadPoolExecutor of event loop."""
@wraps(func)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return loop.run_in_executor(None, partial(func, *args, **kwargs)) # run_in_executor 本身不支持關鍵字參數,logger是有關鍵字參數(如 'extra')的,使用 'functools.partial'
return wrapper
class _LoggerAsync:
"""Logger's async proxy.
Logging were executed in a thread pool executor to avoid blocking the event loop.
"""
def __init__(self, *, is_server=False):
logging.config.dictConfig(
yaml.load(open(str(PROJECT_ROOT / 'logging.yaml'), 'r'))) # load config from YAML file
if is_server:
self._logger = logging.getLogger('server_logger')
elif VERBOSE:
self._logger = logging.getLogger('console_logger') # output to both stdout and file
else:
self._logger = logging.getLogger('file_logger')
def __getattr__(self, name):
if hasattr(self._logger, name):
return getattr(self._logger, name)
else:
msg = 'logger object has no attribute {!r}'
raise AttributeError(msg.format(name))
@_log_async
def debug(self, msg, *args, **kwargs):
self._logger.debug(msg, *args, exc_info=False, stack_info=False, **kwargs)
@_log_async
def info(self, msg, *args, **kwargs):
self._logger.info(msg, *args, exc_info=False, stack_info=False, **kwargs)
@_log_async
def warning(self, msg, *args, **kwargs):
self._logger.warning(msg, *args, exc_info=False, stack_info=False, **kwargs)
@_log_async
def error(self, msg, *args, **kwargs):
self._logger.error(msg, *args, exc_info=False, stack_info=False, **kwargs)
@_log_async
def exception(self, msg, *args, exc_info=True, **kwargs):
self._logger.exception(msg, *args, exc_info=False, stack_info=False, **kwargs)
@_log_async
def critical(self, msg, *args, **kwargs):
self._logger.critical(msg, *args, exc_info=False, stack_info=False, **kwargs)
logger = _LoggerAsync()