協程,又稱為微線程,看上去像是子程序,但是它和子程序又不太一樣,它在執行的過程中,可以在中斷當前的子程序后去執行別的子程序,再返回來執行之前的子程序,但是它的相關信息還是之前的。
優點:
- 極高的執行效率,因為子程序切換而不是線程切換,沒有了線程切換的開銷;
- 不需要多線程的鎖機制,因為只有一個線程在執行;
如果要充分利用CPU多核,可以通過使用多進程+協程的方式
打開asyncio的源代碼,可以發現asyncio中的需要用到的文件如下:

下面的則是接下來要總結的文件
文件 |
解釋 |
base_events |
基礎的事件,提供了BaseEventLoop事件 |
coroutines |
提供了封裝成協程的類 |
events |
提供了事件的抽象類,比如BaseEventLoop繼承了AbstractEventLoop |
futures |
提供了Future類 |
tasks |
提供了Task類和相關的方法 |
函數 |
解釋 |
coroutine(func) |
為函數加上裝飾器 |
iscoroutinefunction(func) |
判斷函數是否使用了裝飾器 |
iscoroutine(obj) |
判斷該對象是否是裝飾器 |
如果在函數使用了coroutine
裝飾器,就可以通過yield from
去調用async def
聲明的函數,如果已經使用async def
聲明,就沒有必要再使用裝飾器了,這兩個功能是一樣的。
import asyncio
@asyncio.coroutine
def hello_world():
print("Hello World!")
async def hello_world2():
print("Hello World2!")
print('------hello_world------')
print(asyncio.iscoroutinefunction(hello_world))
print('------hello_world2------')
print(asyncio.iscoroutinefunction(hello_world2))
print('------event loop------')
loop = asyncio.get_event_loop()
# 一直阻塞該函數調用到函數返回
loop.run_until_complete(hello_world())
loop.run_until_complete(hello_world2())
loop.close()
上面的代碼分別使用到了coroutine
裝飾器和async def
,其運行結果如下:
------hello_world------True------hello_world2------True------event loop------Hello World!Hello World2!
注意:不可以直接調用協程,需要一個event loop
去調用。
如果想要在一個函數中去得到另外一個函數的結果,可以使用yield from
或者await
,例子如下:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
函數print_sum
會一直等到函數compute
返回結果,執行過程如下:

這個文件里面漏出來的只有BaseEventLoop
一個類,它的相關方法如下:
函數 |
解釋 |
create_future() |
創建一個future對象並且綁定到事件上 |
create_task() |
創建一個任務 |
run_forever() |
除非調用stop,否則事件會一直運行下去 |
run_until_complete(future) |
直到future對象執行完畢,事件才停止 |
stop() |
停止事件 |
close() |
關閉事件 |
is_closed() |
判斷事件是否關閉 |
time() |
返回事件運行時的時間 |
call_later(delay, callback, *args) |
設置一個回調函數,並且可以設置延遲的時間 |
call_at(when, callback, *args) |
同上,但是設置的是絕對時間 |
call_soon(callback, *args) |
馬上調用 |
函數 |
解釋 |
get_event_loop() |
返回一個異步的事件 |
... |
... |
返回的就是BaseEventLoop的對象。
Future類的相關方法如下:
方法 |
解釋 |
cancel() |
取消掉future對象 |
cancelled() |
返回是否已經取消掉 |
done() |
如果future已經完成則返回true |
result() |
返回future執行的結果 |
exception() |
返回在future中設置了的exception |
add_done_callback(fn) |
當future執行時執行回調函數 |
remove_done_callback(fn) |
刪除future的所有回調函數 |
set_result(result) |
設置future的結果 |
set_exception(exception) |
設置future的異常 |
設置future的例子如下:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1) # 睡眠
future.set_result('Future is done!') # future設置結果
loop = asyncio.get_event_loop()
future = asyncio.Future() # 創建future對象
asyncio.ensure_future(slow_operation(future)) # 創建任務
loop.run_until_complete(future) # 阻塞直到future執行完才停止事件
print(future.result())
loop.close()
run_until_complete
方法在內部通過調用了future的add_done_callback
,當執行future完畢的時候,就會通知事件。
下面這個例子則是通過使用future的add_done_callback
方法實現和上面例子一樣的效果:
import asyncio
async def slow_operation(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
def got_result(future):
print(future.result())
loop.stop() # 關閉事件
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result) # future執行完畢就執行該回調
try:
loop.run_forever()
finally:
loop.close()
一旦slow_operation
函數執行完畢的時候,就會去執行got_result
函數,里面則調用了關閉事件,所以不用擔心事件會一直執行。
Task類是Future的一個子類,也就是Future中的方法,task都可以使用,類方法如下:
方法 |
解釋 |
current_task(loop=None) |
返回指定事件中的任務,如果沒有指定,則默認當前事件 |
all_tasks(loop=None) |
返回指定事件中的所有任務 |
cancel() |
取消任務 |
並行執行三個任務的例子:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
loop.close()
執行結果為
Task A: Compute factorial(2)...Task B: Compute factorial(2)...Task C: Compute factorial(2)...Task A: factorial(2) = 2Task B: Compute factorial(3)...Task C: Compute factorial(3)...Task B: factorial(3) = 6Task C: Compute factorial(4)...Task C: factorial(4) = 24
可以發現,ABC同時執行,直到future執行完畢才退出。
下面一些方法是和task相關的方法
方法 |
解釋 |
as_completed(fs, *, loop=None, timeout=None) |
返回是協程的迭代器 |
ensure_future(coro_or_future, *, loop=None) |
調度執行一個 coroutine object:並且它封裝成future。返回任務對象 |
async(coro_or_future, *, loop=None) |
丟棄的方法,推薦使用ensure_future |
wrap_future(future, *, loop=None) |
Wrap a concurrent.futures.Future object in a Future object. |
gather(*coros_or_futures, loop=None, return_exceptions=False) |
從給定的協程或者future對象數組中返回future匯總的結果 |
sleep(delay, result=None, *, loop=None) |
創建一個在給定時間(以秒為單位)后完成的協程 |
shield(arg, *, loop=None) |
等待future,屏蔽future被取消 |
wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED) |
等待由序列futures給出的Futures和協程對象完成。協程將被包裹在任務中。返回含兩個集合的Future:(done,pending) |
wait_for(fut, timeout, *, loop=None) |
等待單個Future或coroutine object完成超時。如果超時為None,則阻止直到future完成 |
參考文章
官方文檔