AIOHTTP
用於asyncio和Python的異步HTTP客戶端/服務器
主要特點:
- 支持客戶端和HTTP服務器。
- 支持服務器WebSockets和 客戶端WebSockets開箱即用,沒有回調地獄。
- Web服務器具有中間件, 信號和可插拔路由。
入門
客戶端:
import aiohttp import asyncio async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'http://python.org') print(html) loop = asyncio.get_event_loop() loop.run_until_complete(main())
服務端:
from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = "Hello, " + name return web.Response(text=text) app = web.Application() app.add_routes([web.get('/', handle), web.get('/{name}', handle)]) web.run_app(app)
Web服務
1、post、get接收參數數據
#!/usr/bin/env python # -*- coding:utf-8 -*- import asyncio from aiohttp import web class Application(object): ''' aiohttp 接口 ''' def __init__(self): pass async def prepare_init(self): ''' 預加載 :return: ''' self.hearders_setting = [ web.post('/', self.post), web.get('/', self.get), ] async def get(self,request): print(request.app['db']) arguments = request.query print(arguments) # < MultiDictProxy('model': 'aiohttp') > return web.Response(text='get') async def post(self,request): arguments = await request.post() print(arguments) # < MultiDictProxy('model': 'aiohttp') > # 獲取ip地址 print(request.transport.get_extra_info('peername')) # ('127.0.0.1', 53245) print(request.path) # / print(request.raw_path) return web.Response(text='post') async def app_factory(self): ''' 配置app :return: ''' await self.prepare_init() app = web.Application() app.add_routes(self.hearders_setting) app['db'] = 'db' return app def run_forever(self): ''' 開啟端口 :return: ''' web.run_app(self.app_factory()) Application().run_forever()

1 import requests 2 3 4 requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'}) 5 requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
2、web處理程序取消
#!/usr/bin/env python # -*- coding:utf-8 -*- import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() await something() return web.Response(text='post')
注:當客戶端請求過程中,中斷請求,此時handle out 並不會打印執行,直接取消運行;
官方:await
如果客戶端斷開連接而不讀取整個響應的BODY,則可以取消每個 Web處理程序執行。這種行為與經典的WSGI框架(如Flask和Django)截然不同
有時它是一種理想的行為:在處理GET
請求時,代碼可能從數據庫或其他Web資源獲取數據,提取可能很慢。
取消此非常好:對等已經斷開連接,沒有理由通過從DB獲取數據而沒有任何機會將其發送回對等來浪費時間和資源(內存等)。
但有時取消很糟糕:根據POST
要求,通常需要將數據保存到數據庫,而不管對等關閉。
取消預防可以通過以下幾種方式實施:
- 應用於
asyncio.shield()
將數據保存到DB的協同程序 - 產生DB保存的新任務
- 使用aiojobs或其他第三方庫
① shield
import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() await asyncio.shield(something()) return web.Response(text='post')
asyncio.shield()
工作得很好。唯一的缺點是你需要將Web處理程序分成兩個異步函數:一個用於處理程序本身,另一個用於受保護的代碼。
async def handler(request): await asyncio.shield(write_to_redis(request)) await asyncio.shield(write_to_postgres(request)) return web.Response(text='OK')
在REDIS中保存數據后可能會發生取消, write_to_postgres
不會被調用
② 創建任務
import asyncio from aiohttp import web async def something(): await asyncio.sleep(2) print('handle out') async def post(request): arguments = await request.post() request.loop.create_task(something()) return web.Response(text='post')
產生一項新任務的情況要糟糕得多:沒有地方可以await
產生任務 ;request.loop.create_task(something()) 這個沒有辦法awit獲取值,如果加上awit,則導致中斷時,something停止工作
3、數據共享
aiohttp.web
不鼓勵使用全局變量,每個變量都應該有自己的非全局上下文。
所以,Application
並Request
支持一個collections.abc.MutableMapping
接口(即它們是類似dict的對象),允許它們用作數據存儲。
① 應用程序的配置
要存儲類似全局變量,請隨意將它們保存在 Application
實例中:
app['my_private_key'] = data
並在Web處理程序中獲取它:
async def handler(request): data = request.app['my_private_key']
在嵌套應用程序的情況下,所需的查找策略可能如下:
- 搜索當前嵌套應用程序中的鍵。
- 如果未找到密鑰,請繼續在父應用程序中進行搜索。
為此,請使用Request.config_dict
只讀屬性:
async def handler(request): data = request.config_dict['my_private_key']
② 請求的存儲
Variables that are only needed for the lifetime of a Request
, can be stored in a Request
:
async def handler(request): request['my_private_key'] = "data" ...
這對於中間件和 信號處理程序來說非常有用,可以存儲數據以供鏈中的下一個處理程序進一步處理。
③ 響應的存儲
StreamResponse
和Response
對象也支持collections.abc.MutableMapping
接口。當您希望在處理程序中的所有工作完成后與信號和中間件共享數據時,這非常有用:
async def handler(request): [ do all the work ] response['my_metric'] = 123 return response
4、中間件
aiohttp.web
提供了一種通過中間件自定義請求處理程序的強大機制 。
一個中間件是可以修改請求或響應中的協程。例如,這是一個附加 到響應的簡單中間件:'wink'
from aiohttp.web import middleware @middleware async def middleware(request, handler): resp = await handler(request) resp.text = resp.text + ' wink' return resp
注意:該示例不適用於流式響應或websockets
每個中間件都應該接受兩個參數,一個request
實例和一個處理程序,並返回響應或引發異常。如果異常不是HTTPException
它的實例,則500
HTTPInternalServerError
在處理中間件鏈之后將 其轉換為。
警告:第二個參數應該完全命名為handler
創建時Application
,這些中間件將傳遞給僅限關鍵字的middlewares
參數:
app = web.Application(middlewares=[middleware_1, middleware_2])
在內部,通過以相反的順序將中間件鏈應用於原始處理程序來構造單個請求處理程序,並由RequestHandler
作為常規處理程序調用。
由於中間件本身就是協程,因此await
在創建新的處理程序時可能會執行額外的 調用,例如調用數據庫等。
中間件通常會調用處理程序,但是他們可能會選擇忽略它,例如,如果用戶沒有訪問底層資源的權限,則顯示403 Forbidden頁面或引發HTTPForbidden
異常。它們還可能呈現處理程序引發的錯誤,執行一些預處理或后處理,如處理CORS等。
以下代碼演示了中間件的執行順序:
from aiohttp import web async def test(request): print('Handler function called') return web.Response(text="Hello") @web.middleware async def middleware1(request, handler): print('Middleware 1 called') response = await handler(request) print('Middleware 1 finished') return response @web.middleware async def middleware2(request, handler): print('Middleware 2 called') response = await handler(request) print('Middleware 2 finished') return response app = web.Application(middlewares=[middleware1, middleware2]) app.router.add_get('/', test) web.run_app(app)
輸出
Middleware 1 called Middleware 2 called Handler function called Middleware 2 finished Middleware 1 finished
中間件的常見用途是實現自定義錯誤頁面。以下示例將使用JSON響應呈現404錯誤,因為可能適合JSON REST服務:
from aiohttp import web @web.middleware async def error_middleware(request, handler): try: response = await handler(request) if response.status != 404: return response message = response.message except web.HTTPException as ex: if ex.status != 404: raise message = ex.reason return web.json_response({'error': message}) app = web.Application(middlewares=[error_middleware])
中間件工廠
一個中間件工廠是創建與傳遞參數的中間件功能。例如,這是一個簡單的中間件工廠:
def middleware_factory(text): @middleware async def sample_middleware(request, handler): resp = await handler(request) resp.text = resp.text + text return resp return sample_middleware
請記住,與常規中間件相反,您需要中間件工廠的結果而不是功能本身。因此,當將中間件工廠傳遞給應用程序時,您實際需要調用它:
app = web.Application(middlewares=[middleware_factory(' wink')])
5、信號
雖然middleware可以在准備響應之前或之后定制請求處理程序,但在准備響應時不能定制響應。For this aiohttp.web
provides signals.
例如,中間件只能為未准備好的 響應更改HTTP標頭(請參閱參考資料StreamResponse.prepare()
),但有時我們需要一個鈎子來更改流式響應和WebSockets的HTTP標頭。這可以通過訂閱Application.on_response_prepare
信號來完成 :
async def on_prepare(request, response): response.headers['My-Header'] = 'value' app.on_response_prepare.append(on_prepare)
此外,可以訂閱Application.on_startup
和 Application.on_cleanup
信號以進行應用程序組件設置並相應地拆除。
以下示例將正確初始化並配置aiopg連接引擎:
from aiopg.sa import create_engine async def create_aiopg(app): app['pg_engine'] = await create_engine( user='postgre', database='postgre', host='localhost', port=5432, password='' ) async def dispose_aiopg(app): app['pg_engine'].close() await app['pg_engine'].wait_closed() app.on_startup.append(create_aiopg) app.on_cleanup.append(dispose_aiopg)
信號處理程序不應返回值,但可以修改傳入的可變參數。
信號處理程序將按順序運行,以便添加它們。從aiohttp 3.0開始,所有處理程序必須是異步的。
Application.on_startup
/ Application.on_cleanup
對仍有陷阱:信號處理程序彼此獨立。
E.g. we have [create_pg, create_redis]
in startup signal and [dispose_pg,dispose_redis]
in cleanup.
If, for example, create_pg(app)
call fails create_redis(app)
is not called. But on application cleanup both dispose_pg(app)
and dispose_redis(app)
are still called:
清理信號不知道啟動/清理對及其執行狀態。
解決方案是Application.cleanup_ctx
用法:
async def pg_engine(app): app['pg_engine'] = await create_engine( user='postgre', database='postgre', host='localhost', port=5432, password='' ) yield app['pg_engine'].close() await app['pg_engine'].wait_closed() app.cleanup_ctx.append(pg_engine)
屬性是列表生成器,代碼之前 yield
是(稱為上初始化階段的啟動),碼 之后yield
被上執行清理。生成器必須只有一個yield
。
aiohttp保證當且僅當啟動代碼成功完成時才調用 清理代碼。
Python 3.6+支持異步生成器,在Python 3.5上請使用async_generator 庫。
版本3.1中的新功能。
7、后台任務
有時需要在應用程序啟動后執行一些異步操作。
更重要的是,在一些復雜的系統中,可能需要在事件循環中運行一些后台任務以及應用程序的請求處理程序。例如,監聽消息隊列或其他網絡消息/事件源(例如,ZeroMQ,Redis Pub / Sub,AMQP等)以對應用程序內的接收消息作出反應。
例如,后台任務可以在zmq.SUB
套接字上偵聽ZeroMQ ,處理並將檢索到的消息轉發到通過WebSocket連接的客戶端,這些客戶端存儲在應用程序中的某個位置(例如,在application['websockets']
列表中)。
為了運行這種短期和長期運行的后台任務,aiohttp提供了注冊Application.on_startup
將與應用程序的請求處理程序一起運行的信號處理程序的能力。
例如,需要運行一個快速任務和兩個長時間運行的任務,這些任務將一直存在,直到應用程序處於活動狀態。相應的后台任務可以注冊為Application.on_startup
信號處理程序,如下例所示:
async def listen_to_redis(app): try: sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop) ch, *_ = await sub.subscribe('news') async for msg in ch.iter(encoding='utf-8'): # Forward message to all connected websockets: for ws in app['websockets']: ws.send_str('{}: {}'.format(ch.name, msg)) except asyncio.CancelledError: pass finally: await sub.unsubscribe(ch.name) await sub.quit() async def start_background_tasks(app): app['redis_listener'] = app.loop.create_task(listen_to_redis(app)) async def cleanup_background_tasks(app): app['redis_listener'].cancel() await app['redis_listener'] app = web.Application() app.on_startup.append(start_background_tasks) app.on_cleanup.append(cleanup_background_tasks) web.run_app(app)
任務listen_to_redis()
將永遠運行。要正確關閉它,Application.on_cleanup
信號處理程序可用於向其發送取消。
8、處理異常錯誤
https://aiohttp-demos.readthedocs.io/en/latest/tutorial.html#aiohttp-demos-polls-middlewares
9、request中獲取add_routes中url
async def post(self, request): url = [resource._path for resource in request.app.router._resources] print(url) # ['/', '/6773', '/', '/1234/'] Response = web.Response(text='post') return Response