【模塊】:aiohttp(一)


AIOHTTP

用於asyncio和Python的異步HTTP客戶端/服務器

 

主要特點:

  • 支持客戶端和HTTP服務器。
  • 支持服務器WebSockets和 客戶端WebSockets開箱即用,沒有回調地獄。
  • Web服務器具有中間件, 信號和可插拔路由。

 

 入門

客戶端:

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())  

服務端:

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

app = web.Application()
app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])

web.run_app(app)

  

 Web服務

 1、post、get接收參數數據

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web


class Application(object):
    '''
    aiohttp 接口
    '''

    def __init__(self):
        pass

    async def prepare_init(self):
        '''
        預加載
        :return:
        '''
        self.hearders_setting = [
            web.post('/', self.post),
            web.get('/', self.get),
        ]

    async def get(self,request):
        print(request.app['db'])
        arguments = request.query
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        return web.Response(text='get')

    async def post(self,request):
        arguments = await request.post()
        print(arguments)
        # < MultiDictProxy('model': 'aiohttp') >
        # 獲取ip地址
        print(request.transport.get_extra_info('peername'))
        # ('127.0.0.1', 53245)
        print(request.path)
        # /
        print(request.raw_path)
        return web.Response(text='post')

    async def app_factory(self):
        '''
        配置app
        :return:
        '''
        await self.prepare_init()
        app = web.Application()
        app.add_routes(self.hearders_setting)
        app['db'] = 'db'
        return app

    def run_forever(self):
        '''
        開啟端口
        :return:
        '''
        web.run_app(self.app_factory())


Application().run_forever()
1 import requests
2 
3 
4 requests.post(url='http://127.0.0.1:8080/',data={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
5 requests.get(url='http://127.0.0.1:8080/',params={'model':'aiohttp'},headers={'Content-Type': 'application/x-www-form-urlencoded'})
請求腳本

 

2、web處理程序取消

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await something()
    return web.Response(text='post')

注:當客戶端請求過程中,中斷請求,此時handle out 並不會打印執行,直接取消運行;

官方:await 如果客戶端斷開連接而不讀取整個響應的BODY,則可以取消每個 Web處理程序執行。這種行為與經典的WSGI框架(如Flask和Django)截然不同

有時它是一種理想的行為:在處理GET請求時,代碼可能從數據庫或其他Web資源獲取數據,提取可能很慢。

  取消此非常好:對等已經斷開連接,沒有理由通過從DB獲取數據而沒有任何機會將其發送回對等來浪費時間和資源(內存等)。

  但有時取消很糟糕:根據POST要求,通常需要將數據保存到數據庫,而不管對等關閉。

取消預防可以通過以下幾種方式實施:

  • 應用於asyncio.shield()將數據保存到DB的協同程序
  • 產生DB保存的新任務
  • 使用aiojobs或其他第三方庫

①  shield

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    await asyncio.shield(something())
    return web.Response(text='post')

asyncio.shield()工作得很好。唯一的缺點是你需要將Web處理程序分成兩個異步函數:一個用於處理程序本身,另一個用於受保護的代碼。

async def handler(request):
    await asyncio.shield(write_to_redis(request))
    await asyncio.shield(write_to_postgres(request))
    return web.Response(text='OK')

在REDIS中保存數據后可能會發生取消, write_to_postgres不會被調用

② 創建任務

import asyncio
from aiohttp import web

async def something():
    await asyncio.sleep(2)
    print('handle out')

async def post(request):
    arguments = await request.post()
    request.loop.create_task(something())
    return web.Response(text='post')

產生一項新任務的情況要糟糕得多:沒有地方可以await 產生任務 ;request.loop.create_task(something()) 這個沒有辦法awit獲取值,如果加上awit,則導致中斷時,something停止工作

 

3、數據共享

aiohttp.web不鼓勵使用全局變量,每個變量都應該有自己的非全局上下文。

所以,ApplicationRequest 支持一個collections.abc.MutableMapping接口(即它們是類似dict的對象),允許它們用作數據存儲。

① 應用程序的配置

要存儲類似全局變量,請隨意將它們保存在 Application實例中:

app['my_private_key'] = data

並在Web處理程序中獲取它

async def handler(request):
    data = request.app['my_private_key']

嵌套應用程序情況下,所需的查找策略可能如下:

  1. 搜索當前嵌套應用程序中的鍵。
  2. 如果未找到密鑰,請繼續在父應用程序中進行搜索。

為此,請使用Request.config_dict只讀屬性:

async def handler(request):
    data = request.config_dict['my_private_key']

② 請求的存儲

Variables that are only needed for the lifetime of a Request, can be stored in a Request:

async def handler(request):
  request['my_private_key'] = "data"
  ...

這對於中間件和 信號處理程序來說非常有用,可以存儲數據以供鏈中的下一個處理程序進一步處理。

 ③ 響應的存儲

StreamResponseResponse對象也支持collections.abc.MutableMapping接口。當您希望在處理程序中的所有工作完成后與信號和中間件共享數據時,這非常有用:

async def handler(request):
  [ do all the work ]
  response['my_metric'] = 123
  return response

 

4、中間件

aiohttp.web提供了一種通過中間件自定義請求處理程序的強大機制 。

一個中間件是可以修改請求或響應中的協程。例如,這是一個附加 到響應的簡單中間件:'wink'

from aiohttp.web import middleware

@middleware
async def middleware(request, handler):
    resp = await handler(request)
    resp.text = resp.text + ' wink'
    return resp

注意:該示例不適用於流式響應或websockets

每個中間件都應該接受兩個參數,一個request實例和一個處理程序,並返回響應或引發異常。如果異常不是HTTPException它的實例,則500 HTTPInternalServerError在處理中間件鏈之后將 其轉換為。

警告:第二個參數應該完全命名為handler

創建時Application,這些中間件將傳遞給僅限關鍵字的middlewares參數:

app = web.Application(middlewares=[middleware_1,
                                   middleware_2])

在內部,通過以相反的順序將中間件鏈應用於原始處理程序來構造單個請求處理程序,並由RequestHandler作為常規處理程序調用。

由於中間件本身就是協程,因此await在創建新的處理程序時可能會執行額外的 調用,例如調用數據庫等。

中間件通常會調用處理程序,但是他們可能會選擇忽略它,例如,如果用戶沒有訪問底層資源的權限,則顯示403 Forbidden頁面或引發HTTPForbidden異常。它們還可能呈現處理程序引發的錯誤,執行一些預處理或后處理,如處理CORS等。

以下代碼演示了中間件的執行順序:

from aiohttp import web

async def test(request):
    print('Handler function called')
    return web.Response(text="Hello")

@web.middleware
async def middleware1(request, handler):
    print('Middleware 1 called')
    response = await handler(request)
    print('Middleware 1 finished')
    return response

@web.middleware
async def middleware2(request, handler):
    print('Middleware 2 called')
    response = await handler(request)
    print('Middleware 2 finished')
    return response


app = web.Application(middlewares=[middleware1, middleware2])
app.router.add_get('/', test)
web.run_app(app) 

輸出

Middleware 1 called
Middleware 2 called
Handler function called
Middleware 2 finished
Middleware 1 finished

中間件的常見用途是實現自定義錯誤頁面。以下示例將使用JSON響應呈現404錯誤,因為可能適合JSON REST服務:

from aiohttp import web

@web.middleware
async def error_middleware(request, handler):
    try:
        response = await handler(request)
        if response.status != 404:
            return response
        message = response.message
    except web.HTTPException as ex:
        if ex.status != 404:
            raise
        message = ex.reason
    return web.json_response({'error': message})

app = web.Application(middlewares=[error_middleware])

中間件工廠

一個中間件工廠是創建與傳遞參數的中間件功能。例如,這是一個簡單的中間件工廠:

def middleware_factory(text):
    @middleware
    async def sample_middleware(request, handler):
        resp = await handler(request)
        resp.text = resp.text + text
        return resp
    return sample_middleware

請記住,與常規中間件相反,您需要中間件工廠的結果而不是功能本身。因此,當將中間件工廠傳遞給應用程序時,您實際需要調用它:

app = web.Application(middlewares=[middleware_factory(' wink')])

  

5、信號

雖然middleware可以在准備響應之前或之后定制請求處理程序,但在准備響應時不能定制響應。For this aiohttp.web provides signals.

例如,中間件只能為未准備好的 響應更改HTTP標頭(請參閱參考資料StreamResponse.prepare()),但有時我們需要一個鈎子來更改流式響應和WebSockets的HTTP標頭。這可以通過訂閱Application.on_response_prepare信號來完成 :

async def on_prepare(request, response):
    response.headers['My-Header'] = 'value'

app.on_response_prepare.append(on_prepare)

此外,可以訂閱Application.on_startup和 Application.on_cleanup信號以進行應用程序組件設置並相應地拆除。

以下示例將正確初始化並配置aiopg連接引擎:

from aiopg.sa import create_engine

async def create_aiopg(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )

async def dispose_aiopg(app):
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.on_startup.append(create_aiopg)
app.on_cleanup.append(dispose_aiopg)

信號處理程序不應返回值,但可以修改傳入的可變參數。

信號處理程序將按順序運行,以便添加它們。aiohttp 3.0開始,所有處理程序必須是異步的

 

6、清理上下文

Application.on_startupApplication.on_cleanup 對仍有陷阱:信號處理程序彼此獨立。

E.g. we have [create_pg, create_redis] in startup signal and [dispose_pg,dispose_redis] in cleanup.

If, for example, create_pg(app) call fails create_redis(app) is not called. But on application cleanup both dispose_pg(app) and dispose_redis(app) are still called: 

清理信號不知道啟動/清理對及其執行狀態。

解決方案是Application.cleanup_ctx用法:

async def pg_engine(app):
    app['pg_engine'] = await create_engine(
        user='postgre',
        database='postgre',
        host='localhost',
        port=5432,
        password=''
    )
    yield
    app['pg_engine'].close()
    await app['pg_engine'].wait_closed()

app.cleanup_ctx.append(pg_engine)

屬性是列表生成器,代碼之前 yield是(稱為上初始化階段的啟動),碼 之后yield被上執行清理。生成器必須只有一個yield

aiohttp保證當且僅當啟動代碼成功完成時才調用 清理代碼。

Python 3.6+支持異步生成器,在Python 3.5上請使用async_generator 庫。

版本3.1中的新功能。

 

 7、后台任務

有時需要在應用程序啟動后執行一些異步操作。

更重要的是,在一些復雜的系統中,可能需要在事件循環中運行一些后台任務以及應用程序的請求處理程序。例如,監聽消息隊列或其他網絡消息/事件源(例如,ZeroMQ,Redis Pub / Sub,AMQP等)以對應用程序內的接收消息作出反應。

例如,后台任務可以在zmq.SUB套接字上偵聽ZeroMQ ,處理並將檢索到的消息轉發到通過WebSocket連接的客戶端,這些客戶端存儲在應用程序中的某個位置(例如,在application['websockets']列表中)。

為了運行這種短期和長期運行的后台任務,aiohttp提供了注冊Application.on_startup將與應用程序的請求處理程序一起運行的信號處理程序的能力。

例如,需要運行一個快速任務和兩個長時間運行的任務,這些任務將一直存在,直到應用程序處於活動狀態。相應的后台任務可以注冊為Application.on_startup 信號處理程序,如下例所示:

async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        await sub.unsubscribe(ch.name)
        await sub.quit()


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    app['redis_listener'].cancel()
    await app['redis_listener']


app = web.Application()
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app)

任務listen_to_redis()將永遠運行。要正確關閉它,Application.on_cleanup信號處理程序可用於向其發送取消。

 

8、處理異常錯誤

https://aiohttp-demos.readthedocs.io/en/latest/tutorial.html#aiohttp-demos-polls-middlewares

 

9、request中獲取add_routes中url

    async def post(self, request):
        url = [resource._path for resource in request.app.router._resources]
        print(url)
        # ['/', '/6773', '/', '/1234/']
        Response = web.Response(text='post')
        return Response

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM