本文翻譯自aiohttp的官方文檔,如有紕漏,歡迎指出。
aiohttp分為服務器端和客戶端,本文只介紹客戶端。
由於上下文的緣故,請求代碼必須在一個異步的函數中進行:
async def fn():
pass
1. aiohttp安裝
pip install aiohttp
1.1. 基本請求用法
-
async with aiohttp.request('GET','https://github.com') as r:
-
await r.text()
其中r.text(), 可以在括號中指定解碼方式,編碼方式,例如
await resp.text(encoding='windows-1251')
或者也可以選擇不編碼,適合讀取圖像等,是無法編碼的
await resp.read()
-
#使用示例, 進行一次請求
-
-
import aiohttp, asyncio
-
-
async def main():#aiohttp必須放在異步函數中使用
-
async with aiohttp.request('GET', 'https://api.github.com/events') as resp:
-
json = await resp.json()
-
print(json)
-
-
loop = asyncio.get_event_loop()
-
loop.run_until_complete(main())
-
-
------------------------------------------------------------------------------
-
#使用示例,進行多次請求
-
-
import aiohttp, asyncio
-
-
async def main():#aiohttp必須放在異步函數中使用
-
tasks = []
-
[tasks.append(fetch( 'https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次請求
-
await asyncio.wait(tasks)
-
-
async def fetch(url):
-
async with aiohttp.request('GET', url) as resp:
-
json = await resp.json()
-
print(json)
-
-
loop = asyncio.get_event_loop()
-
loop.run_until_complete(main())
-
------------------------------------------------------------------------------
-
#使用示例,進行多次請求,並限制同時請求的數量
-
-
import aiohttp, asyncio
-
-
async def main(pool):#aiohttp必須放在異步函數中使用
-
tasks = []
-
sem = asyncio.Semaphore(pool) #限制同時請求的數量
-
[tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次請求
-
await asyncio.wait(tasks)
-
-
async def control_sem(sem, url):#限制信號量
-
async with sem:
-
await fetch(url)
-
-
async def fetch(url):
-
async with aiohttp.request('GET', url) as resp:
-
json = await resp.json()
-
print(json)
-
-
loop = asyncio.get_event_loop()
-
loop.run_until_complete(main(pool= 2))
上面的示例中可以正確的使用協程進行請求,但是由於aiohttp自身的原因會報 Unclosed client session 的警告。官方不推薦使用aiohttp.request的方式請求,可以將 aiohttp.request 換成 aiohttp.ClientSession(**kw).request的方式即可。
具體請看2.發起一個session請求
2.發起一個session請求
首先是導入aiohttp模塊:
import aiohttp
然后我們試着獲取一個web源碼,這里以GitHub的public Time-line頁面為例:
-
async with aiohttp.ClientSession() as session:
-
async with session.get('https://api.github.com/events') as resp:
-
print(resp.status)
-
print(await resp.text())
上面的代碼中,我們創建了一個 ClientSession 對象命名為session,然后通過session的get方法得到一個 ClientResponse 對象,命名為resp,get方法中傳入了一個必須的參數url,就是要獲得源碼的http url。至此便通過協程完成了一個異步IO的get請求。
有get請求當然有post請求,並且post請求也是一個協程:
session.post('http://httpbin.org/post', data=b'data')
用法和get是一樣的,區別是post需要一個額外的參數data,即是需要post的數據。
除了get和post請求外,其他http的操作方法也是一樣的:
-
session.put('http://httpbin.org/put', data=b'data')
-
session.delete('http://httpbin.org/delete')
-
session.head('http://httpbin.org/get')
-
session.options('http://httpbin.org/get')
-
session.patch('http://httpbin.org/patch', data=b'data')
小記:
不要為每次的連接都創建一次session,一般情況下只需要創建一個session,然后使用這個session執行所有的請求。
每個session對象,內部包含了一個連接池,並且將會保持連接和連接復用(默認開啟)可以加快整體的性能。
-
#使用示例
-
-
import aiohttp, asyncio
-
-
async def main(pool):#啟動
-
sem = asyncio.Semaphore(pool)
-
async with aiohttp.ClientSession() as session:#給所有的請求,創建同一個session
-
tasks = []
-
[tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i), session)) for i in range(10)]#十次請求
-
await asyncio.wait(tasks)
-
-
async def control_sem(sem, url, session):#限制信號量
-
async with sem:
-
await fetch(url, session)
-
-
async def fetch(url, session):#開啟異步請求
-
async with session.get(url) as resp:
-
json = await resp.json()
-
print(json)
-
-
loop = asyncio.get_event_loop()
-
loop.run_until_complete(main(pool= 2))
速度快的不要不要的
3.在URL中傳遞參數
我們經常需要通過 get 在url中傳遞一些參數,參數將會作為url問號后面的一部分發給服務器。在aiohttp的請求中,允許以dict的形式來表示問號后的參數。舉個例子,如果你想傳遞 key1=value1 key2=value2 到 httpbin.org/get 你可以使用下面的代碼:
-
params = {'key1': 'value1', 'key2': 'value2'}
-
async with session.get('http://httpbin.org/get',
-
params=params) as resp:
-
assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'
可以看到,代碼正確的執行了,說明參數被正確的傳遞了進去。不管是一個參數兩個參數,還是更多的參數,都可以通過這種方式來傳遞。除了這種方式之外,還有另外一個,使用一個 list 來傳遞(這種方式可以傳遞一些特殊的參數,例如下面兩個key是相等的也可以正確傳遞):
-
params = [('key', 'value1'), ('key', 'value2')]
-
async with session.get('http://httpbin.org/get',
-
params=params) as r:
-
assert r.url == 'http://httpbin.org/get?key=value2&key=value1'
除了上面兩種,我們也可以直接通過傳遞字符串作為參數來傳遞,但是需要注意,通過字符串傳遞的特殊字符不會被編碼:
-
async with session.get('http://httpbin.org/get',
-
params='key=value+1') as r:
-
assert r.url == 'http://httpbin.org/get?key=value+1'
4.響應的內容
還是以GitHub的公共Time-line頁面為例,我們可以獲得頁面響應的內容:
-
async with session.get('https://api.github.com/events') as resp:
-
print(await resp.text())
運行之后,會打印出類似於如下的內容:
'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
resp的text方法,會自動將服務器端返回的內容進行解碼--decode,當然我們也可以自定義編碼方式:
await resp.text(encoding='gb2312')
除了text方法可以返回解碼后的內容外,我們也可以得到類型是字節的內容:
print(await resp.read())
運行的結果是:
b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
gzip和deflate轉換編碼已經為你自動解碼。
小記:
text(),read()方法是把整個響應體讀入內存,如果你是獲取大量的數據,請考慮使用”字節流“(streaming response)
5.特殊響應內容:json
如果我們獲取的頁面的響應內容是json,aiohttp內置了更好的方法來處理json:
-
async with session.get('https://api.github.com/events') as resp:
-
print(await resp.json())
如果因為某種原因而導致resp.json()解析json失敗,例如返回不是json字符串等等,那么resp.json()將拋出一個錯誤,也可以給json()方法指定一個解碼方式:
print(await resp.json(
encoding='gb2312'
)) 或者傳遞一個函數進去:
print(await resp.json( lambda(x:x.replace('a','b')) ))
6.以字節流的方式讀取響應內容
雖然json(),text(),read()很方便的能把響應的數據讀入到內存,但是我們仍然應該謹慎的使用它們,因為它們是把整個的響應體全部讀入了內存。即使你只是想下載幾個字節大小的文件,但這些方法卻將在內存中加載所有的數據。所以我們可以通過控制字節數來控制讀入內存的響應內容:
-
async with session.get('https://api.github.com/events') as resp:
-
await resp.content.read(10) #讀取前10個字節
一般地,我們應該使用以下的模式來把讀取的字節流保存到文件中:
-
with open(filename, 'wb') as fd:
-
while True:
-
chunk = await resp.content.read(chunk_size)
-
if not chunk:
-
break
-
fd.write(chunk)
7.自定義請求頭
如果你想添加請求頭,可以像get添加參數那樣以dict的形式,作為get或者post的參數進行請求:
-
import json
-
url = 'https://api.github.com/some/endpoint'
-
payload = {'some': 'data'}
-
headers = {'content-type': 'application/json'}
-
-
await session.post(url,
-
data=json.dumps(payload),
-
headers=headers)
8.自定義Cookie
給服務器發送cookie,可以通過給 ClientSession 傳遞一個cookie參數:
-
url = 'http://httpbin.org/cookies'
-
cookies = {'cookies_are': 'working'}
-
async with ClientSession(cookies=cookies) as session:
-
async with session.get(url) as resp:
-
assert await resp.json() == {
-
"cookies": {"cookies_are": "working"}}
可直接訪問鏈接 “httpbin.org/cookies”查看當前cookie,訪問session中的cookie請見第10節。
9.post數據的幾種方式
(1)模擬表單post數據
-
payload = {'key1': 'value1', 'key2': 'value2'}
-
async with session.post('http://httpbin.org/post',
-
data=payload) as resp:
-
print(await resp.text())
注意:data=dict的方式post的數據將被轉碼,和form提交數據是一樣的作用,如果你不想被轉碼,可以直接以字符串的形式 data=str 提交,這樣就不會被轉碼。
(2)post json
-
import json
-
url = 'https://api.github.com/some/endpoint'
-
payload = {'some': 'data'}
-
-
async with session.post(url, data=json.dumps(payload)) as resp:
-
...
其實json.dumps(payload)返回的也是一個字符串,只不過這個字符串可以被識別為json格式
(3)post 小文件
-
url = 'http://httpbin.org/post'
-
files = {'file': open('report.xls', 'rb')}
-
-
await session.post(url, data=files)
可以設置好文件名和content-type:
-
url = 'http://httpbin.org/post'
-
data = FormData()
-
data.add_field('file',
-
open('report.xls', 'rb'),
-
filename='report.xls',
-
content_type='application/vnd.ms-excel')
-
-
await session.post(url, data=data)
如果將文件對象設置為數據參數,aiohttp將自動以字節流的形式發送給服務器。
(4)post 大文件
aiohttp支持多種類型的文件以流媒體的形式上傳,所以我們可以在文件未讀入內存的情況下發送大文件。
-
@aiohttp.streamer
-
def file_sender(writer, file_name=None):
-
with open(file_name, 'rb') as f:
-
chunk = f.read(2**16)
-
while chunk:
-
yield from writer.write(chunk)
-
chunk = f.read(2**16)
-
-
# Then you can use `file_sender` as a data provider:
-
-
async with session.post('http://httpbin.org/post',
-
data=file_sender(file_name='huge_file')) as resp:
-
print(await resp.text())
同時我們可以從一個url獲取文件后,直接post給另一個url,並計算hash值:
-
async def feed_stream(resp, stream):
-
h = hashlib.sha256()
-
-
while True:
-
chunk = await resp.content.readany()
-
if not chunk:
-
break
-
h.update(chunk)
-
stream.feed_data(chunk)
-
-
return h.hexdigest()
-
-
resp = session.get('http://httpbin.org/post')
-
stream = StreamReader()
-
loop.create_task(session.post('http://httpbin.org/post', data=stream))
-
-
file_hash = await feed_stream(resp, stream)
因為響應內容類型是StreamReader,所以可以把get和post連接起來,同時進行post和get:
-
r = await session.get('http://python.org')
-
await session.post('http://httpbin.org/post',
-
data=r.content)
(5)post預壓縮數據
在通過aiohttp發送前就已經壓縮的數據, 調用壓縮函數的函數名(通常是deflate 或 zlib)作為content-encoding的值:
-
async def my_coroutine(session, headers, my_data):
-
data = zlib.compress(my_data)
-
headers = {'Content-Encoding': 'deflate'}
-
async with session.post('http://httpbin.org/post',
-
data=data,
-
headers=headers)
-
pass
10.keep-alive, 連接池,共享cookie
ClientSession 用於在多個連接之間共享cookie:
-
async with aiohttp.ClientSession() as session:
-
await session.get(
-
'http://httpbin.org/cookies/set?my_cookie=my_value')
-
filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
-
assert filtered['my_cookie'].value == 'my_value'
-
async with session.get('http://httpbin.org/cookies') as r:
-
json_body = await r.json()
-
assert json_body['cookies']['my_cookie'] == 'my_value'
也可以為所有的連接設置共同的請求頭:
-
async with aiohttp.ClientSession(
-
headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
-
async with session.get("http://httpbin.org/headers") as r:
-
json_body = await r.json()
-
assert json_body['headers']['Authorization'] == \
-
'Basic bG9naW46cGFzcw=='
ClientSession 還支持 keep-alive連接和連接池(connection pooling)
11.cookie安全性
默認ClientSession使用的是嚴格模式的 aiohttp.CookieJar. RFC 2109,明確的禁止接受url和ip地址產生的cookie,只能接受 DNS 解析IP產生的cookie。可以通過設置aiohttp.CookieJar 的 unsafe=True 來配置:
-
jar = aiohttp.CookieJar(unsafe=True)
-
session = aiohttp.ClientSession(cookie_jar=jar)
12.控制同時連接的數量(連接池)
也可以理解為同時請求的數量,為了限制同時打開的連接數量,我們可以將限制參數傳遞給連接器:
conn = aiohttp.TCPConnector(limit=30)#同時最大進行連接的連接數為30,默認是100,limit=0的時候是無限制
限制同時打開限制同時打開連接到同一端點的數量((host, port, is_ssl) 三的倍數),可以通過設置 limit_per_host 參數:
conn = aiohttp.TCPConnector(limit_per_host=30)#默認是0
13.自定義域名解析
我們可以指定域名服務器的 IP 對我們提供的get或post的url進行解析:
-
from aiohttp.resolver import AsyncResolver
-
-
resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
-
conn = aiohttp.TCPConnector(resolver=resolver)
14.設置代理
aiohttp支持使用代理來訪問網頁:
-
async with aiohttp.ClientSession() as session:
-
async with session.get("http://python.org",
-
proxy="http://some.proxy.com") as resp:
-
print(resp.status)
當然也支持需要授權的頁面:
-
async with aiohttp.ClientSession() as session:
-
proxy_auth = aiohttp.BasicAuth('user', 'pass')
-
async with session.get("http://python.org",
-
proxy="http://some.proxy.com",
-
proxy_auth=proxy_auth) as resp:
-
print(resp.status)
或者通過這種方式來驗證授權:
-
session.get("http://python.org",
-
proxy="http://user:pass@some.proxy.com")
15.響應狀態碼 response status code
可以通過 resp.status來檢查狀態碼是不是200:
-
async with session.get('http://httpbin.org/get') as resp:
-
assert resp.status == 200
16.響應頭
我們可以直接使用 resp.headers 來查看響應頭,得到的值類型是一個dict:
-
>>> resp.headers
-
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
-
'CONTENT-TYPE': 'application/json',
-
'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
-
'SERVER': 'gunicorn/18.0',
-
'CONTENT-LENGTH': '331',
-
'CONNECTION': 'keep-alive'}
或者我們可以查看原生的響應頭:
-
>>> resp.raw_headers
-
((b'SERVER', b'nginx'),
-
(b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
-
(b'CONTENT-TYPE', b'text/html; charset=utf-8'),
-
(b'CONTENT-LENGTH', b'12150'),
-
(b'CONNECTION', b'keep-alive'))
17.查看cookie
-
url = 'http://example.com/some/cookie/setting/url'
-
async with session.get(url) as resp:
-
print(resp.cookies)
18.重定向的響應頭
如果一個請求被重定向了,我們依然可以查看被重定向之前的響應頭信息:
-
>>> resp = await session.get('http://example.com/some/redirect/')
-
>>> resp
-
<ClientResponse(http://example.com/some/other/url/) [200]>
-
>>> resp.history
-
( <ClientResponse(http://example.com/some/redirect/) [301]>,)
19.超時處理
默認的IO操作都有5分鍾的響應時間 我們可以通過 timeout 進行重寫:
-
async with session.get('https://github.com', timeout=60) as r:
-
...
如果 timeout=None 或者 timeout=0 將不進行超時檢查,也就是不限時長。