python協程總結


 

概述

python多線程中因為有GIL(Global Interpreter Lock 全局解釋器鎖 )的存在,所以對CPU密集型程序顯得很雞肋;但對IO密集型的程序,GIL會在調用IO操作前釋放,所以對IO密集型多線程還是挺有作用。

然而多線程是競爭型的,調度由CPU決定,有時會顯得沒那么容易控制;所以python中也實現了一種可以由程序自己來調度的異步方式,叫做協程。

 

協程是一種用戶態的輕量級線程,又稱微線程。

協程擁有自己的寄存器上下文和棧,調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

 

簡單說協程在執行某個子程序(函數)時,可以指定或者隨機地中斷,然后去執行其他的子程序(函數),在合適的時候再返回到中斷子程序停止時的狀態繼續執行。聽起像生成器的特性,實際上協程也是基於生成器的。所以協程是通過程序自身的控制,去切換不同任務,實現並發的效果。也就是協程是單線程執行,沒有多線程由CPU調度時線程切換的開銷,所以效率較高。

 

再多說直白一點就是:

多線程執行多個任務時,CPU分配線程資源給每個任務,每個任務並行(多核才行,每個單位時間內,一個CPU只能處理一個線程)的執行,但如果任務多了,並且線程有限,CPU會調度線程資源一會執行一個程序,在不同程序間切換。(並且由於python GIL存在,同一時刻只能執行一個線程任務,並行也就成了並發,宏觀上也實際是單線程(單核)了)。總結就是多線程由CPU分配調度線程資源給子程序。

而協程的執行不同,它是單一的線程(主線程),將這個線程從開始到結束的時間作為資源分配給子程序,每個子程序能使用這個時間資源可以由我們來控制。同時由於協程具有生成器那樣保存狀態的特性,遇到阻塞時可以去執行其他的程序,返回來執行時又不會丟失狀態,所以可以通過這種異步的方式實現單一線程的並發。

同時因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

 

通過liaoxuefeng.com上的一個例子來演示下協程:

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。

如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

輸出

[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK

注意到consumer函數是一個generator,把一個consumer傳入produce后:

  1. 首先調用c.send(None)啟動生成器;

  2. 然后,一旦生產了東西,通過c.send(n)切換到consumer執行;

  3. consumer通過yield拿到消息,處理,又通過yield把結果傳回;

  4. produce拿到consumer處理的結果,繼續生產下一條消息;

  5. produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produceconsumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。

注意的是,生成器啟動或恢復執行一次,將會在yield處暫停。上面的第1步僅僅執行到了yield r,並沒有執行到賦值語句 n = yield r ,到了第2步,生成器恢復執行通過send(n)才給consumer中n賦值。

send(value)方法:作用是發送值給yield表達式。啟動generator則是調用send(None)。具體流程,可以通過ide調試來直觀的看懂

但上面示例並不能體現協程並發的特性,下面由asyncio這內置庫來實現

 

asyncio (一)

(基於3.5后版本)

asyncio 是用來編寫並發代碼的庫,使用 async/await 語法。

asyncio 被用作多個提供高性能 Python 異步框架的基礎,包括網絡和網站服務,數據庫連接庫,分布式任務隊列等等。

關於asyncio的一些關鍵字的說明:

  • event_loop 事件循環:程序開啟一個無限循環,把一些函數注冊到事件循環上,當滿足事件發生的時候,調用相應的協程函數

  • coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。

  • task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各種狀態

  • future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別

  • async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。

 

創建協程

通過async關鍵字定義一個協程(coroutine),協程也是一種對象。下面say_after,main就是一個協程

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print("started ")
    s_time = time.time()
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

asyncio.run() 函數用來運行一個協程對象,這里我們將main()作為入口函數。await等待一個協程。上面代碼段會在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world"。asyncio.sleep表示阻塞多少秒,運行結果如下

started
hello
world
runtime :  3.000959634780884
finished

可以觀察到上面的代碼,是同步運行的,兩個await say_after之間遇到了阻塞。因為asyncio.run() 只是單純的運行一個協程,並不會並發運行

 

運行協程

運行協程對象的方法主要有:

1. 通過asyncio.run(main) 運行一個協程,同步的方式,主要用於運行入口協程

2. 在另一個已經運行的協程中用 `await` 等待它,比如上面運行了main協程,其中等待的say_after協程也會運行

3. 將協程封裝成task或future對象,然后掛到事件循環loop上,使用loop來運行。主要方法為loop.run_until_complete。此方法可以異步的並發運行

實際上參考源碼asyncio.run本質也是獲取loop,運行協程,即協程依靠loop運行

 

並發協程

asyncio.create_task() 函數用來並發運行多個協程,更改上面的例子

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

運行輸出,比上面快一秒。這里我們使用create_task將協程封裝成task對象(會自動的添加到事件循環中),然后我們在main這個入口協程中掛起task1和task2。使用run運行main入口協程,它會自動檢測循環事件,並將等待task1和task2兩個task執行完成

started
hello
world
runtime :  2.0009524822235107
finished

asyncio.create_task方法實際是封裝了獲取事件循環asyncio.get_running_loop()與創建循環任務loop.create_task(coro)的一種高級方法,后面具體會講這些

 

可等待對象

跟在await后面的對象都是可等待對象,主要有協程, 任務 和 Future。

  • 協程對象:async def 的函數對象
  • task任務:將協程包裝成的一個任務(task)對象,用於注冊到事件循環上
  • Future:是一種特殊的低層級可等待對象,表示一個異步操作的最終結果

可等待的意思就是跳轉到等待對象,並將當前任務掛起。當等待對象的任務處理完了,才會跳回當前任務繼續執行。實際上與yield from功能相同,不同的是await后面是awaitable,yield from后面是生成器對象

yield from的一個示例(來源於https://zhuanlan.zhihu.com/p/30275154這篇協程演進講的很好)

def gen_3():
   yield 3

def gen_234():
   yield 2
   yield from gen_3()
   yield 4

def main():
   yield 1
   yield from gen_234()
   yield 5

for element in main():
   print(element)  

輸出
1
2
3
4
5

但是對於協程中進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序,如下面我們使用time.sleep()替代asyncio.sleep(),會發現在timesleep協程時程序阻塞,最后時間為4s

import asyncio
import time

async def say_after(delay, what):
    await timesleep(delay)
    return what

async def timesleep(delay):
    time.sleep(delay)

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(2, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(),task2.result())
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

如果將上面的改為如下

async def timesleep(delay):
    # time.sleep(delay)
    await asyncio.sleep(delay)

則最后運行時間為2s,這是因為asyncio.sleep()不同於time.sleep(),它其實在內部實現了一個future對象,事件循環會異步的等待這個對象完成

所以

在事件循環中,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。對於task與future對象,await可以將他們掛在事件循環上,由於他們相比於協程對象增加了運行狀態(Pending、Running、Done、Cancelled等),事件循環則可以讀取他們的狀態,實現異步的操作,比如上面並發的示例。同時對於阻塞的操作(沒有實現異步的操作,如request就會阻塞,aihttp則不會),由於協程是單線程,會阻塞整個程序

 

asyncio (二)

事件循環

事件循環是每個 asyncio 應用的核心。 事件循環會運行異步任務和回調,執行網絡 IO 操作,以及運行子程序。

簡單說我們將協程任務(task)注冊到事件循環(loop)上,事件循環(loop)會循環遍歷任務的狀態,當任務觸發條件發生時就會執行對應的任務。類似JavaScript事件循環,當onclick被觸發時,就會執行對應的js腳本或者回調。同時當遇到阻塞,事件循環回去查找其他可運行的任務。所以事件循環被認為是一個循環,因為它在不斷收集事件並遍歷它們從而找到如何處理該事件。

通過以下偽代碼理解

while (1) {
    events = getEvents();
    for (e in events)
        processEvent(e);
}

所有的時間都在 while 循環中捕捉,然后經過事件處理者處理。事件處理的部分是系統唯一活躍的部分,當一個事件處理完成,流程繼續處理下一個事件。如果遇到阻塞,循環會去執行其他任務,當阻塞任務完成后再回調(具體如何實現不太清楚,應該是將阻塞任務標記狀態或者放進其它列來實現)其實可以參考javascript的事件循環理解,都是單線程的異步操作http://www.ruanyifeng.com/blog/2013/10/event_loop.html

asyncio 中主要的事件循環方法有:

  • asyncio.get_running_loop() :返回當前 OS 線程中正在運行的事件循環對象。
  • asyncio.get_event_loop() :獲取當前事件循環。 如果當前 OS 線程沒有設置當前事件循環並且 set_event_loop() 還沒有被調用,asyncio 將創建一個新的事件循環並將其設置為當前循環。
  • asyncio.new_event_loop() :創建一個新的事件循環。
  • loop.run_until_complete() :運行直到 future ( Future 的實例 ) 被完成。如果參數是 coroutine object ,將被隱式調度為 asyncio.Task 來運行。返回 Future 的結果 或者引發相關異常。
  • loop.create_future() :創建一個附加到事件循環中的 asyncio.Future 對象。
  • loop.create_task(coro) :安排一個 協程 的執行。返回一個 Task 對象。
  • loop.run_forever() :運行事件循環直到 stop() 被調用。
  • loop.stop() :停止事件循環
  • loop.close() :關閉事件循環。

上面的並發例子就可以改成下面形式:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

def main():
    print("started ")
    s_time = time.time()
    loop = asyncio.get_event_loop() #獲取一個事件循環
    tasks = [
        asyncio.ensure_future(say_after(1,"hello")), #asyncio.ensure_future()包裝協程或可等待對象在將來等待。如果參數是Future,則直接返回。
        asyncio.ensure_future(say_after(2,"world")),
        loop.create_task(say_after(1,"hello")), #loop.create_task()包裝協程為task。
        loop.create_task(say_after(2,"world"))
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print("runtime : ",time.time()-s_time)
    print("finished ")

main()

asyncio.get_event_loop方法可以創建一個事件循環,然后使用 run_until_complete 將協程注冊到事件循環,並啟動事件循環。asyncio.ensure_future(coroutine) 和loop.create_task(coroutine)都可以創建一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。asyncio.wait類似與await 不過它可以接受一個list,asyncio.wait()返回的是一個協程。

總結:使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器里的yield一樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行,協程的目的也是讓一些耗時的操作異步化。

 

Task對象

Asyncio是用來處理事件循環中的異步進程和並發任務執行的。它還提供了 asyncio.Task() 類,可以在任務中使用協程。它的作用是,在同一事件循環中,運行某一個任務的同時可以並發地運行多個任務。當協程被包在任務中,它會自動將任務和事件循環連接起來,當事件循環啟動的時候,任務自動運行。這樣就提供了一個可以自動驅動協程的機制。

如果被包裹的協程要等待一個 future 對象,那么任務會被掛起,等待future的計算結果。當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,需要注意的是,事件循環一次只能運行一個任務,除非還有其它事件循環在不同的線程並行運行,此任務才有可能和其他任務並行。當一個任務在等待future執行的期間,事件循環會運行一個新的任務。

即Task對象封裝協程(async標記的函數),將其掛到事件循環上運行,如果遇到等待 future 對象(await 后面等待的),那么該事件循環會運行其他 Task、回調或執行 IO 操作

相關的主要方法有:

  • asyncio.create_task() :高層級的方法,創建Task對象,並自動添加進loop,即get_running_loop()和loop.create_task(coro)的封裝
  • asyncio.Task() :打包一個協程為Task對象
  • asyncio.current_task(loop=None) :返回當前運行的 Task 實例,如果沒有正在運行的任務則返回 None。如果 loop 為 None 則會使用 get_running_loop() 獲取當前事件循環
  • asyncio.all_tasks(loop=None) :返回事件循環所運行的未完成的 Task 對象的集合。
  • Task.cancel() :請求取消 Task 對象。這將安排在下一輪事件循環中拋出一個 CancelledError 異常給被封包的協程。
  • Task.result() :返回 Task 的結果。如果 Task 對象 已完成,其封包的協程的結果會被返回 (或者當協程引發異常時,該異常會被重新引發。)。如果 Task 對象 被取消,此方法會引發一個 CancelledError 異常。如果 Task 對象的結果還不可用,此方法會引發一個 InvalidStateError 異常。

通過網上的一個示例來理解一下,Task與loop之間的工作流程

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

流程圖如下

期間loop兩次訪問compute(),第一次是遇到阻塞await(yield from)掛起,第二次是掛起的事件有結果了去取結果,生成器return時會raise StopIteration()異常

 

通過task.result()獲取返回的結果

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what

async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(),task2.result())
    print("runtime : ",time.time()-s_time)
    print("finished ")

asyncio.run(main())

建議使用高層級的 asyncio.create_task() 函數來創建 Task 對象,也可用低層級的 loop.create_task() 或 ensure_future() 函數。不建議手動實例化 asyncio.Task() 對象。

 

Future對象 

Future如它的名字一樣,是一種對未來的一種抽象,代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別,task是Future的子類。實際上Future包裹協程,添加上各種狀態,而task則是在Future上添加一些特性便於掛在事件循環上執行,所以Future就是一個內部底層的對象,平時我們只要關注task就可以了。Future可以通過回調函數處理結果

相關的主要方法有:

  • asyncio.isfuture(obj) :判斷對象是不是future對象
  • asyncio.ensure_future(obj,loop=None) :接收一個協程或者future或者task對象,如果是future則直接返回future,其它則返回task
  • Future.result() :返回future結果
  • Future.set_result(result) :將 Future 標記為完成並設置結果
  • Future.add_done_callback(callback, *, context=None) :添加一個在 Future 完成 時運行的回調函數。調用 callback 時,Future 對象是它的唯一參數。

官網的一個例子,體現的是Future的四個狀態:Pending、Running、Done、Cancelled。創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done

import asyncio

async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)

async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)

asyncio.run(main())

如果注釋掉fut.set_result(value),那么future永遠不會done

 

綁定回調,future與task都可以使用add_done_callback方法,因為task是future子類

import time
import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what

def callback(future):
    print('Callback: ', future.result())


coroutine = say_after(2,"hello")
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

 

總結

實際上官網在3.5后建議使用高層的封裝如:asyncio.run(),asyncio.create_task()等,忽略底層的一些實現,雖然方便使用,但是對asyncio的流程理解幫助不大,還是要看底層的一些實現。

總的來說主要重點如下:

  1. 協程在asyncio里就是 async定義的函數 
  2. await將可等待對象(協程,future,task)掛起,異步或者同步地等待它們完成
  3. task對象與future對象沒有多大的區別,它們都有四個狀態,用於異步的實現
  4. 對於沒有異步實現的阻塞操作,程序會被阻塞,使用實現異步的庫(aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 這里列出了已經支持的內容,並在持續更新)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM