Python實現協程的四種方式


協程


協程不是計算機提供的,是人為創造的上下文切換技術,也可以被稱為微線程。簡而言之 其實就是在一個線程中實現代碼塊相互切換執行
我們知道正常代碼是從上到下依次執行,一個方法或函數操作完畢后才會進入下一個方法或函數執行。例如:

def func1():
	print(1)
	print(2)
	
def func2():
	print(3)
	print(4)

func1()
func2()

此時代碼執行邏輯一定是先執行完func1()對象里的語句再執行func2() ,這種稱為同步。但是如果我們想在func1()對象中print(1)后切換到func2()該怎么做呢?


可以采用以下幾種基於協程的方式:

  • greenlet。
  • yield 關鍵字
  • asyncio 裝飾器(py3.4之后引入)
  • async、await關鍵字(py3.5之后引入)【推薦】

1. greenlet實現協程


# greenlet是第三方模塊需先引入
pip3 install greenlet
# -*- coding: utf-8 -*-
# author: micher.yu
# Time:2022/01/08
# simple_desc :
from greenlet import greenlet


def func1():
	print(1)  # 第二步:輸出1
	gr2.switch()  # 第三步:切換到 func2 函數
	print(2)  # 第六步:輸出2
	gr2.switch()  # 第七步:切換到func2 函數(如果不切換的話句柄會繼續往下執行,也就不會進入func2 輸出4)


def func2():
	print(3)  # 第四步:輸出3
	gr1.switch()  # 第五步:切換到func1 函數
	print(4)  # 第八步:輸出4,func2函數 執行完畢句柄繼續往下執行


def func3():
	print(5)  # 第十步:輸出5


gr1 = greenlet(func1)  # 此處只是生成greenlet包裝的func1對象,代碼並不會實際運行
gr2 = greenlet(func2)  # 此處生成greenlet包裝的func2對象

gr1.switch()  # 第一步:此處是正式執行func1()對象
func3()  # 第九步:實例化func3

# 所以實際輸出會是 1 3 2 4 5

2. yield關鍵字


不推薦,實際應用場景比較少。

如果對yield關鍵字還不太熟悉的話可以參考往期這篇文章詳解python三大器——迭代器、生成器、裝飾器其中生成器部分有詳細講解

def func1():
	yield 1
	yield from func2()  # 這里其實相當於for item in func2(): yield item 
	yield 2


def func2():
	yield 3
	yield 4


for item in func1():
	print(item)

# 輸出結果將會是:1 3 4 2

3. asyncio 模塊


在python3.4及之后的版本才可使用,這個框架使用事件循環來編排回調和異步任務。事件循環位於事件循環策略的上下文中。
下圖是協程,事件循環和策略之間的相互作用
在這里插入圖片描述
注意:asyncio牛逼在於遇到IO阻塞自動切換!

下面我們使用@asyncio.coroutine 裝飾器(py3.10+會移除)定義了兩個協程函數。(基於生成器的協程

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	# 此處用asyncio.sleep(2)來模擬IO耗時(asyncio.sleep也是一個協程對象,不能用time.sleep()),asyncio定義的協程函數遇到IO操作時會自動切換到事件循環中的其他任務
	yield from asyncio.sleep(2)  
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)
	print(4)

PS:如果py版本高於3.8依然可以使用asyncio.coroutine裝飾器但是會有告警建議你使用async & await關鍵字來定義協程函數,不會影響使用!
在這里插入圖片描述
協程函數並不能像普通函數一樣直接實例化運行,調用協程函數協程並不會開始運行,只是返回一個協程對象。

fun1() # 此處是不會有結果的

可以通過 asyncio.iscoroutine 來驗證是否是協程對象

print(asyncio.iscoroutine(func1()))  # True

協程對象必須在事件循環中運行,我們可以通過asyncio.get_event_loop方法來獲取當前正在運行的循環實例。如loop對象,然后把協程對象交給 loop.run_until_complete,協程對象隨后會在 loop 里得到運行。

loop = asyncio.get_event_loop()
loop.run_until_complete(func1())
# 運行結果為:
# 1
# 等待2s
# 2

run_until_complete 是一個阻塞(blocking)調用,直到協程運行結束,它才返回;所以他必須接受的是一個可等待對象協程, 任務future對象)。

可等待對象:

  • 協程對象:協程函數實例化后就是協程對象

  • future對象:asyncio.futures.Future對象用來鏈接 底層回調式代碼 和高層異步/等待式代碼,可以簡單理解為future對象是可以使程序hang在某個地方等待有結果了之后再繼續執行。官方文檔

    1. 創建future對象:loop.create_future()
      import asyncio
      
      async def main():
      	# 獲取當前事件循環
      	loop = asyncio.get_running_loop()
      	# 單單只是創建一個future對象
      	fut = loop.create_future()
      	# future對象因為什么都沒做也就沒返回值,所以await會一直等待下去程序就會hang住
      	await fut
      
      
      asyncio.run(main())
      print(1)
      
      在這里插入圖片描述
    2. future對象.set_result()方法
      async def func(fut):
      	fut.set_result("finish")
      
      
      async def main():
      	# 獲取當前事件循環
      	loop = asyncio.get_running_loop()
      	# 單單只是創建一個future對象
      	fut = loop.create_future()
      	# 創建一個task對象,綁定了func函數並且把我們創建的fut對象傳遞給了協程對象func;func協程函數內部又對fut對象設置了result
      	await loop.create_task(func(fut))
      	# 由於設置了fut對象的結果,下面的await就能拿到結果 所以程序就可以繼續往下執行了
      	print(await fut)
      
      asyncio.run(main())
      print(1)
      
      """ 運行結果:
      
      finish
      1
      
      Process finished with exit code 0
      """
      
  • 任務: Task 對象Future對象子類,其作用是在運行某個任務的同時可以並發的運行其他任務。
    Task 對象可以使用 asyncio.create_task() 函數創建,也可以使用低層級的 loop.create_task() asnycio.ensure_future()

    注意:asyncio.create_task() 是python3.7之后才有的。python3.7之前可以改用asnycio.ensure_future()

    1. 取消 Task 對象 cancel()
    2. Task 任務是否被取消 cancelled()
    3. Task 對象是否完成 done()
    4. 返回結果 result()
      4.1 Task 對象被完成,則返回結果
      4.2 Task 對象被取消,則引發 CancelledError 異常
      4.3 Task 對象的結果不可用,則引發 InvalidStateError 異常
    5. 添加回調,任務完成時觸發 add_done_callback(task)
    6. 所有任務列表 asyncio.all_tasks()
    7. 返回當前任務 asyncio.current_task()

run_until_complete 的參數是一個 future,但是我們這里傳給它的卻是協程對象,之所以能這樣,是因為它在內部做了檢查
在這里插入圖片描述
要讓這個協程對象轉成future的話,可以通過 asyncio.ensure_future 方法(本質其實是創建了個task對象)。
在這里插入圖片描述
所以,我們可以寫得更明顯一些:

loop = asyncio.get_event_loop()
loop.run_until_complete(asnycio.ensure_future(func1()))
# 運行結果為:
# 1
# 等待2s
# 2

在有多個協程函數需要同時運行怎么辦?

  1. 我們可以將協程對象包裝成future對象后再放到一個列表中再通過asyncio.wait運行。

    asyncio.wait方法await關鍵字只能傳可等待 對象

    tasks = [
    	asyncio.ensure_future(func1()),  # 把協程對象包轉成一個 future 對象
    	asyncio.ensure_future(func2())
    ]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    # 運行結果為:
    # 1
    # 3
    # 等待2s
    # 2
    # 4
    
  2. 通過asyncio.gather可以直接將協程對象放到列表中(必須解包!也就是*[]):

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*[func1(), func2()]))
    # 運行結果為:
    # 1
    # 3
    # 等待2s
    # 2
    # 4
    

完整代碼為:

import asyncio


@asyncio.coroutine
def func1():
	print(1)
	yield from asyncio.sleep(2)  # 此處用asyncio.sleep(2)來模擬IO耗時(asyncio.sleep也是一個協程對象,不能用time.sleep()),自動切換到tasks中的其他任務
	print(2)


@asyncio.coroutine
def func2():
	print(3)
	yield from asyncio.sleep(2)  # 此處又遇到IO阻塞后,又會自動切換到tasks中其他的任務
	print(4)


func1()  # 調用協程函數,協程並不會開始運行,只是返回一個協程對象。可以通過 asyncio.iscoroutine 來驗證是否是協程對象
print(asyncio.iscoroutine(func1()))  # True

tasks = [
	asyncio.ensure_future(func1()),  # 把協程對象包轉成一個 future 對象
	asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
# 方式一:
loop.run_until_complete(asyncio.wait(tasks))
# 方式二:
loop.run_until_complete(asyncio.gather(*[func1(), func2()]))

同樣我們也可以執行Task對象

使用 loop 對象的 create_task 函數創建一個 Task 對象,在第一次打印 Task 對象時,狀態為 pending,完成執行函數后的狀態為 finished

import asyncio



async def do_something():
	print("這是一個Task例子....")
	# 模擬阻塞1秒
	await asyncio.sleep(1)
	return "Task任務完成"


# 創建一個事件event_loop
loop = asyncio.get_event_loop()

# 創建一個task
task = loop.create_task(do_something())
# 第一次打印task
print(task)

# 將task加入到event_loop中
loop.run_until_complete(task)
# 再次打印task
print(task)
print(task.result())
""" 運行結果
#1 <Task pending name='Task-1' coro=<do_something() running at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97>>
#2 這是一個Task例子....
#3 <Task finished name='Task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task任務完成'>
#4 Task任務完成
"""

Task 對象的 result() 函數可以獲取 do_something() 函數的返回值。

Task 任務回調

import asyncio



async def do_something(task_id):
	print(f"這是一個Task例子,當前task_id:{task_id}")
	# 模擬阻塞1秒
	await asyncio.sleep(1)
	return f"Task-id {task_id} 任務完成"


# 任務完成后的回調函數
def callback(task):
	# 打印參數
	print(task)
	# 打印返回的結果
	print(task.result())


# 創建一個事件event_loop
loop = asyncio.get_event_loop()

# 創建一個task
tasks = []
for i in range(5):
	name = f"task-{i}"
	task = loop.create_task(do_something(name), name=name)
	task.add_done_callback(callback)
	tasks.append(task)

# 將task加入到event_loop中
loop.run_until_complete(asyncio.wait(tasks))
""" 輸出為:
這是一個Task例子,當前task_id:task-0
這是一個Task例子,當前task_id:task-1
這是一個Task例子,當前task_id:task-2
這是一個Task例子,當前task_id:task-3
這是一個Task例子,當前task_id:task-4
<Task finished name='task-0' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-0 任務完成'>
Task-id task-0 任務完成
<Task finished name='task-1' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-1 任務完成'>
Task-id task-1 任務完成
<Task finished name='task-2' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-2 任務完成'>
Task-id task-2 任務完成
<Task finished name='task-3' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-3 任務完成'>
Task-id task-3 任務完成
<Task finished name='task-4' coro=<do_something() done, defined at /Users/mac/Desktop/userspace/TestDemo/test/async_demo.py:97> result='Task-id task-4 任務完成'>
Task-id task-4 任務完成
"""

使用 asyncio.wait() 函數將 Task 任務列表添加到 event_loop 中,也可以使用 asyncio.gather() 函數。

多個任務執行結束后再回調

import asyncio
import functools



async def do_something(t):
	print("暫停" + str(t) + "秒")
	await asyncio.sleep(t)
	return "暫停了" + str(t) + "秒"


def callback(event_loop, gatheringFuture):
	print(gatheringFuture.result())
	print("多個Task任務完成后的回調")


loop = asyncio.get_event_loop()
gather = asyncio.gather(do_something(1), do_something(3))
gather.add_done_callback(functools.partial(callback, loop))

loop.run_until_complete(gather)
""" 輸出為:
暫停1秒
暫停3秒
['暫停了1秒', '暫停了3秒']
多個Task任務完成后的回調
"""

4. async & await 關鍵字【推薦🌟】

py3.5及之后版本


本質上和3.4的asyncio一致,但更強大。
3.5之后yield from 不可以在async定義的函數內使用,需使用await

import asyncio


async def func1():
	print(1)
	await asyncio.sleep(2)  # 遇到IO自動切換任務
	print(2)


async def func2():
	print(3)
	await asyncio.sleep(2)  # 此處又遇到IO阻塞后,又會自動切換到tasks中其他的任務
	print(4)


tasks = [
	asyncio.ensure_future(func1()),  # 把協程對象包轉成一個 future 對象
	asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
# 執行單個協程函數
loop.run_until_complete(func1())  # 由於func1中使用了await關鍵字所以此處等同於asyncio.wait
""" 輸出結果為:
1
等待2s
2
"""
# 執行多個協程函數
loop.run_until_complete(asyncio.wait(tasks))
""" 輸出結果為:
1
3
等待2s
2
4
"""

注:python3.7之后可以不需要自己獲取loop對象,可以直接調用asyncio.run方法內部已經幫我們獲取了loop對象和調用loop.run_until_complete
在這里插入圖片描述

直接使用(但是不支持同時運行多個協程對象):

asyncio.run(func1()) 

async & await 關鍵字簡化代碼的同時並且兼容基於生成器的老式協程

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

一個協程函數中可以使用多次await關鍵字

import asyncio


async def func():
	print("start")
	await asyncio.sleep(5)
	print("end")
	return "finish"


async def main():
	print("執行main方法")
	resp1 = await func()
	print(f"第一次返回值:{resp1}")
	resp2 = await func()
	print(f"第二次返回值:{resp2}")

asyncio.run(main())
"""輸出為:

"""

同樣我們也可以在一個協程函數中獲取多個其他協程對象的返回值:

import asyncio


async def func():
	print(1)
	await asyncio.sleep(2)
	print(2)
	return f"func was done"


async def main():
	print("main開始")
	# 創建協程,將協程封裝到Task對象中並添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。
	# 在調用
	task_list = [
		asyncio.create_task(func(), name="n1"),
		asyncio.create_task(func(), name="n2")
	]
	print("main結束")
	# 當執行某協程遇到IO操作時,會自動化切換執行其他任務。
	# 此處的await是等待所有協程執行完畢,並返回兩個集合 done、pending。done存放已完成的task對象,pending存放未完成的task對象
	# 如果設置了timeout值,則意味着此處最多等待的秒,完成的協程返回值寫入到done中,未完成則寫到pending中。
	done, pending = await asyncio.wait(task_list, timeout=None)
	print(done, pending)

	print(list(done)[0].result())


asyncio.run(main())

"""輸出為:
main開始
main結束
1
1
2
2
{<Task finished name='n2' coro=<func() done, defined at /Users/mac/Desktop/userspace/TestDemo/test.py:33> result='func was done'>, <Task finished name='n1' coro=<func() done, defined at /Users/mac/Desktop/userspace/TestDemo/test.py:33> result='func was done'>} set()
func was done
"""

有同學可能就要問了:上面好像都是協程調用協程函數,那我普通函數能不能和協程函數互相調用?

5.協程函數與普通函數搭配使用


普通函數調用協程函數:

async def func3():
	print('i`m func3')
	await asyncio.sleep(2)
	print('func3 finished')


def func4():
	print('i`m func4')
	asyncio.run(func3())
	print('func4 finished')


func4()


"""輸出結果:
i`m func4
i`m func3
等待2s
func3 finished
func4 finished
"""

協程函數調用普通函數

def func5():
	print('i`m func5')
	time.sleep(2)
	print('func5 finished')


async def func6():
	print('i`m func6')
	func5()
	print('func6 finished')


asyncio.run(func6())
print('all finish')
"""
i`m func6
i`m func5
等待2s
func5 finished
func6 finished
all finish

"""

有人就要問了這跟同步沒區別啊,搞得花里胡哨的有啥用;我一個普通函數一樣能實現這種效果。

那大家可以思考下這個問題,假設你的接口處理完必要業務后還有部分后置操作成功與否並不影響你的業務邏輯(比如點贊場景、記錄操作日志這種無關緊要的后置更新場景等);假設插入或更新db非常耗時 ,你為了提高性能會怎么處理呢?

6.concurrent.futures.Future對象


使用線程池、進程池實現異步操作時用到的future對象。它跟asyncio.futures.Future沒有關系,但是也是幫助程序hang住直到拿到結果。

進程池和線程池

線程池:用於偽並發執行,GIL鎖的原因;但是其實遇到IO就切換所以線程啟動間隔時間其實很短

from concurrent.futures.thread import ThreadPoolExecutor


def func(value):
	print("start")
	time.sleep(1)
	print(value)

# 指定同時可用的線程數量
thread_pool = ThreadPoolExecutor(max_workers=5)
for i in range(10):
	# submit(func,*args,**kwargs)方法向線程池提交任務
	fut = thread_pool.submit(func, i)
	# 返回值為future對象
	print(type(fut))  # <class 'concurrent.futures._base.Future'>

執行邏輯為:
先提交一個任務到線程池中 》 打印start 》遇到IO切換任務 》 打印 future 對象類型 》提交第二個任務到線程池中 》打印 start 》遇到IO切換任務 》打印 future 對象類型 》...打印 start 》 遇到IO切換任務 》打印 future 對象類型..(如果IO處理完了會在上述步驟中穿插輸出 0/1/2/3/4)
等前5個有執行完畢了再提交后面的任務,同一時刻線程池中只維護5個活躍線程。處理流程與前5個一致

所以輸出為:

start
<class 'concurrent.futures._base.Future'>
start
<class 'concurrent.futures._base.Future'>
start
<class 'concurrent.futures._base.Future'>
start
<class 'concurrent.futures._base.Future'>
start
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
<class 'concurrent.futures._base.Future'>
1
start
2
start
3
start0
start
4
start
5
678
9

進程池:創建多個進程並發執行

from concurrent.futures.process import ProcessPoolExecutor



def func(value):
	print("start")
	time.sleep(1)
	print(value)


process_pool = ProcessPoolExecutor(max_workers=5)

if __name__ == '__main__':

	for i in range(10):
		# 返回值為concurrent.futures._base.Future對象
		fut = process_pool.submit(func, i)


執行邏輯為:
啟動5個進程分別執行任務》打印5次start 》IO阻塞一秒 》打印01234
等前5個有執行完畢了再創建新的進程提交后面的任務,同一時刻進程池中只維護5個活躍進程。處理流程與前5個一致
輸出為:

start
start
start
start
start
0
start
1
2
start
start
3
start
4
start
5
6
7
8
9

回到上面的問題在不采用異步的方式時:
![在這里插入圖片描述](https://img-blog.csdnimg.cn/bbdf9b7bf9844b01865621846c6fcc8d.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBA572R5rCR5L2Z5p-Q5Lq6,size_20,color_FFFFFF,t_70,g_se,x_1

程序執行需要5s多這顯然不是我們想要的,采用異步我們可以通過以下幾種方式解決:

  • loop.run_in_executor(None,func)

    
    def insert_db(second=random.randint(1, 5)):
    	print("開始執行插入語句")
    	time.sleep(second)
    	print(f"數據插入完成,耗時:{second} s")
    
    
    async def main():
    	print("執行main")
    	loop = asyncio.get_running_loop()
    	# 第一步:第一個參數傳None則內部會先調用ThreadPoolExecutor 的submit方法去線程池中申請一個線程去執行insert_db函數,並返回一個concurrent.futures.Future對象
    	# 第二步:調用asyncio.wrap_future將concurrent.futures.Future對象包裝為asyncio.Future對象
    	# asyncio.Future對象才能使用await語法
    	loop.run_in_executor(None, insert_db) # 不等待函數執行完畢,異步
    	# await loop.run_in_executor(None, insert_db) # 等待執行完畢,就是同步
    	
    	print("main執行結束")
    
    
    before = time.time()
    print(f"執行前時間:{before}")
    asyncio.run(main())
    
    after = time.time()
    print("程序結束")
    print(f"執行后時間:{after},總耗時:{after - before}")
    

    執行結果為:
    在這里插入圖片描述

  • ThreadPoolExecutor.submit(func,*args):替換成loop.run_in_executor(ThreadPoolExecutor(), insert_db)即可

  • ProcessPoolExecutor.submit(func,*args)
    在這里插入圖片描述

7.普通函數使用多線程或多進程


多線程:
在這里插入圖片描述
多進程:上文ThreadPoolExecutor替換成ProcessPoolExecutor即可

我們也可以結合裝飾器將一個普通函數包裝成concurrent.futures.Future對象從而進一步簡化,如:

from functools import wraps


def wrap_to_async(thread_num=2):
	def wrapper(func):
		@wraps(func)
		def inner_wrapper(*args):
			return ThreadPoolExecutor(thread_num).submit(func, *args)

		return inner_wrapper

	return wrapper


async def func1():
	print('i`m func1')
	func2()
	print('func1 finished')


@wrap_to_async(2)
def func2():
	print('i`m func2\n')
	time.sleep(2)
	print('func2 finished')


asyncio.run(func1())
print("all finished")

后續使用時只需在需異步操作的普通函數上加上裝飾器即可實現協程。
在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM