Python實現的異步代理爬蟲及代理池2--正確實現並發


相關博客:

在啃完《流暢的Python》之后,發現我之前實現的proxypool是有問題的:它雖然使用了asyncio的,但卻不是並發的,依舊是順序的,所以運行的速度非常慢。在實現並發后,按照現有的5個規則爬取一次這5個代理網站目前用時不到3分鍾,而之前僅爬取西祠就需要1個小時。github上的代碼已更新。

並發訪問網站的例子

下面就是一個並發訪問proxypool中實現的服務器的例子,以這個例子來說明如何實現並發。

import aiohttp
import asyncio


async def localserver(semaphore):
    async with semaphore:       
        async with aiohttp.ClientSession() as session:
            async with session.get('http://127.0.0.1:8088', timeout=5) as resp:
                print('hello')
            await asyncio.sleep(3) # 模擬網絡延遲

async def coro():
    semaphore = asyncio.Semaphore(5) # 限制並發量為5
    to_get = [localserver(semaphore) for _ in range(20)] # 同時建立20個協程
    await asyncio.wait(to_get) # 等待所有協程結束

loop = asyncio.get_event_loop()
loop.run_until_complete(coro())
print(result)
loop.close()

運行上面的代碼,可以在終端看到每隔3秒就打印出5個"hello",下面是服務器的日志:

2017-06-01 14:45:35,375  DEBUG                                 server started at http://127.0.0.1:8088...
2017-06-01 14:45:44,851  DEBUG    127.0.0.1:35698       GET    requested index page
2017-06-01 14:45:44,853  DEBUG    127.0.0.1:35700       GET    requested index page
2017-06-01 14:45:44,855  DEBUG    127.0.0.1:35702       GET    requested index page
2017-06-01 14:45:44,858  DEBUG    127.0.0.1:35704       GET    requested index page
2017-06-01 14:45:44,876  DEBUG    127.0.0.1:35706       GET    requested index page
2017-06-01 14:45:47,864  DEBUG    127.0.0.1:35710       GET    requested index page
......
2017-06-01 14:45:50,912  DEBUG    127.0.0.1:35732       GET    requested index page
2017-06-01 14:45:53,887  DEBUG    127.0.0.1:35734       GET    requested index page
2017-06-01 14:45:53,919  DEBUG    127.0.0.1:35736       GET    requested index page
2017-06-01 14:45:53,924  DEBUG    127.0.0.1:35738       GET    requested index page
2017-06-01 14:45:53,925  DEBUG    127.0.0.1:35740       GET    requested index page
2017-06-01 14:45:53,929  DEBUG    127.0.0.1:35742       GET    requested index page

可以在 14:45:44 時有5個幾乎同時到達的請求,之后間隔3秒會就會有5個並發請求到達,20個請求一共耗時9秒左右。
並發訪問網站一定要限流,這里是通過asyncio.Semaphore將並發請求數量控制在5個。

通過上面的例子可以看出實現並發的關鍵就在於同時建立多個協程,然后通過asyncio.wait方法等待它們結束,各個協程之間的調度交給事件循環完成。

改造 proxypool 以實現並發

主要修改的是proxy_crawler.pyproxy_validator.py2個模塊。

並發地爬取

因為每個網站的規則都不同,要實現並發爬取所有的代理網站,需要修改協程間傳遞的數據,為它們添加上各自對應的規則,這樣最終頁面解析函數就可以使用對應的規則來解析爬取到的頁面內容了,使用一個命名元組來包裝這2種數據:

Result = namedtuple('Result', 'content rule')

content字段是url和爬取到的頁面,rule字段則是對應的規則。

下面是支持並發的proxy_crawler的啟動函數:

async def start(self):
    to_crawl = [self._crawler(rule) for rule in self._rules] # 協程數等於規則數
    await asyncio.wait(to_crawl)

現在可以並發地爬取所有的代理網站,而對於單個網站來說爬取過程依舊是順序的(爬取頁面的page_download函數的基本邏輯沒變),因為爬取時沒有使用代理,並發訪問可能會被封IP。如果想要實現對單個代理網站的並發爬取,參考上面的例子也很容易實現。

並發地驗證

之前實現的proxypool中最耗時的部分就是驗證了,如果代理無效,需要等待其超時才能判斷其無效,而免費的代理中絕大多數都是無效的,順序驗證就會非常耗時。
下面是支持並發的proxy_validator的啟動函數:

async def start(self, proxies=None):
    if proxies is not None:
        to_validate = [self.validate_many(proxies) for _ in range(50)] # 建立 50 個協程,在爬取過程中驗證代理
    else:
        proxies = await self._get_proxies()# 從代理池中獲取若干代理,返回一個asyncio.Queue 對象
        to_validate = [self.validate_one(proxies) for _ in range(proxies.qsize())] # 協程數等於隊列的長度,定期驗證代理池中的代理

    await asyncio.wait(to_validate)

這部分相較之前的版本變化較大,除了為了支持並發而做的修改外,還進行了一點優化,重用了驗證代理的代碼,現在爬取代理時的驗證和對代理池中的代理的定期驗證都使用相同的驗證代碼。

防止日志阻塞事件循環

因為默認日志是輸出到文件的,而asyncio包目前沒有提供異步文件系統API,為了不讓日志的I/O操作阻塞事件循環,通過調用run_in_executor方法,把日志操作發給asyncio的事件循環背后維護着的ThreadPoolExecutor 對象執行。
我定義了一個logger的代理,由於logger被托管到另一個線程中執行,會丟失當前的上下文信息,如果需要記錄,可以使用traceback庫獲取它們並作為日志的msgexc_infostack_info都設置為False,這樣就不需要修改現有的代碼了:

import logging
import logging.config
import yaml
from pathlib import Path
from functools import wraps


PROJECT_ROOT = Path(__file__).parent

def _log_async(func):
    """Send func to be executed by ThreadPoolExecutor of event loop."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_in_executor(None, partial(func, *args, **kwargs)) # run_in_executor 本身不支持關鍵字參數,logger是有關鍵字參數(如 'extra')的,使用 'functools.partial'

    return wrapper


class _LoggerAsync:
    """Logger's async proxy.

    Logging were executed in a thread pool executor to avoid blocking the event loop.
    """

    def __init__(self, *, is_server=False):
        logging.config.dictConfig(
            yaml.load(open(str(PROJECT_ROOT / 'logging.yaml'), 'r')))  # load config from YAML file

        if is_server:
            self._logger = logging.getLogger('server_logger')
        elif VERBOSE:
            self._logger = logging.getLogger('console_logger')  # output to both stdout and file
        else:
            self._logger = logging.getLogger('file_logger')

    def __getattr__(self, name):
        if hasattr(self._logger, name):
            return getattr(self._logger, name)
        else:
            msg = 'logger object has no attribute {!r}'
            raise AttributeError(msg.format(name))

    @_log_async
    def debug(self, msg, *args, **kwargs):
        self._logger.debug(msg, *args, exc_info=False, stack_info=False, **kwargs)

    @_log_async
    def info(self, msg, *args, **kwargs):
        self._logger.info(msg, *args, exc_info=False, stack_info=False, **kwargs)

    @_log_async
    def warning(self, msg, *args, **kwargs):
        self._logger.warning(msg, *args, exc_info=False, stack_info=False, **kwargs)

    @_log_async
    def error(self, msg, *args, **kwargs):
        self._logger.error(msg, *args, exc_info=False, stack_info=False, **kwargs)

    @_log_async
    def exception(self, msg, *args, exc_info=True, **kwargs):
        self._logger.exception(msg, *args, exc_info=False, stack_info=False, **kwargs) 

    @_log_async
    def critical(self, msg, *args, **kwargs):
        self._logger.critical(msg, *args, exc_info=False, stack_info=False, **kwargs)

logger = _LoggerAsync()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM