協程通過 async/await 語法進行聲明,是編寫異步應用的推薦方式
例如新定義一個協程(coroutine object):
async def foo():
return 42
首先先來介紹下:
認識aysn和asyncio都有哪些函數方法:
創建一個future 對象:
task = asyncio.create_task(foo())
或者使用
task=asyncio.ensure_future(foo())
那么如何判斷創建的task到底是不是future 對象呢?
async def exetask():
# task = asyncio.create_task(foo())
task = asyncio.ensure_future(foo())
if isinstance(task,asyncio.Future):
print("yes")
else:
print("no")
s=await task
asyncio.run(exetask())
結果如下:
yes
答案是肯定的。
2.如何運行一個協程:
要真正運行一個協程,asyncio 提供了三種主要機制:
第一種:
-
asyncio.run()
函數用來運行最高層級的入口點 "main()" 函數 (參見上面的示例。)
第二種:
通過loop對象實現:
loop=asyncio.get_event_loop()
async def jobs():
i=10
while i>0:
# time.sleep(0.5)
i=i-1
return 0
tasks=[asyncio.ensure_future(jobs(k)) for k in range(1,4)]
res=loop.run_until_complete(asyncio.gather(*tasks))
print(res)
loop.close()
3.並發
第1種並發運行:
當一個協程通過asyncio.create_task()
等函數被打包為一個 任務,asyncio.
create_task
(coro)將 coro 協程 打包為一個Task
排入日程准備執行,
返回 Task 對象。此函數 在 Python 3.7 中被加入。在 Python 3.7 之前,of course你也可以改用低層級的asyncio.ensure_future()
函數, This works in all Python versions but is less readable
#### ordinal job async
async def jobs():
i=10
while i>0:
# time.sleep(0.5)
i=i-1
return 0
async def get_status():
r=await jobs()
return r
loop=event=asyncio.get_event_loop() tasks = [asyncio.create_task(get_status()) for k in range(1,4)] for task in tasks: r=loop.run_until_complete(asyncio.wait_for(task,timeout=10)) print(r)
這里我采用了比較低級的loop事件對象來調用run_until_complete() 方法來實現:
事實上開發者一般更喜歡采用高級用法asyncio.wait()或者asyncio.wait_for()來實現這寫異步任務調用:
在進行下面介紹之前我想你應該先了解:asyncio.
wait_for
(aw, timeout, *, loop=None)
等待 aw 可等待對象 完成,指定 timeout 秒數后超時。
如果 aw 是一個協程,它將自動作為任務加入日程。
timeout 可以為 None
,也可以為 float 或 int 型數值表示的等待秒數。如果 timeout 為 None
,則等待直到完成。
如果發生超時,任務將取消並引發 asyncio.TimeoutError
.
函數將等待直到目標對象確實被取消,所以總等待時間可能超過 timeout 指定的秒數。
如果等待被取消,則 aw 指定的對象也會被取消。
loop 參數已棄用,計划在 Python 3.10 中移除。
asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
並發運行 aws 指定的 可等待對象 並阻塞線程直到滿足 return_when 指定的條件。
如果 aws 中的某個可等待對象為協程,它將自動作為任務加入日程。直接向 wait()
傳入協程對象已棄用,因為這會導致 令人迷惑的行為。
返回兩個 Task/Future 集合: (done, pending)
。
用法:
done, pending = await asyncio.wait(aws)
loop 參數已棄用,計划在 Python 3.10 中移除。
如指定 timeout (float 或 int 類型) 則它將被用於控制返回之前等待的最長秒數。
請注意此函數不會引發 asyncio.TimeoutError
。當超時發生時,未完成的 Future 或 Task 將在指定秒數后被返回。
return_when 指定此函數應在何時返回。它必須為以下常數之一:
常數 |
描述 |
---|---|
|
函數將在任意可等待對象結束或取消時返回。 |
|
函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當於 |
|
函數將在所有可等待對象結束或取消時返回。 |
與 wait_for()
不同,wait()
在超時發生時不會取消可等待對象。
如何驗證wait 不取消,wait_for 取消aw對象呢:
我們來看個例子:
先看wait_for:
async def foo(k=0):
await asyncio.sleep(30)
return k
async def exetask():
task=asyncio.create_task(foo(k=1))
try:
await asyncio.wait_for(task,timeout=1)
print(task.cancelled())
#3.7 改為當 aw 因超時被取消,wait_for 會等待 aw 被取消,3.7之前直接報異常,
except asyncio.TimeoutError:
print("timeout ")
# task.cancel()
print(task.cancelled())
asyncio.run(exetask())
結果:
C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
timeout
True
再看wait,這里注意由於waitexpect a list of futures, not Task,我換種寫法:
async def foo(k=0):
await asyncio.sleep(30)
return k
async def exetask():
task = asyncio.create_task(foo(k=1))
try:
#請注意wait函數不會引發 asyncio.TimeoutError
await asyncio.wait([task], timeout=1)
## 3.7 改為當 aw 因超時被取消,wait_for 會等待 aw 被取消,3.7之前直接報異常,
# await asyncio.wait_for(task,timeout=1)
except asyncio.TimeoutError:
print("timeout ")
print(task.cancelled())
asyncio.run(exetask())
結果:
C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
False
Process finished with exit code 0
這里完美的展現了阻塞的魅力!!!!!!!
第2種並發運行:
awaitableasyncio.
gather
(*aws, loop=None, return_exceptions=False)
並發 運行 aws 序列中的 可等待對象。
下面來看個簡單的例子:
async def count(k): print(f"start job{k} {time.asctime()} ") await asyncio.sleep(0.6) print(f"end job{k} {time.asctime()}") return k async def mayns(): r=await asyncio.gather(count(1),count(2)) return r def test(): import time st=time.perf_counter() results=asyncio.run(mayns()) elapsed=time.perf_counter()-st print(f"result return :{results} execute in {elapsed:0.2} seconds.")
運行結果:
start job1 Fri Dec 13 23:00:38 2019
start job2 Fri Dec 13 23:00:38 2019
end job1 Fri Dec 13 23:00:39 2019
end job2 Fri Dec 13 23:00:39 2019
result return :[1, 2] execute in 0.6 seconds.
gather直接返回的是調用的task的所有result列表
當然你也可以這樣搜集任務:
最后介紹一下如何實現異步http請求:
# request 庫同步阻塞,aiohttp才是異步的請求,pip install aiohttp from aiohttp import ClientSession as session async def test2(k): r=await other_test(k) return r async def other_test(k): print("start await job %s,%s"%(k,time.asctime())) urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"] async with session() as request: async with request.get(urls[0]) as rq: r=await rq.read() res=r.decode("utf-8") print("end await job %s,%s"%(k,time.asctime())) return res def test_aiohttp(): klist=[100,50,88] loop=asyncio.get_event_loop() tasks=[asyncio.ensure_future(test2(k)) for k in klist] # loop.run_until_complete(asyncio.wait(tasks)) #可通過asyncio.gather(*tasks)將響應全部收集起來 res=loop.run_until_complete(asyncio.gather(*tasks)) print(res) loop.close()
結果:
C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
start await job 100,Fri Dec 13 23:54:31 2019
start await job 50,Fri Dec 13 23:54:31 2019
start await job 88,Fri Dec 13 23:54:31 2019
end await job 88,Fri Dec 13 23:54:31 2019
end await job 50,Fri Dec 13 23:54:31 2019
end await job 100,Fri Dec 13 23:54:31 2019
['{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}']
Process finished with exit code 0
服務是自己用djangO起的:
from django.shortcuts import render
# Create your views here.
from django.http import HttpResponse
import json
from . models import Student,Grade
from django.db import models
def index(request):
data={"user":"test001","msg":"this is test index view "}
js=json.dumps(data)
return HttpResponse(js)
def stuTable(request):
import time
time.sleep(3)
# stu_object=Student.objects.all()
# return render(request,template_name="index.html",context={"student_object":stu_object})
return HttpResponse(json.dumps({"A":888,"NN":899}))