關於asyncio知識(二)


一、asyncio之—-入門初探

通過上一篇關於asyncio的整體介紹,看過之后基本對asyncio就有一個基本認識,如果是感興趣的小伙伴相信也會嘗試寫一些小代碼嘗試用了,那么這篇文章會通過一個簡單的爬蟲程序,從簡單到復雜,一點一點的改進程序以達到我們想要的效果.

https://github.com/HackerNews/API 這里是關於HN的API的使用說明,這次寫的爬蟲就是調用這里的api接口,用到的模塊是aiohttp 發起的請求,切記這里是不能用requests模塊的。關於aiohttp的文檔:https://aiohttp.readthedocs.io/en/stable/

下面我們看具體的代碼實現,這個代碼主要就是爬取其中一個連接下的所有評論,如果不傳遞id的情況,默認就是爬取id為8863的評論

import asyncio
import argparse
import logging
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10
parser = argparse.ArgumentParser(
    description='獲取所有請求url的所有評論')
parser.add_argument('--id', type=int, default=8863,
                    help='請求的id, 默認id 是8863')
parser.add_argument('--url', type=str, help='HN的url地址')
parser.add_argument('--verbose', action='store_true', help='詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
fetch_counter = 0
async def fetch(session, url):
    """
    通過aiohttp訪問url並返回json格式數據
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        # 因為接口需要翻牆才能訪問,所以這里我用的是我本地的代理
        async with session.get(url,proxy='http://127.0.0.1:1081') as response:
            return await response.json()
async def post_number_of_comments(loop, session, post_id):
    """
    遞歸獲取當前請求url的所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))
    if 'kids' not in response:  #表示沒有評論
        return 0
    # 獲取當前請求的url的評論的數量
    number_of_comments = len(response['kids'])
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    # 獲取所有協程的執行的結果
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    return number_of_comments
def id_from_HN_url(url):
    """
    獲取運行時傳遞的參數中的id
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None
async def main(loop, post_id):
    now = datetime.now()
    async with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = await post_number_of_comments(loop, session, post_id)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("-- Post {} has {} comments".format(post_id, comments))
    loop.close()

再次提醒該url請求的時候是需要翻牆才能訪問到,所以我這里加了本地的代理,以便能夠爬取到內容,正常的請求結果如下:

[23:24:37] > Calculating comments took 2.98 seconds and 73 fetches
[23:24:37] -- Post 8863 has 72 comments

如果沒有翻牆就是如下錯誤了:

Traceback (most recent call last):
  File "/Users/zhaofan/vs_python/python_asyncio/ex1.py", line 41, in fetch
    async with session.get(url) as response:
  File "/usr/local/lib/python3.7/site-packages/aiohttp/client.py", line 1005, in __aenter__
    self._resp = await self._coro
  File "/usr/local/lib/python3.7/site-packages/aiohttp/client.py", line 476, in _request
    timeout=real_timeout
  File "/usr/local/lib/python3.7/site-packages/aiohttp/connector.py", line 522, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/usr/local/lib/python3.7/site-packages/aiohttp/connector.py", line 854, in _create_connection
    req, traces, timeout)
  File "/usr/local/lib/python3.7/site-packages/aiohttp/connector.py", line 974, in _create_direct_connection
    req=req, client_error=client_error)
  File "/usr/local/lib/python3.7/site-packages/aiohttp/connector.py", line 924, in _wrap_create_connection
    await self._loop.create_connection(*args, **kwargs))
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 946, in create_connection
    await self.sock_connect(sock, address)
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/selector_events.py", line 464, in sock_connect
    return await fut
concurrent.futures._base.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/Users/zhaofan/vs_python/python_asyncio/ex1.py", line 115, in <module>
    comments = loop.run_until_complete(main(loop, post_id))
  File "/usr/local/Cellar/python/3.7.2_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/zhaofan/vs_python/python_asyncio/ex1.py", line 99, in main
    comments = await post_number_of_comments(loop, session, post_id)
  File "/Users/zhaofan/vs_python/python_asyncio/ex1.py", line 51, in post_number_of_comments
    response = await fetch(session, url)
  File "/Users/zhaofan/vs_python/python_asyncio/ex1.py", line 42, in fetch
    return await response.json()
  File "/usr/local/lib/python3.7/site-packages/async_timeout/__init__.py", line 45, in __exit__
    self._do_exit(exc_type)
  File "/usr/local/lib/python3.7/site-packages/async_timeout/__init__.py", line 92, in _do_exit
    raise asyncio.TimeoutError
concurrent.futures._base.TimeoutError

還有就是上面的代碼中我們使用了results = await asyncio.gather(*tasks)
等待所有的協程執行完成並返回結果,關於gather的官網文檔地址:https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

並且在上面的使用中我們也用到了遞歸,你可能感覺還挺簡單的,代碼看着和我們平時的寫的阻塞式的代碼好像區別也不是特別大,保持這種愉悅感,接着看

二、asyncio之—-更進一步

那么我們現在想要的是當我們的爬蟲程序爬取評論的時候,我們想要當評論超過一定閾值的貼帖子發郵件通知告訴我們,其實這個功能是非常有必要的,就拿我的個人博客站來說,如果你想要經常看我的文章,又不想經常來我的站看,只想看大家都關注的那些文章,或者評論比較多的文章,所以我們接着將我們的代碼進行更改:

import asyncio
import argparse
import logging
import random
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10
# 我們設置的評論的閾值
MIN_COMMENTS = 2
parser = argparse.ArgumentParser(
    description='獲取所有請求url的所有評論')
parser.add_argument('--id', type=int, default=8863,
                    help='請求的id, 默認id 是8863')
parser.add_argument('--url', type=str, help='HN的url地址')
parser.add_argument('--verbose', action='store_true', help='詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
fetch_counter = 0
async def fetch(session, url):
    """
    通過aiohttp訪問url並返回json格式數據
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        # 因為接口需要翻牆才能訪問,所以這里我用的是我本地的代理
        async with session.get(url,proxy='http://127.0.0.1:1081') as response:
            return await response.json()
async def post_number_of_comments(loop, session, post_id):
    """
    遞歸獲取當前請求url的所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))
    if 'kids' not in response:  #表示沒有評論
        return 0
    # 獲取當前請求的url的評論的數量
    number_of_comments = len(response['kids'])
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    # 獲取所有任務的結果
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    if number_of_comments> MIN_COMMENTS:
        await email_post(response)
    return number_of_comments
async def email_post(post):
    """
    模擬發郵件的動作,並沒有真的發郵件
    """
    await asyncio.sleep(random.random()*3)
    log.info("email send success")
def id_from_HN_url(url):
    """
    獲取運行時傳遞的參數中的id
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None
async def main(loop, post_id):
    now = datetime.now()
    async with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = await post_number_of_comments(loop, session, post_id)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("-- Post {} has {} comments".format(post_id, comments))
    loop.close()

運行結果如下:

[23:24:17] email send success
[23:24:18] email send success
[23:24:18] email send success
[23:24:19] email send success
[23:24:19] email send success
[23:24:20] email send success
[23:24:21] email send success
[23:24:21] email send success
[23:24:24] email send success
[23:24:24] > Calculating comments took 10.09 seconds and 73 fetches
[23:24:24] -- Post 8863 has 72 comments

你會發現這次花費的時間比我們之前多了,因為我們在發送郵件的地方是 await email_post(response) 那么我們的的程序再這里就會等到知道這個任務完成,其實對我們來說我們更關注的是我們的主要任務,獲取所有的評論結果,而發送郵件通知我們的次級任務,那么我們需要怎么改進,讓我們的主要的任務繼續執行,不用去等待子任務的執行呢?在asyncio的api文檔中有ensure_future ,這個需要注意:在python3.7之前用的是這個方法,但3.7之后更推薦用create_task的方法 具體地址為:https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
這里明確說明了:

asyncio.create_task(coro)
Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.
The task is executed in the loop returned by get_running_loop(), RuntimeError is raised if there is no running loop in current thread.
This function has been added in Python 3.7. Prior to Python 3.7, the low-level asyncio.ensure_future() function can be used instead:

 

通過這個方法我們可以將我們的任務安排一個協程運行,將其包裝在Task對象中並返回它,既然這樣我們就將代碼繼續更改:
將await email_post(response) 這樣代碼替換為:asyncio.ensure_future(email_post(response))

但是當我們運行后發現不幸的事情發生了:

[23:40:06] email send success
[23:40:06] > Calculating comments took 3.30 seconds and 73 fetches
[23:40:06] -- Post 8863 has 72 comments
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1087dde58>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9e4f8>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9e9a8>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9e918>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9ee88>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9ef48>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108a9efd8>()]>>
[23:40:06] Task was destroyed but it is pending!
task: <Task pending coro=<email_post() done, defined at ex1.py:76> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1087dde28>()]>>

看到這個錯誤不要慌,這個也是很多初學asyncio的或者剛開始用的時候都會碰到的問題,並且這個問題我們在上一篇asyncio的文章也說明了原因,在這里其實就是post_number_of_comments協程返回后立即強行關閉循環,讓我們的log_post任務沒有時間完成,怎么解決呢? 我們繼續改代碼:

import asyncio
import argparse
import logging
import random
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10
# 我們設置的評論的閾值
MIN_COMMENTS = 2
parser = argparse.ArgumentParser(
    description='獲取所有請求url的所有評論')
parser.add_argument('--id', type=int, default=8863,
                    help='請求的id, 默認id 是8863')
parser.add_argument('--url', type=str, help='HN的url地址')
parser.add_argument('--verbose', action='store_true', help='詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
fetch_counter = 0
async def fetch(session, url):
    """
    通過aiohttp訪問url並返回json格式數據
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        # 因為接口需要翻牆才能訪問,所以這里我用的是我本地的代理
        async with session.get(url,proxy='http://127.0.0.1:1081') as response:
            return await response.json()
async def post_number_of_comments(loop, session, post_id):
    """
    遞歸獲取當前請求url的所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))
    if 'kids' not in response:  #表示沒有評論
        return 0
    # 獲取當前請求的url的評論的數量
    number_of_comments = len(response['kids'])
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    # 獲取所有任務的結果
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    if number_of_comments> MIN_COMMENTS:
        # await email_post(response)
        asyncio.ensure_future(email_post(response))
    return number_of_comments
async def email_post(post):
    """
    模擬發郵件的動作,並沒有真的發郵件
    """
    await asyncio.sleep(random.random()*3)
    log.info("email send success")
def id_from_HN_url(url):
    """
    獲取運行時傳遞的參數中的id
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None
async def main(loop, post_id):
    now = datetime.now()
    async with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = await post_number_of_comments(loop, session, post_id)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    post_id = id_from_HN_url(args.url) if args.url else args.id
    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("-- Post {} has {} comments".format(post_id, comments))
    pending_tasks = [
        task for task in asyncio.Task.all_tasks() if not task.done()
    ]
    loop.run_until_complete(asyncio.gather(*pending_tasks))
    loop.close()

運行之后結果如下:

[23:47:24] email send success
[23:47:25] email send success
[23:47:25] > Calculating comments took 3.29 seconds and 73 fetches
[23:47:25] -- Post 8863 has 72 comments
[23:47:25] email send success
[23:47:25] email send success
[23:47:25] email send success
[23:47:26] email send success
[23:47:26] email send success
[23:47:27] email send success
[23:47:27] email send success

一切似乎又恢復了正常,這里我們用到了asyncio的一個方法
asyncio.Task.all_tasks()

這個其實還是非常有用的可以獲取當前我們的loop的所有的任務的情況,我們這里是通過task.done() 來判斷任務是否完成了,從而把沒有讓沒有完成的任務都能夠繼續完成,但是我們這樣做有一個不好的地方就是asyncio.Task.all_tasks() 將所有的任務都拿到手了,可是有些並不是我們關注的,我們就只想要控制我們自己關注的,那么我們就可以將發郵件這個次級任務專門放到一起,這樣方面我們后面處理,代碼更改為:

import asyncio
import argparse
import logging
import random
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
FETCH_TIMEOUT = 10
# 我們設置的評論的閾值
MIN_COMMENTS = 2
parser = argparse.ArgumentParser(
    description='獲取所有請求url的所有評論')
parser.add_argument('--id', type=int, default=8863,
                    help='請求的id, 默認id 是8863')
parser.add_argument('--url', type=str, help='HN的url地址')
parser.add_argument('--verbose', action='store_true', help='詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
fetch_counter = 0
async def fetch(session, url):
    """
    通過aiohttp訪問url並返回json格式數據
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        # 因為接口需要翻牆才能訪問,所以這里我用的是我本地的代理
        async with session.get(url,proxy='http://127.0.0.1:1081') as response:
            return await response.json()
async def post_number_of_comments(loop, session, post_id):
    """
    遞歸獲取當前請求url的所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))
    if 'kids' not in response:  #表示沒有評論
        return 0
    # 獲取當前請求的url的評論的數量
    number_of_comments = len(response['kids'])
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    # 獲取所有任務的結果
    results = await asyncio.gather(*tasks)
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    if number_of_comments> MIN_COMMENTS:
        # await email_post(response)
        task_registry.append(asyncio.ensure_future(email_post(response)))
    return number_of_comments
async def email_post(post):
    """
    模擬發郵件的動作,並沒有真的發郵件
    """
    await asyncio.sleep(random.random()*3)
    log.info("email send success")
def id_from_HN_url(url):
    """
    獲取運行時傳遞的參數中的id
    """
    parse_result = urlparse(url)
    try:
        return parse_qs(parse_result.query)['id'][0]
    except (KeyError, IndexError):
        return None
async def main(loop, post_id):
    now = datetime.now()
    async with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = await post_number_of_comments(loop, session, post_id)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    post_id = id_from_HN_url(args.url) if args.url else args.id
    task_registry = []  # 用於存放我們發送郵件的次級任務
    loop = asyncio.get_event_loop()
    comments = loop.run_until_complete(main(loop, post_id))
    log.info("-- Post {} has {} comments".format(post_id, comments))
    pending_tasks = [
        task for task in task_registry if not task.done()
    ]
    loop.run_until_complete(asyncio.gather(*pending_tasks))
    loop.close()

執行結果如下:

[23:54:10] > Calculating comments took 8.33 seconds and 73 fetches
[23:54:10] -- Post 8863 has 72 comments
[23:54:11] email send success
[23:54:11] email send success
[23:54:11] email send success
[23:54:12] email send success
[23:54:12] email send success
[23:54:12] email send success
[23:54:12] email send success
[23:54:13] email send success
[23:54:13] email send success

看到這里,你是不是發現其實python的asyncio也沒有那么難,貌似還挺好用的,那么我們接着最后一部分

三、asyncio之—-華山論劍

通過上面的代碼的不斷改進, 我們也漸漸更加熟悉asyncio 的用法,但是相對來說還是太簡單,因為到目前為止,我們都在爬取一個url 下的所有評論,那么如果我想要獲取多個url下的評論信息需要怎么改進代碼。在HN 的API文檔中有一個獲取top 500的接口, 那么我們只獲取前500中的前幾個的所有評論,當然這個top 500 的內容每天肯能都會更新,甚至可能一天之內都會更新,所以我們的任務需要可以獲取一次之后過一會再次獲取一次數據,這樣我們就能總是獲取最新的數據,我們將代碼繼續改進:

import asyncio
import argparse
import logging
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10
parser = argparse.ArgumentParser(
    description='獲取Hacker News 文章的評論數')
parser.add_argument(
    '--period', type=int, default=5, help='每個任務的間隔時間')
parser.add_argument(
    '--limit', type=int, default=5,
    help='獲取top 500的前n 數量內容默認是前500的前5個')
parser.add_argument('--verbose', action='store_true', help='更加詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
fetch_counter = 0
async def fetch(session, url):
    """
    請求url地址返回json格式數據
    """
    global fetch_counter
    with async_timeout.timeout(FETCH_TIMEOUT):
        fetch_counter += 1
        async with session.get(url, proxy="http://127.0.0.1:1080") as response:
            return await response.json()
async def post_number_of_comments(loop, session, post_id):
    """
    獲取當前文章的數據,並遞歸獲取所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))
    if 'kids' not in response:  # 沒有評論
        return 0
    # 獲取當前文章的評論數量
    number_of_comments = len(response['kids'])
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(
        loop, session, kid_id) for kid_id in response['kids']]
    # 這里遞歸請求獲取每條評論的評論
    results = await asyncio.gather(*tasks)
    # 獲取當前文章的總評論數
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    return number_of_comments
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """
    獲取top 500de 前5個
    """
    response = await fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(
        loop, session, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))
async def poll_top_stories_for_comments(loop, session, period, limit):
    """
    定時去請求獲取前top 500 url
    """
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        await get_comments_of_top_stories(loop, session, limit, iteration)
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        fetch_counter = 0
        #  每個任務的間隔
        await asyncio.sleep(period)
async def main(loop, period, limit):
    async with aiohttp.ClientSession(loop=loop) as session:
        comments = await poll_top_stories_for_comments(loop, session, period, limit)
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop, args.period, args.limit))
    loop.close()

查看運行結果如下:

[16:24:28] Calculating comments for top 5 stories. (1)
[16:24:41] Post 19334909 has 156 comments (1)
[16:24:41] Post 19333600 has 147 comments (1)
[16:24:41] Post 19335363 has 9 comments (1)
[16:24:41] Post 19330812 has 341 comments (1)
[16:24:41] Post 19333479 has 81 comments (1)
[16:24:41] > Calculating comments took 12.17 seconds and 740 fetches
[16:24:41] Waiting for 5 seconds...
[16:24:46] Calculating comments for top 5 stories. (2)
[16:24:50] Post 19334909 has 156 comments (2)
[16:24:50] Post 19333600 has 147 comments (2)
[16:24:50] Post 19335363 has 9 comments (2)
[16:24:50] Post 19330812 has 341 comments (2)
[16:24:50] Post 19333479 has 81 comments (2)
[16:24:50] > Calculating comments took 4.75 seconds and 740 fetches
[16:24:50] Waiting for 5 seconds...
Traceback (most recent call last):

運行結果我們看出來其實我們的每個任務並不是間隔5s,因為我的任務在 await get_comments_of_top_stories(loop, session, limit, iteration)
我們必須等到這個地方完成之后才會進入下次循環,但是其實有時候我們並不想等待,而是直接想要繼續往下走,那么我們還是通過老辦法通過ensure_future 實現,我們將那一行代碼更改為:

asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration))

再次運行結果之后:

[16:44:07] Calculating comments for top 5 stories. (1)
[16:44:07] > Calculating comments took 0.00 seconds and 0 fetches
[16:44:07] Waiting for 5 seconds...
[16:44:12] Calculating comments for top 5 stories. (2)
[16:44:12] > Calculating comments took 0.00 seconds and 49 fetches
[16:44:12] Waiting for 5 seconds...
[16:44:17] Calculating comments for top 5 stories. (3)
[16:44:17] > Calculating comments took 0.00 seconds and 1044 fetches
[16:44:17] Waiting for 5 seconds...
[16:44:21] Post 19334909 has 159 comments (1)
[16:44:21] Post 19333600 has 150 comments (1)
[16:44:21] Post 19335363 has 13 comments (1)
[16:44:21] Post 19330812 has 342 comments (1)
[16:44:21] Post 19333479 has 81 comments (1)
[16:44:22] Post 19334909 has 159 comments (3)
[16:44:22] Post 19333600 has 150 comments (3)
[16:44:22] Post 19335363 has 13 comments (3)
[16:44:22] Post 19330812 has 342 comments (3)
[16:44:22] Post 19333479 has 81 comments (3)
[16:44:22] Calculating comments for top 5 stories. (4)
[16:44:22] > Calculating comments took 0.00 seconds and 1158 fetches
[16:44:22] Waiting for 5 seconds...
[16:44:23] Post 19334909 has 159 comments (2)
[16:44:23] Post 19333600 has 150 comments (2)
[16:44:23] Post 19335363 has 13 comments (2)
[16:44:23] Post 19330812 has 342 comments (2)
[16:44:23] Post 19333479 has 81 comments (2)
[16:44:26] Post 19334909 has 159 comments (4)
[16:44:26] Post 19333600 has 150 comments (4)
[16:44:26] Post 19335363 has 13 comments (4)
[16:44:26] Post 19330812 has 343 comments (4)
[16:44:26] Post 19333479 has 81 comments (4)
[16:44:27] Calculating comments for top 5 stories. (5)
[16:44:27] > Calculating comments took 0.00 seconds and 754 fetches
[16:44:27] Waiting for 5 seconds...

這樣我們每次任務的間隔倒是是5s了但是又一個問題出現了,花費0s並且0個fetch到,並且續的fetch數量也都不對 ,其實造成這個的原因都是因為不再等待get_comments_of_top_stories(loop, session, limit, iteration)造成的

這個時候你是不是又想到了你的老朋友 callback 呢 哈哈哈! 改進代碼如下:

import asyncio
import argparse
import logging
from datetime import datetime
import aiohttp
import async_timeout
LOGGER_FORMAT = '%(asctime)s %(message)s'
URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json"
TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json"
FETCH_TIMEOUT = 10
parser = argparse.ArgumentParser(
    description='獲取Hacker News 文章的評論數')
parser.add_argument(
    '--period', type=int, default=5, help='每個任務的間隔時間')
parser.add_argument(
    '--limit', type=int, default=5,
    help='獲取top 500的前n 數量內容默認是前500的前5個')
parser.add_argument('--verbose', action='store_true', help='更加詳細的輸出')
logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]')
log = logging.getLogger()
log.setLevel(logging.INFO)
class URLFetcher():
    def __init__(self):
        self.fetch_counter = 0
    async def fetch(self, session, url):
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            async with session.get(url, proxy="http://127.0.0.1:1080") as response:
                return await response.json()
async def post_number_of_comments(loop, session, fetcher, post_id):
    """
    獲取當前文章的數據,並遞歸獲取所有評論
    """
    url = URL_TEMPLATE.format(post_id)
    response = await fetcher.fetch(session, url)
    # 沒有評論
    if response is None or 'kids' not in response:
        return 0
    number_of_comments = len(response['kids'])
    # # 獲取當前文章的評論數量
    tasks = [post_number_of_comments(
        loop, session, fetcher, kid_id) for kid_id in response['kids']]
    # 這里遞歸請求獲取每條評論的評論
    results = await asyncio.gather(*tasks)
    # 獲取當前文章的總評論數
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))
    return number_of_comments
async def get_comments_of_top_stories(loop, session, limit, iteration):
    """
    獲取top 500de 前5個
    """
    fetcher = URLFetcher()
    response = await fetcher.fetch(session, TOP_STORIES_URL)
    tasks = [post_number_of_comments(
        loop, session, fetcher, post_id) for post_id in response[:limit]]
    results = await asyncio.gather(*tasks)
    for post_id, num_comments in zip(response[:limit], results):
        log.info("Post {} has {} comments ({})".format(
            post_id, num_comments, iteration))
    return fetcher.fetch_counter
async def poll_top_stories_for_comments(loop, session, period, limit):
    """
    定時去請求獲取前top 500 url
    """
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))
        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))
        now = datetime.now()
        # 這里通過回調的方式獲取每次爬取評論的耗時以及爬取的評論的數量
        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))
        future.add_done_callback(callback)
        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        await asyncio.sleep(period)
async def main(loop, period, limit):
    async with aiohttp.ClientSession(loop=loop) as session:
        comments = await poll_top_stories_for_comments(loop, session, period, limit)
    return comments
if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop, args.period, args.limit))
    loop.close()

這次當我們再次執行代碼運行結果如下:

[17:00:17] Calculating comments for top 5 stories. (1)
[17:00:17] Waiting for 5 seconds...
[17:00:22] Calculating comments for top 5 stories. (2)
[17:00:22] Waiting for 5 seconds...
[17:00:27] Calculating comments for top 5 stories. (3)
[17:00:27] Waiting for 5 seconds...
[17:00:30] Post 19334909 has 163 comments (1)
[17:00:30] Post 19333600 has 152 comments (1)
[17:00:30] Post 19335363 has 14 comments (1)
[17:00:30] Post 19330812 has 346 comments (1)
[17:00:30] Post 19335853 has 1 comments (1)
[17:00:30] > Calculating comments took 2.31 seconds and 682 fetches
[17:00:32] Calculating comments for top 5 stories. (4)
[17:00:32] Waiting for 5 seconds...
[17:00:33] Post 19334909 has 163 comments (2)
[17:00:33] Post 19333600 has 152 comments (2)
[17:00:33] Post 19335363 has 14 comments (2)
[17:00:33] Post 19330812 has 346 comments (2)
[17:00:33] Post 19335853 has 1 comments (2)
[17:00:33] > Calculating comments took 0.80 seconds and 682 fetches
[17:00:34] Post 19334909 has 163 comments (3)
[17:00:34] Post 19333600 has 152 comments (3)
[17:00:34] Post 19335363 has 14 comments (3)
[17:00:34] Post 19330812 has 346 comments (3)
[17:00:34] Post 19335853 has 1 comments (3)
[17:00:34] > Calculating comments took 1.24 seconds and 682 fetches
[17:00:37] Calculating comments for top 5 stories. (5)
[17:00:37] Waiting for 5 seconds...
[17:00:42] Post 19334909 has 163 comments (5)
[17:00:42] Post 19333600 has 152 comments (5)
[17:00:42] Post 19335363 has 15 comments (5)
[17:00:42] Post 19330812 has 346 comments (5)
[17:00:42] Post 19335853 has 1 comments (5)
[17:00:42] > Calculating comments took 4.55 seconds and 683 fetches
[17:00:42] Calculating comments for top 5 stories. (6)
[17:00:42] Waiting for 5 seconds...

到這里為止,我們的代碼基本已經改的可以了,我們的結果也終於達到了一個我們滿意的結果。

四、小結

其實對我個人來說,在整理整理之前我自己對asyncio的用法也有很多地方理解的不清楚,也是摸着石頭過河,碰到問題解決問題,在整理的過程中,其實對我自己來說很多之前模糊的地方也清晰了很多。
同時也歡迎大家也來分享自己使用python asyncio 的相關知識,歡迎加入群號:948510543

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM