python3.6異步IO包asyncio部分核心源碼思路梳理


關於python異步編程的演進過程,兩篇文章闡述得妥妥當當,明明白白。

中文資料:https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c

英文資料:http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html

其實中文資料就是參考的英文資料,英文資料是開源書《500 Lines or Less》中的一個主題章節,整書地址:https://github.com/aosabook/500lines

 

python的asyncio源碼的核心思路其實跟基於生成器的協程異步編程思路大體一致,只是前者做了大量的代碼優化和功能擴充。所以對照生成器協程代碼來理解asyncio是很有幫助的。以下的這一小段代碼就是采用基於生成器的協程的異步編程方式寫的一個小爬蟲案例,來自上述中文資料,asyncio的核心代碼的思路大體上能從這段代碼中找到原型。

該腳本命名為:yield_from.py

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stopped = False
urls_todo = {"/", "/1", "/2", "/3"}

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        """
        yield的出現使得__iter__函數變成一個生成器,生成器本身就有next方法,所以不需要額外實現。
        yield from x語句首先調用iter(x)獲取一個迭代器(生成器也是迭代器)
        """
        yield self  # 外面使用yield from把f實例本身返回
        return self.result  # 在Task.step中send(result)的時候再次調用這個生成器,但是此時會拋出stopInteration異常,並且把self.result返回

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    """
    此處的chunck接收的是f中return的f.result,同時會跑出一個stopIteration的異常,只不過被yield from處理了。
    這里也可直接寫成chunck = yiled f
    """
    chunck = yield from f
    selector.unregister(sock.fileno())
    return chunck

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b"".join(response)

class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b""

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ("xkcd.com", 80))
        get = "GET {0} HTTP/1.0\r\nHost:xkcd.com\r\n\r\n".format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        print(self.response)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)  # 激活Task包裹的生成器

    def step(self, future):
        try:
            # next_future = self.coro.send(future.result)
            next_future = self.coro.send(None)  # 驅動future
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

if __name__ == "__main__":
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)

 

下面用asyncio寫一個簡單的案例,通過對照上述基於生成器協程的代碼來梳理asyncio源碼的邏輯。

基於asyncio編寫的小demo,該腳本命名為:asyncio_test.py

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(3)
    await asyncio.sleep(3)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()

    cora = get_html("http://www.baidu.com")
    task = loop.create_task(cora)
    loop.run_until_complete(task)

    # tasks = [get_html("http://www.baidu.com") for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

asyncio源碼包位置在python安裝目錄下的Lib/asyncio下。

首先是loop = asyncio.get_event_loop(),光是看名字就知道和yield_from.py中的loop有不可告人的關系,其實這個loop就是獲取一個事件循環,不停地循環檢測是否有事件准備好,如果有,則立刻調用注冊在事件上的回調函數,直到stopped置位才退出循環。get_event_loop()返回一個事件循環類,該類會繼承BaseEventLoop,BaseEventLoop才是分析的重點,因為很多重要函數的具體實現在這個類中,BaseEventLoop位於asyncio/base_events.py中。

一、create_task()

接下來看asyncio_test.py代碼中的task = loop.create_task()都干了嘛。

在asyncio/base_events.py中找到BaseEventLoop定義,在BaseEventLoop類中有個create_task方法,看看這個方法的的代碼

    def create_task(self, coro):  
        self._check_closed()
        if self._task_factory is None:  
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

這個函數接收一個協程作為參數,其核心代碼其實就如下兩句,新建一個Task類實例並返回。

def create_task(self, coro):    
        task = tasks.Task(coro, loop=self)
        return task

Task類的定義在asyncio/tasks.py中,這是理解asyncio邏輯的一個比較關鍵的類,上面新建Task類實例時,傳入的coro就是通過這個類中實現的邏輯來一步一步驅動的,這個Task類看起來挺復雜的,其實和上面yield_from.py中的Task類的作用很相似,對照着分析還是不難的。

二、Task類

先來簡單捋一捋yield_from.py中Task類的邏輯。

在yield_from.py中Task類實例都會包裹一個協程(即self.coro),然后通過step函數中的send方法來驅動包裹在實例中的協程。協程可以理解為由多個future組合而成,一個future完成了才能讓下一個future上場,future翻譯為未來對象,要怎么理解這個呢?想了半天沒想出個好的生活場景,那就粗糙一點吧,想象一下食堂排隊打飯,每個排隊的都是一個future,只有當前排到的future打完了才能yield from下一個future,而整個隊伍就是一個協程,只有整個隊伍都打完了協程才算結束,這實在很牽強,不過意思到了就差不多吧,看懂代碼才能有精准的理解,這里只是一個大概意思,輔助更快理解代碼邏輯。畫了一張圖表達一下心意,后面還會有future代碼的分析。

 

 

 即,Task類中包裹一個協程,協程可理解為包含多個future類,多個future按順序來執行,一個future死了,下一個才能活。

在step中yield from回來的next_future,又會把當前實例的step方法注冊為其回調函數,所以每個future結束,都會調用step方法,以此來激活下一個future,不斷推進協程向下執行,直到沒有future了,就拋出StopIteration,來結束該Task。

其實asyncio中的Task類跟這個邏輯是一樣的,首先,它也會包裹一個協程,再去看下create_task的時候,是不是傳入了一個coro參數,在Task類的__init__函數中有個self._coro 來接收保存這個協程。另外,Task類中有一個_step函數,其作用就類似於yield_from.py中的step函數。找到_step函數

 

 畫框這里是不是和yield_from.py中的step函數中的很像,沒錯,是他是他就是他,很類似的邏輯,只是_step中多了很多參數檢查和其他一下異常檢查等,如果我們只關注核心邏輯,那些都可以暫時忽略掉。

可類似歸類似,差別還是有的:

在yield_from.py中Task實例一創建,在Task實例的__init__函數中就會馬上調用self.step()函數來激活被其包裹的協程,那asyncio中的Task實例是如何激活它的協程的呢?

其實答案也在__init__函數中,只不過這里不是馬上就調用_step函數,而是會在下一幀(一次循環就是一幀)循環的時候調用,來看下Task類的__init__函數

 

 這里調用call_soon函數來把該Task實例的_step函數添加到待執行隊列中,call_soon函數也是定義在asyncio/base_events.py的BaseEventLoop類中的,點過去看下,其中代碼對我們本次分析而言,有用的就一句:handle = self._call_soon(callback, args),調用_call_soon函數把callback注冊到待執行隊列中。往下翻,就能看到self._call_soon:

    def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)  # 事件添加到隊列
        return handle

很容易理解,就是返回一個Handle類,然后把它添加到self._ready隊列中,這里稍微解釋一下Handle類和self._ready隊列的作用,點到Handle類的定義很容易就知道,它其實就是包裹了就緒事件的回調函數的,其中定義了一個run方法,就是直接執行回調函數,而self._ready保存着Handle類的實例,我們由yield_from.py中可以知道,有個loop死循環不斷檢測是否有事件就緒,一旦有就緒事件,就調用其回調函數,在asyncio中當然也是有這種死循環的,后面會講到,這個循環也是不斷檢測self._ready是否有為空,不為空就從其中彈出Handle實例,然后調用handle實例的run方法,說白了就是執行注冊在就緒事件上的回調函數。

接着回到Task類。

Task實例初始化時,就通過call_soon把self._step添加到_ready隊列中,所以下一輪循環中會從_ready中彈出包裹_step函數的handle,然后執行_step,這樣就實現了激活task包裹的協程。這樣來看,和yield_from.py中的Task類是不是很類似,只是一個在初始化時立馬執行step函數,一個是在下一輪循環中執行_step,其實也沒啥區別。

現在在asyncio_test.py中,已經通過task = loop.create_task(cora)創建了一個task實例,該task實例包裹了我們自己定義的協程cora,並且在task初始化的時候在__init__函數中通過call_soon通知下一次循環立即執行task的_step函數來激活cora協程。接下來就是run_until_complete函數登場了

三、run_until_complete

這個函數同樣是定義在asyncio/base_events.py的BaseEventLoop類中。在這個函數中,我們上述提到的死循環(類比yield_from.py中的loop())就要閃亮登場了,點到函數定義處(把不妨礙本次說明的部分代碼刪除了)

    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)  # ensure_future,即,確保是future。返回的是future(task也是future)

        future.add_done_callback(_run_until_complete_cb)  # 用來結束循環
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

首先調用ensure_future來確保傳進來的future參數是個future,我們之前說過Task是繼承自Future的,所以task也是future,而我們外面傳進來的參數是個task實例,所以這個函數調用返回的其實就是本身(傳進去是啥返回就是啥),然后給我們傳進來的task實例通過調用add_done_callback添加_run_until_complete_cb回調函數,這個回調函數比較關鍵,run_until_complete的做的最重要的事就是給傳進來的task實例添加這個回調,點進_run_until_complete_cb,可以看到就是調用了loop的stop函數,這個的意義就是,當我們傳進來的task包裹的協程運行結束后,就調用這個回調,跳出循環(就是相當於yield_from.py中的stopped變量的作用),否則死循環就真的是死循環了,永遠跳不出。

然后就是調用run_forever,死循環正式登場

四、run_forever

這個函數前面一長串,但是現在,我們統統都不看,只看關鍵地方,刪除無關代碼后,只留下:

    def run_forever(self):try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

是不是看起來和yield_from.py中的loop函數像極了,這個循環不斷地調用_run_once(),就像yield_from.py的loop函數中不斷地調用下面這段代碼:

events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

所以推測,_run_once()中是不是真的就是實現上述代碼的邏輯呢?沒錯的,是他是他就是他。

點進_run_once()看一下,這個函數的代碼量有點多了,主要是這里面還實現了一個定時功能(asyncio.sleep()),關於這個功能不展開了,其實也很簡單,主要還是抓住我們的主線來講,我們來看_run_once()中是如何實現上述代碼段的邏輯的,注意到其中有這樣兩行代碼:

        else:
            event_list = self._selector.select(timeout)  # 篩選就緒事件,將其回調添加到self._ready中
        self._process_events(event_list)  # 該函數具體實現在selector_events.py中

這里的event_list = self._selector.select(timeout)和上述的events = selector.select()是不是很相似?這里也就是選出就緒事件,然后添加到self._ready隊列中,隨后執行,之前已經解釋過self._ready隊列的作用了,馬上就要登場的就是調用就緒事件的回調函數的執行,在_run_once()的尾部,我們看到如下代碼:

ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

這就是之前解釋過的self._ready的作用,先看_ready隊列中是否有待處理的Handle實例,如果有,那就一個一個執行,handle中的_run()方法就是執行就緒事件的回調函數。

至此,就把yield_from.py中的loop()函數的邏輯對應到了asyncio源碼的循環之中。

接下來,就來看看那個難以理解的Future類是怎么回四

五、Future類

首先看看yield_from.py中的Future類是怎么回事,就知道asyncio中的Future是怎么回事了,他們長得都很像。

    def __iter__(self):
        """
        yield的出現使得__iter__函數變成一個生成器,生成器本身就有next方法,所以不需要額外實現。
        yield from x語句首先調用iter(x)獲取一個迭代器(生成器也是迭代器)
        """
        yield self  # 外面使用yield from把f實例本身返回
        return self.result  # 在Task.step中send(result)的時候再次調用這個生成器,但是此時會拋出stopInteration異常,並且把self.result返回

Future類最關鍵也是最難理解的就是__iter__方法,__iter__中的yield的出現,使得__iter__變成一個生成器。

再通過yield_from.py中具體例子來梳理一下future的使用方法。

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ("xkcd.com", 80))
        get = "GET {0} HTTP/1.0\r\nHost:xkcd.com\r\n\r\n".format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        print(self.response)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

假設代碼已經執行到yield from read_all(sock)這一行,yield from的出現使得調用fetch生成器的調用方(即step中的send方法)和read_all()建立了一個直通的通道,數據流由send直接傳到read_all,再來read_all中看看,read_all中也有yield from read(sock),這個yield from讓send和read函數之間又建立了一個直連通道,再看看read函數有什么,read中又有一個yield from f,這次直接讓send和f建立了一個直連通道, f中的__iter__就只有一個yield了,不再有yield from,所以通道終於到頭了,於是整個下來就是send和f的__iter__生成器建立了一個直連通道,當上一個future執行完畢,調用其回調函數(即step函數)時,就會用send發送一個none。。。啊,講不清楚了,暴斃而亡,這邏輯太難表述了,如果把yield from句法弄清楚了,然后多看幾遍yield_from.py的代碼,反復研究,應該就能明白的,其實也不難理解,就是太難表述了,生成器一層嵌套一層。。。

把yield_from.py中的future搞懂了,再看asyncio中的future,發現其實結構是一樣的,功能也類似。

先寫到這里,已經凌晨三點,睡~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM