aiohttp初識(請求&響應)


aiohttp客戶端使用

用於asyncio和Python的異步HTTP客戶端/服務器:Asynchronous HTTP Client/Server for asyncio and Python.

發起請求

讓我們從導入aiohttp模塊開始:

import aiohttp

好啦,我們來嘗試獲取一個web頁面。比如我們來獲取下GitHub的時間軸。

async with aiohttp.ClientSession() as session:    
    async with session.get('https://api.github.com/events') as resp:        
        print(resp.status)        
        print(await resp.text())

我們現在有了一個會話(session)對象,由ClientSession對象賦值而來,還有一個變量resp,它其實是ClientResponse對象。我們可以從這個響應對象中獲取我們任何想要的信息。協程方法ClientSession.get()的主要參數接受一個HTTP URL。

發起HTTP POST請求我們可以使用協程方法ClientSession.post():

session.post('http://httpbin.org/post', data=b'data')

其他的HTTP方法也同樣支持:

session.put('http://httpbin.org/put',data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

注意:

不要為每個請求都創建一個會話。大多數情況下每個應用程序只需要一個會話就可以執行所有的請求。
每個會話對象都包含一個連接池,可復用的連接和持久連接狀態(keep-alives,這兩個是默認的)可提升總體的執行效率。

發起JSON請求:

每個會話的請求方法都可接受json參數。

async with aiohttp.ClientSession() as session:    
    async with session.post(json={'test': 'object'})

默認情況下會話(session)使用Python標准庫里的json模塊解析json信息。但還可使用其他的json解析器。可以給ClientSession指定json_serialize參數來實現:

import ujson
async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
    async with session.post(json={'test': 'object'})

傳遞URL中的參數:

你可能經常想在URL中發送一系列的查詢信息。如果你手動構建他們,這些信息會以鍵值對的形式出現在?后面,比如: httpbin.org/get?key=val。請求對象允許你使用dict(字典,python中的數據類型)發送它們,使用params參數即可。例如: 如果你要把 key1=value1,key2=value2放到httpbin.org/get后面,你可以用下面的方式:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get', params=params) as resp:    
    assert str(resp.url) == 'http://httpbin.org/get?key2=value2&key1=value1'

看,URL已經被正確的編碼啦。
同鍵不同值的並聯字典(MultiDict) 也同樣支持。
可使用帶有兩個tuples(元組,python中的數據類型)的list(列表,python中的數據類型)來構建:

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get', params=params) as r:    
    assert str(r.url) == 'http://httpbin.org/get?key=value2&key=value1'

同樣也允許你傳遞str(字符串)給params,但要小心一些不能被編碼的字符。+就是一個不能被編碼的字符:

async with session.get('http://httpbin.org/get', params='key=value+1') as r:        
    assert str(r.url) == 'http://httpbin.org/get?key=value+1'

注意:

aiohttp會在發送請求前標准化URL。
域名部分會用IDNA 編碼,路徑和查詢條件會重新編譯(requoting)
比如:URL('http://example.com/путь%30?a=%31') 會被轉化為URL('http://example.com/%D0%BF%D1%83%D1%82%D1%8C/0?a=1')
如果服務器需要接受准確的表示並不要求編譯URL,那標准化過程應是禁止的。
禁止標准化可以使用encoded=True:

await session.get(URL('http://example.com/%30', encoded=True))

警告:

傳遞params時不要用encode=True,這倆參數不能同時使用。

獲取響應內容

我們可以讀取服務器的響應內容。想想我們獲取GitHub時間軸的例子:

async with session.get('https://api.github.com/events') as resp:    
    print(await resp.text())

這樣會打印出類似於下面的信息:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

aiohttp將會自動解碼內容。你可以為text()方法指定編碼(使用encoding參數):

await resp.text(encoding='windows-1251')

獲取二進制響應內容

你也可以以字節形式獲取響應,這樣得到的就不是文本了:

print(await resp.read())b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

gzipdefalte傳輸編碼會自動解碼。
你也可以使其支持brotli傳輸編碼的解碼,只需安裝brotlipy即可。

獲取JSON響應內容

以防你需要處理JSON數據,內置了一個JSON解碼器:

async with session.get('https://api.github.com/events') as resp:    
    print(await resp.json())

如果JSON解碼失敗,json()方法將會拋出一個異常。你還可以在調用json()時指定編碼器和解碼器函數。

注意:

這些方法會讀出內存中所有響應的內容。如果你要讀非常多的數據,考慮使用流式響應方法進行讀取。請看之后的文檔。

獲取流式響應內容

read(), json(), text()等方法使用起來很方便,但也要注意謹慎地使用。上述方法會將所有的響應內容加載到內存。舉個例子,如果你要下載幾個G的文件,這些方法還是會將所有內容都加載到內存,內存會表示”臣妾做不到啊~”(如果內存不夠的話)。作為代替你可以用content屬性。content其實是 aiohttp.StreamReader類的實例。gzipdeflate傳輸編碼同樣會自動解碼。

async with session.get('https://api.github.com/events') as resp:    
    await resp.content.read(10)

一般情況下你可以使用下列模式將內容保存在一個文件中:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

在使用content讀了數據后,就不要在用read(), json(), text()了。

獲取請求信息

ClientResponse(客戶端響應)對象含有request_info(請求信息),主要是urlheaders信息。 raise_for_status結構體上的信息會被復制給ClientResponseError實例。

自定義Headers

如果你需要給某個請求添加HTTP頭,可以使用headers參數,傳遞一個dict對象即可。
比如,如果你想給之前的例子指定 content-type可以這樣:

import jsonurl = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}
await session.post(url, data=json.dumps(payload), headers=headers)

自定義Cookies

發送你自己的cookies給服務器,你可以為ClientSession對象指定cookies參數:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:    
    async with session.get(url) as resp:        
        assert await resp.json() == {"cookies": {"cookies_are": "working"}}

注意:

訪問httpbin.org/cookies 會看到以JSON形式返回的cookies。查閱會話中的cookies請看ClientSession.cookie_jar

發起更復雜的POST請求

一般來說,如果你想以表單形式發送一些數據 - 就像HTML表單。那么只需要簡單的將一個dict通過data參數傳遞就可以。傳遞的dict數據會自動編碼:

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post',
                        data=payload) as resp:
    print(await resp.text())
"""    
{
  ...
  "form": {
    "key2": "value2",
    "key1": "value1"
  },
  ...
}
"""

如果你想發送非表單形式的數據你可用str(字符串)代替dict(字典)。這些數據會直接發送出去。
例如,GitHub API v3 接受JSON編碼POST/PATCH數據:

import jsonurl = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
async with session.post(url, data=json.dumps(payload)) as resp:    
    ...

發送多部分編碼文件(Multipart-Encoded)

上傳多部分編碼文件:

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}
await session.post(url, data=files)

你也可以顯式地設置文件名,文件類型:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')
await session.post(url, data=data)

如果你把一個文件對象傳遞給data參數,aiohttp會自動將其以流的形式上傳。查看StreamReader以獲取支持的格式信息。

參見:

使用Multipart.

流式上傳

aiohttp 支持多種形式的流式上傳,允許你直接發送大文件而不必讀到內存。

下面是個簡單的例子,提供類文件對象即可:

with open('massive-body', 'rb') as f:   
    await session.post('http://httpbin.org/post', data=f)

或者你也可以使用aiohttp.streamer對象:

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)
# 之后你可以使用’file_sender‘傳遞給data:
async with session.post('http://httpbin.org/post', data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())

同樣可以使用StreamReader對象.

我們來看下如何把來自於另一個請求的內容作為文件上傳並計算其SHA1值:

async def feed_stream(resp, stream):
    h = hashlib.sha256()
    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)
    return h.hexdigest()
resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))
file_hash = await feed_stream(resp, stream)

因為響應對象的content屬性是一個StreamReader實例,所以你可以將get和post請求連在一起用:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post', data=r.content)

上傳預壓縮過的數據

上傳一個已經壓縮過的數據,需要為Headers中的Content-Encoding指定算法名(通常是deflate或者是zlib).

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post', data=data, headers=headers)
        pass

持久連接(keep-alive), 連接池和cookies共享

ClientSession可以在多個請求之間共享cookies:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'

你也可以為所有的會話請求設置headers:

async with aiohttp.ClientSession(
    headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == 'Basic bG9naW46cGFzcw=='

ClientSession支持持久連接和連接池,可直接使用,不需要額外操作。

安全cookies

ClientSession中的默認的aiohttp.CookiesJar使用的是嚴苛模式,RFC 2109明確禁止使用ip地址形式的URL攜帶cookies信息。比如: http://127.0.0.1:80/cookie
這樣很好,不過有些時候我們測試時需要允許攜帶cookies。在aiohttp.CookiesJar中傳遞unsafe=True來實現這一效果:

jar = aiohttp.CookieJar(unsafe=True)session = aiohttp.ClientSession(cookie_jar=jar)

使用虛假Cookie Jar

有時不想處理cookie。這時可以在會話中使用aiohttp.DummyCookieJar來達到目的。

jar = aiohttp.DummyCookieJar()session = aiohttp.ClientSession(cookie_jar=jar)

使用連接器

想要調整請求的傳輸層你可以為ClientSession及其同類組件傳遞自定義的連接器。例如:

conn = aiohttp.TCPConnector()session = aiohttp.ClientSession(connector=conn)

注解:

不要給多個會話對象使用同一個連接器,某一會話對象擁有其所有權。

參見:

查看連接器部分了解更多不同的連接器類型和配置選項信息。

限制連接池的容量

限制同一時間打開的連接數可以傳遞limit參數:

conn = aiohttp.TCPConnector(limit=30)

這樣就將總數限制在30.

默認情況下是100.

如果你不想有限制,傳遞0即可:

conn = aiohttp.TCPConnector(limit=0)

限制同一時間在同一個端點((host, port, is_ssl) 3者都一樣的情況)打開的連接數可指定limit_per_host參數:

conn = aiohttp.TCPConnector(limit_per_host=30)

這樣會限制在30.
默認情況下是0(也就是不做限制)。

使用自定義域名服務器

底層需要aiodns支持:

from aiohttp.resolver import AsyncResolver
resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)

為TCP sockets添加SSL控制:

默認情況下aiohttp總會對使用了HTTPS協議(的URL請求)查驗其身份。但也可將verify_ssl設置為False讓其不檢查:

r = await session.get('https://example.com', verify_ssl=False)

如果你需要設置自定義SSL信息(比如使用自己的證書文件)你可以創建一個ssl.SSLContext實例並傳遞到ClientSession中:

sslcontext = ssl.create_default_context(cafile='/path/to/ca-bundle.crt')
r = await session.get('https://example.com', ssl_context=sslcontext)

如果你要驗證自簽名的證書,你也可以用之前的例子做同樣的事,但是用的是load_cert_chain():

sslcontext = ssl.create_default_context(
   cafile='/path/to/ca-bundle.crt')
sslcontext.load_cert_chain('/path/to/client/public/device.pem',
                           '/path/to/client/private/device.jey')
r = await session.get('https://example.com', ssl_context=sslcontext)

SSL驗證失敗時拋出的錯誤:

aiohttp.ClientConnectorSSLError:

aiohttp.ClientConnectorSSLError:

try:
    await session.get('https://expired.badssl.com/')
except aiohttp.ClientConnectorSSLError as e:
    assert isinstance(e, ssl.SSLError)

aiohttp.ClientConnectorCertificateError:

try:
    await session.get('https://wrong.host.badssl.com/')
except aiohttp.ClientConnectorCertificateError as e:
    assert isinstance(e, ssl.CertificateError)

如果你需要忽略所有SSL的錯誤:

aiohttp.ClientSSLError:

aiohttp.ClientSSLError:

try:
    await session.get('https://expired.badssl.com/')
except aiohttp.ClientSSLError as e:
    assert isinstance(e, ssl.SSLError)
try:
    await session.get('https://wrong.host.badssl.com/')
except aiohttp.ClientSSLError as e:
    assert isinstance(e, ssl.CertificateError)

你還可以通過SHA256指紋驗證證書:

# Attempt to connect to https://www.python.org
# with a pin to a bogus certificate:
bad_fingerprint = b'0'*64
exc = None
try:
    r = await session.get('https://www.python.org',
                          fingerprint=bad_fingerprint)
except aiohttp.FingerprintMismatch as e:
    exc = e
assert exc is not None
assert exc.expected == bad_fingerprint

# www.python.org cert's actual fingerprint
assert exc.got == b'...'

注意這是以DER編碼的證書的指紋。如果你的證書是PEM編碼,你需要轉換成DER格式:

openssl x509 -in crt.pem -inform PEM -outform DER > crt.der

注解:

提示: 從16進制數字轉換成二進制字節碼,你可以用binascii.unhexlify().

TCPConnector中設置的verify_ssl, fingerprint和ssl_context都會被當做默認的verify_ssl, fingerprint和ssl_context,ClientSession或其他同類組件中的設置會覆蓋默認值。

警告:

verify_ssl 和 ssl_context互斥的。
MD5SHA1指紋雖不贊成使用但是是支持的 - 這倆是非常不安全的哈希函數。

Unix 域套接字

如果你的服務器使用UNIX域套接字你可以用UnixConnector:

conn = aiohttp.UnixConnector(path='/path/to/socket')
session = aiohttp.ClientSession(connector=conn)

代理支持

aiohttp 支持 HTTP/HTTPS形式的代理。你需要使用proxy參數:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org", proxy="http://some.proxy.com") as resp:
        print(resp.status)

同時支持認證代理:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com",
                           proxy_auth=proxy_auth) as resp:
        print(resp.status)

也可將代理的驗證信息放在url中:

session.get("http://python.org", proxy="http://user:pass@some.proxy.com")

requests(另一個廣受歡迎的http包)不同,aiohttp默認不會讀取環境變量中的代理值。但你可以通過傳遞trust_env=True來讓aiohttp.ClientSession讀取HTTP_PROXYHTTPS_PROXY環境變量中的代理信息(不區分大小寫)。

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org", trust_env=True) as resp:
        print(resp.status)

查看響應狀態碼

我們可以查詢響應狀態碼:

async with session.get('http://httpbin.org/get') as resp:    
    assert resp.status == 200

獲取響應頭信息

我們可以查看服務器的響應信息, ClientResponse.headers使用的數據類型是CIMultiDcitProxy:

>>> resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}

這是一個特別的字典,它只為HTTP頭信息而生。根據 RFC 7230,HTTP頭信息中的名字是不分區大小寫的。同時也支持多個不同的值對應同一個鍵。

所以我們可以通過任意形式訪問它:

>>> resp.headers['Content-Type']
'application/json'
>>> resp.headers.get('content-type')
'application/json'

所有的header信息都是由二進制數據轉換而來,使用帶有surrogateescape選項的UTF-8編碼方式(surrogateescape是一種錯誤處理方式,詳情看))。大部分時候都可以很好的工作,但如果服務器使用的不是標准編碼就不能正常解碼了。從 RFC 7230的角度來看這樣的headers並不是合理的格式,你可以用ClientReponse.resp.raw_headers來查看原形:

>>> resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))

獲取響應cookies:

如果某響應包含一些Cookies,你可以很容易地訪問他們:

url = 'http://example.com/some/cookie/setting/url'
async with session.get(url) as resp:
    print(resp.cookies['example_cookie_name'])

注意:

響應中的cookies只包含重定向鏈中最后一個請求中的Set-Cookies頭信息設置的值。如果每一次重定向請求都收集一次cookies請使用 aiohttp.ClientSession對象.

獲取響應歷史

如果一個請求被重定向了,你可以用history屬性查看其之前的響應:

>>> resp = await session.get('http://example.com/some/redirect/')
>>> resp
<ClientResponse(http://example.com/some/other/url/) [200]>
>>> resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)

如果沒有重定向或allow_redirects設置為False,history會被設置為空。

使用WebSockets

aiohttp提供開箱即用的客戶端websocket。
你需要使用aiohttp.ClientSession.ws_connect()協程對象。它的第一個參數接受URL,返回值是ClientWebSocketResponse,這樣你就可以用響應的方法與websocket服務器進行通信。

session = aiohttp.ClientSession()
async with session.ws_connect('http://example.org/websocket') as ws:
    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close cmd':
                await ws.close()
                break
            else:
                await ws.send_str(msg.data + '/answer')
        elif msg.type == aiohttp.WSMsgType.CLOSED:
            break
        elif msg.type == aiohttp.WSMsgType.ERROR:
            break

你只能使用一種讀取方式(例如await ws.receive() 或者 async for msg in ws:)和寫入方法,但可以有多個寫入任務,寫入任務也是異步完成的(ws.send_str('data'))。

設置超時

默認情況下每個IO操作有5分鍾超時時間。可以通過給ClientSession.get()及其同類組件傳遞timeout來覆蓋原超時時間:

async with session.get('https://github.com', timeout=60) as r:
    ...

None 或者0則表示不檢測超時。
還可通過調用async_timeout.timeout上下文管理器來為連接和解析響應內容添加一個總超時時間:

import async_timeout
with async_timeout.timeout(0.001):
    async with session.get('https://github.com') as r:
        await r.text()

注意:

超時時間是累計的,包含如發送情況,重定向,響應解析,處理響應等所有操作在內…

愉快地結束:

當一個包含ClientSessionasync with代碼塊的末尾行結束時(或直接調用了.close()),因為asyncio內部的一些原因底層的連接其實沒有關閉。在實際使用中,底層連接需要有一個緩沖時間來關閉。然而,如果事件循環在底層連接關閉之前就結束了,那么會拋出一個 資源警告: 存在未關閉的傳輸(通道)(ResourceWarning: unclosed transport),如果警告可用的話。
為了避免這種情況,在關閉事件循環前加入一小段延遲讓底層連接得到關閉的緩沖時間。
對於非SSL的ClientSession, 使用0即可(await asyncio.sleep(0)):

async def read_website():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.org/') as response:
            await response.read()
loop = asyncio.get_event_loop()
loop.run_until_complete(read_website())
# Zero-sleep to allow underlying connections to close
loop.run_until_complete(asyncio.sleep(0))
loop.close()

對於使用了SSL的ClientSession, 需要設置一小段合適的時間:

...
# Wait 250 ms for the underlying SSL connections to close
loop.run_until_complete(asyncio.sleep(0.250))
loop.close()

合適的時間因應用程序而異。

當asyncio內部的運行機制改變時就可以讓aiohttp去等待底層連接關閉在退出啦,上面這種額外的方法總會廢棄啦。你也可以跟進問題#1925來參與改進。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM