python協程


不知道你有沒有被問到過有沒有使用過的python協程?

協程是什么?

協程是一種用戶態輕量級,是實現並發編程的一種方式。說到並發,就能想到了多線程 / 多進程模型,是解決並發問題的經典模型之一。

但是隨刻客戶端數量達到一定量級,進程上下文切換占用了大量的資源,線程也頂不住如此巨大的壓力,對於多線程應用,CPU通過切片的方式來切換線程間的執行,

線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。線程是搶占式的調度,而協程是協同式的調度,也就是說,協程需要自己做調度。

協程有什么用?

在別的語言中協程意義不大,多線程可解決問題,但是python因為他有GIL(Global Interpreter Lock 全局解釋器鎖 )在同一時間只有一個線程在工作,如果一個線程里面I/O操作特別多,協程就比較適用;

在python中多線程的執行情況如下圖:

怎么實現協程?

python2.7中用生成器實現協程

在python3.7以后使用asyncio 和 async / await 的方法實現協程,實現過程就變得很簡單

具體說明:簡單的代碼示例

 
         
import asyncio
import time


async def sub_function(str):
print(' {}'.format(str))
sleep_time = int(str.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(str))


async def main(strs):
for str in strs:
await sub_function(str)

# asyncio.run(main()) 作為主程序的入口函數,在程序運行周期內,只調用一次 asyncio.run
t0 = time.time()
asyncio.run(main(['str_1', 'str_2', 'str_3', 'str_4']))
t1 = time.time()
print("Total time running: %s seconds" %(str(t1 - t0)))

輸出結果:一共是 10s 和我們順序分析 分別等待 1 2 3 4 秒 好像沒有什么提升作用,繼續往下看

 

 

執行協程的方法有三種:

1. 入上例所示,用await 來調用實現,但是await 執行的效果,和 Python 正常執行是一樣的,也就是說程序會阻塞在這里,進入被調用的協程函數,執行完畢返回后再繼續,而這也是 await 的字面意思。

  await 是同步調用,相當於我們用異步接口寫了個同步代碼,所以運行時間沒有得到提升。

2. 上栗的異步想過沒有體現出來,接下來我們使用另一個概念  task

async def sub_function(str):
    print(' {}'.format(str))
    sleep_time = int(str.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(str))


async def main(strs):
   # tasks
= [asyncio.create_task(sub_function(str)) for str in strs] for task in tasks: await task # asyncio.run(main()) 作為主程序的入口函數,在程序運行周期內,只調用一次 asyncio.run t0 = time.time() asyncio.run(main(['str_1', 'str_2', 'str_3', 'str_4'])) t1 = time.time() print("Total time running: %s seconds" %(str(t1 - t0)))

們有了協程對象后,便可以通過 asyncio.create_task 來創建任務。任務創建后很快就會被調度執行,

這樣,我們的代碼也不會阻塞在任務這里。用for task in tasks: await task 即可。這次,你就看到效果了吧,結果顯示,運行總時長等於運行時間最長一句。

運行結果:

 

 3. 上面的task也可以用下面的方式寫:

async def main(strs):
    tasks = [asyncio.create_task(sub_function(str)) for str in strs]
    await asyncio.gather(*tasks)

唯一要注意的是,*tasks 解包列表,將列表變成了函數的參數;與之對應的是, ** dict 將字典變成了函數的參數。

在實際中,我們會遇到接口超時,我們就需要取消的情況,這種情況該怎么處理呢?再進一步,如果某些協程運行時出現錯誤,又該怎么處理呢?

import time
import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

t0 = time.time()
asyncio.run(main())
t1 = time.time()
print("Total time running: %s seconds" %(str(t1 - t0)))

要注意return_exceptions=True這行代碼。這個參數默認值為False, 如果不設置這個參數,錯誤就會完整地 throw 到我們這個執行層,從而需要 try except 來捕捉,

這也就意味着其他還沒被執行的任務會被全部取消掉。為了避免這個局面,我們將 return_exceptions 設置為 True 即可。

 

 線程能實現的,協程都能做到.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM