一、協程介紹
協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。
協程的好處:
-
無需線程上下文切換的開銷
-
無需原子操作鎖定及同步的開銷 "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。
-
方便切換控制流,簡化編程模型
-
高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:
-
無法利用多核資源:協程的本質是個單線程,它不能同時將單個 CPU 的多個核用上,協程需要和進程配合才能運行在多 CPU 上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是 CPU 集型應用。
-
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
總結協程特點:
-
必須在只有一個單線程里實現並發
-
修改共享數據不需加鎖
-
用戶程序里自己保存多個控制流的上下文棧
-
附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))
Python2.x協程
類庫:
-
yield
-
greenlet
-
gevent
Python3.x協程
-
asyncio
Python3.x系列的gevent用法和python2.x系列是一樣的
在學習前,我們先來理清楚同步/異步的概念:
·同步是指完成事務的邏輯,先執行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,順序執行。。。也稱作串行執行。
·異步是和同步相對的,異步是指在處理調用這個事務的之后,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態、通知、回調來通知調用者處理結果。也稱作並行執行。
二、greenlet模塊
第三方模塊,可以在pycharm中選擇虛擬環境安裝,
也可以通過 pip install greenlet 安裝
greenlet 通過 greenlet(func) 啟動一個協程,通過 switch() 手動切換程序的執行
示例
from greenlet import greenlet
def func1(name):
print("%s from func1"%name) #2執行這一句
g2.switch("jack") #3切換執行func2(),第一次執行要傳入參數保存現在執行的狀態
print("from func1 end") #6執行這一句
g2.switch()#7切換執行play(),保存現在執行的狀態
def func2(name):
print("%s from func2"%name) #4執行這一句
g1.switch() #5切換執行func1(),保存現在執行的狀態
print("from func2 end") #8執行這一句
g1 = greenlet(func1)
g2 = greenlet(func2)
g1.switch("nick") #1執行func1(),在switch()里傳參數 ,注意與一般的線程、進程傳參方式的不同
#可以在第一次switch時傳入參數,以后都不需要
分析:就是通過創建greenlet(func)對象,通過對象的switch()方法轉移程序執行的不同步驟,但是這里無法自動識別IO后自動切換。
三、gevent模塊
gevent 是一個第三方庫,可以輕松通過gevent實現並發同步或異步編程,在gevent中用到的主要模式是greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。
安裝 pip3 install gevent 或者在pycharm中選擇虛擬環境安裝
用法
#用法 g1=gevent.spawn(func,1,,2,3,x=4,y=5)創建一個協程對象g1,spawn括號內第一個參數是函數名,如func1,后面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數func1的 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 #或者上述兩步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值
示例
import gevent
def func1():
print('from func1: 1')
gevent.sleep(0)
print('from func1: 2')
gevent.sleep(1)
def func2():
print('from func2: 1')
gevent.sleep(2)
print('from func2: 2')
def func3():
print('from func3: 1')
gevent.sleep(1)
print('from func3: 2')
gevent.joinall([
gevent.spawn(func1),
gevent.spawn(func2),
gevent.spawn(func3),
])
輸出結果
from func1: 1 from func2: 1 from func3: 1 from func1: 2 from func3: 2 from func2: 2
分析:可以從輸出結果看到程序不斷的在三個函數中跳躍執行,遇到IO了就去執行另外的函數,但是請注意一點
gevent.sleep() 是用於模仿 IO 操作的,實際使用中不需要 gevent.sleep(),這里如果單純執行上述代碼的話,gevent模塊也是只能識別 gevent.sleep()產生的IO,而對系統產生的IO或者網絡IO之類無法識別,所有需要打上補丁,使得gevent模塊識別其他IO。
gevent是不能直接識別的需要用下面一行代碼,打補丁
要用gevent,需要將from gevent import monkey;monkey.patch_all()放到文件的開頭
示例
需求:爬取三個網站並打印網頁字符串長度
from gevent import monkey;monkey.patch_all()
# 把當前程序的所有 IO 操作標記起來,否則模塊無法知道 IO 操作
import gevent
import time
import requests
def get_page(url):
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.121 Safari/537.36'
}
page_text = requests.get(url=url, headers=headers).text
print('網站長度', len(page_text))
def main():
urls = [
'https://www.sogou.com',
'https://cn.bing.com',
'https://cnblogs.com/Nicholas0707/',
]
time_start = time.time()
for url in urls:
get_page(url)
print('同步耗時:', time.time() - time_start)
print("-"*50)
async_time_start = time.time()
gevent.joinall([
gevent.spawn(get_page, 'https://www.sogou.com'),
gevent.spawn(get_page, 'https://cn.bing.com'),
gevent.spawn(get_page, 'https://cnblogs.com/Nicholas0707/'),
])
print('異步協程耗時:', time.time() - async_time_start)
if __name__ == '__main__':
main()
輸出結果
網站長度 23795 網站長度 130248 網站長度 13761 同步耗時: 2.5321450233459473 -------------------------------------------------- 網站長度 23795 網站長度 130221 網站長度 13761 異步協程耗時: 0.36602067947387695
分析:從結果可以看出采用協程異步明顯更快
四、asyncio模塊
asyncio是Python3.4(2014年)引進的標准庫,直接內置了對IO的支持。
python2x沒有加這個庫,python3.5又加入了async/await特性,python3.7新增了asyncio.run() api來執行異步函數.
協程示例
先簡單看一個協程示例
運行協程函數的第一種方式(loop.run_until_complete())
#python 3.7+,本次測試環境python3.8
import asyncio,time
async def fun(): #定義一個協程函數
print('hello')
await asyncio.sleep(1) #模擬IO操作,等待調用
print('word')
if __name__ == '__main__':
begin = time.time()
# 創建一個事件loop
loop = asyncio.get_event_loop()
# 將協程函數加入到事件循環loop,並啟動事件循環
loop.run_until_complete(fun())
loop.close()
print('用時共計',time.time()-begin)
print(fun)
print(loop)
輸出結果
hello word 用時共計 1.0010573863983154 <function fun at 0x00000000022CD0D0> <ProactorEventLoop running=False closed=True debug=False>
上面代碼等同於下面(不推薦使用,python3.8已經不支持此寫法了)
##python 3.7,本次測試環境python3.7
import asyncio,time
@asyncio.coroutine #這種寫法在python3.8之后被拋棄了
def fun(): #定義一個協程函數
print('hello')
yield from asyncio.sleep(1) #模擬IO操作,等待調用
print('word')
if __name__ == '__main__':
begin = time.time()
# 創建一個事件loop
loop = asyncio.get_event_loop()
# 將協程函數加入到事件循環loop,並啟動事件循環
loop.run_until_complete(fun())
loop.close()
print('用時共計',time.time()-begin)
分析:使用async關鍵字定義一個協程函數,用asyncio.get_event_loop()創建一個事件循環,然后使用run_until_complete將協程注冊到事件循環,並啟動事件循環。
運行協程函數的第二種方式( asyncio.gather()---asyncio.run())
示例
# ## python 3.7+,本次測試環境python3.8
#
import asyncio,time
async def foo():
print('start foo')
await asyncio.sleep(1)
print('end foo')
return 'foo'
async def bar():
print('start bar')
await asyncio.sleep(2)
print('end bar')
return ('1','2')
async def main():
res = await asyncio.gather(foo(), bar())
#同時將兩個異步函數對象加入事件循環,
# 但並不運行,等待調用。
print(res)
if __name__ == '__main__':
begin = time.time()
asyncio.run(main())
print('共計用時',time.time()-begin)
# 執行協程事件循環並返回結果。
輸出結果
start foo
start bar
end foo
end bar
['foo', ('1', '2')]
共計用時 2.003114700317383
分析:如果要同時異步執行兩個異步函數,需要用asyncio.gather(fun1(), fun2())將兩個異步函數對象加入事件循環,這里不用顯示的創建異步事件循環,因為asyncio.gather()方法中如果檢測到你沒有創建異步事件循環會自動幫你創建,見源代碼
def gather(*coros_or_futures, loop=None, return_exceptions=False):
"""..."""
if not coros_or_futures:
if loop is None:
loop = events.get_event_loop()
else:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
啟動事件循環是通過 asyncio.run()方法進行啟動
運行協程函數的第三種方式( asyncio.create_task()---asyncio.run())
## python 3.7+,本次測試環境python3.8
import asyncio,time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello')) #創建任務事件,異步函數加入參數,
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await task1 #將任務事件加入異步事件循環,等待調用
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
begin = time.time()
asyncio.run(main()) #啟動異步事件循環
print('共計用時',time.time()-begin)
輸出結果
started at 20:01:51 hello at 20:01:52 world at 20:01:53 finished at 20:01:53 共計用時 2.002114772796631
分析:通過asyncio.create_task()創建等待異步執行的任務事件,這里也是自動創建了事件循環loop,
源碼
def create_task(coro, *, name=None):
"""...
"""
loop = events.get_running_loop()
然后使用await將任務事件加入異步事件循環。
關於asyncio的一些關鍵字的說明:
-
event_loop 事件循環:程序開啟一個無限循環,把一些函數注冊到事件循環上,當滿足事件發生的時候,調用相應的協程函數
-
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要注冊到事件循環,由事件循環調用。
-
task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各種狀態
-
future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
-
async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口,等待調用。
-
sleep:暫停執行此任務,為事件循環分配要競爭的任務,並且它(事件循環)監視其所有任務的狀態並從一個任務切換到另一個,這里是模擬io任務花費的時間。
asyncio方法
""" Asyncio.get_event_loop() 返回一個事件循環對象,是asyncio.Baseeventloop的實例 Abstracteventloop.stop() 停止運行事件循環 Abstracteventloop.run_forever() 一直運行直到stop() Abstracteventloop.run_until_complete(future) 運行直至future對象運行完 Abstracteventloop.close() 關閉事件循環 Abstracteventloop.is_running() 返回事件循環的是否運行 asyncio.gather(*aws, loop=None, return_exceptions=False) 同時在協程事件循環中運行定義的異步函數對象 task = asyncio.create_task(func());task.cancel() 請求取消任務。調用它將導致Task將CancelledError異常拋出到協程事件循環中。 """
為異步函數綁定回調函數
## python 3.7+,本次測試環境python3.8
import asyncio
async def fun():
print('hello word')
return 'nick'
def callback(future):
print('Callback: ', future.result()) # 通過result()方法獲得異步函數的返回值
loop = asyncio.get_event_loop() # 創建異步事件循環
task = loop.create_task(fun()) # 將異步函數加入loop
task.add_done_callback(callback) # 添加回調函數
loop.run_until_complete(task)
輸出結果
hello word Callback: nick
示例二
## python 3.7+,本次測試環境python3.8
import asyncio
async def fun():
print('hello')
await asyncio.sleep(1)
print('fun --end')
return 'nick'
async def bar():
print('word')
await asyncio.sleep(2)
print('bar --end')
return 'jack'
def callback(future):
print('Callback: ', future.result()) # 通過result()方法獲得異步函數的返回值
async def main():
loop = asyncio.get_event_loop() # 創建異步事件循環
task1 = loop.create_task(fun()) # 將異步函數加入loop
task2 = loop.create_task(bar()) # 將異步函數加入loop
task1.add_done_callback(callback) # 添加回調函數
task2.add_done_callback(callback) # 添加回調函數
await task1
await task2
if __name__ == '__main__':
asyncio.run(main())
輸出結果
hello word fun --end Callback: nick bar --end Callback: jack
