aiohttp 的簡單使用!


本文翻譯自aiohttp的官方文檔,如有紕漏,歡迎指出。

aiohttp分為服務器端和客戶端,本文只介紹客戶端。

由於上下文的緣故,請求代碼必須在一個異步的函數中進行:

async def fn():

pass

 

1. aiohttp安裝

 

pip install aiohttp

 

 

1.1. 基本請求用法

 

 

  1.  
    async with aiohttp.request('GET','https://github.com') as r:
  2.  
    await r.text()

其中r.text(), 可以在括號中指定解碼方式,編碼方式,例如

await resp.text(encoding='windows-1251')

 

或者也可以選擇不編碼,適合讀取圖像等,是無法編碼的

await resp.read()

 

  1.  
    #使用示例, 進行一次請求
  2.  
     
  3.  
    import aiohttp, asyncio
  4.  
     
  5.  
    async def main():#aiohttp必須放在異步函數中使用
  6.  
    async with aiohttp.request('GET', 'https://api.github.com/events') as resp:
  7.  
    json = await resp.json()
  8.  
    print(json)
  9.  
     
  10.  
    loop = asyncio.get_event_loop()
  11.  
    loop.run_until_complete(main())
  12.  
     
  13.  
    ------------------------------------------------------------------------------
  14.  
    #使用示例,進行多次請求
  15.  
     
  16.  
    import aiohttp, asyncio
  17.  
     
  18.  
    async def main():#aiohttp必須放在異步函數中使用
  19.  
    tasks = []
  20.  
    [tasks.append(fetch( 'https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次請求
  21.  
    await asyncio.wait(tasks)
  22.  
     
  23.  
    async def fetch(url):
  24.  
    async with aiohttp.request('GET', url) as resp:
  25.  
    json = await resp.json()
  26.  
    print(json)
  27.  
     
  28.  
    loop = asyncio.get_event_loop()
  29.  
    loop.run_until_complete(main())
  30.  
    ------------------------------------------------------------------------------
  31.  
    #使用示例,進行多次請求,並限制同時請求的數量
  32.  
     
  33.  
    import aiohttp, asyncio
  34.  
     
  35.  
    async def main(pool):#aiohttp必須放在異步函數中使用
  36.  
    tasks = []
  37.  
    sem = asyncio.Semaphore(pool) #限制同時請求的數量
  38.  
    [tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i))) for i in range(10)]#十次請求
  39.  
    await asyncio.wait(tasks)
  40.  
     
  41.  
    async def control_sem(sem, url):#限制信號量
  42.  
    async with sem:
  43.  
    await fetch(url)
  44.  
     
  45.  
    async def fetch(url):
  46.  
    async with aiohttp.request('GET', url) as resp:
  47.  
    json = await resp.json()
  48.  
    print(json)
  49.  
     
  50.  
    loop = asyncio.get_event_loop()
  51.  
    loop.run_until_complete(main(pool= 2))

上面的示例中可以正確的使用協程進行請求,但是由於aiohttp自身的原因會報 Unclosed client session 的警告。官方不推薦使用aiohttp.request的方式請求,可以將 aiohttp.request 換成 aiohttp.ClientSession(**kw).request的方式即可。 

具體請看2.發起一個session請求

 

2.發起一個session請求

 

首先是導入aiohttp模塊:

 

import aiohttp

然后我們試着獲取一個web源碼,這里以GitHub的public Time-line頁面為例:

 

  1.  
    async with aiohttp.ClientSession() as session:
  2.  
    async with session.get('https://api.github.com/events') as resp:
  3.  
    print(resp.status)
  4.  
    print(await resp.text())

上面的代碼中,我們創建了一個 ClientSession 對象命名為session,然后通過session的get方法得到一個 ClientResponse 對象,命名為resp,get方法中傳入了一個必須的參數url,就是要獲得源碼的http url。至此便通過協程完成了一個異步IO的get請求。

 

有get請求當然有post請求,並且post請求也是一個協程:

 

session.post('http://httpbin.org/post', data=b'data')

用法和get是一樣的,區別是post需要一個額外的參數data,即是需要post的數據。

 

除了get和post請求外,其他http的操作方法也是一樣的:

 

  1.  
    session.put('http://httpbin.org/put', data=b'data')
  2.  
    session.delete('http://httpbin.org/delete')
  3.  
    session.head('http://httpbin.org/get')
  4.  
    session.options('http://httpbin.org/get')
  5.  
    session.patch('http://httpbin.org/patch', data=b'data')

小記:

 

不要為每次的連接都創建一次session,一般情況下只需要創建一個session,然后使用這個session執行所有的請求。

每個session對象,內部包含了一個連接池,並且將會保持連接和連接復用(默認開啟)可以加快整體的性能。

  1.  
    #使用示例
  2.  
     
  3.  
    import aiohttp, asyncio
  4.  
     
  5.  
    async def main(pool):#啟動
  6.  
    sem = asyncio.Semaphore(pool)
  7.  
    async with aiohttp.ClientSession() as session:#給所有的請求,創建同一個session
  8.  
    tasks = []
  9.  
    [tasks.append(control_sem(sem, 'https://api.github.com/events?a={}'.format(i), session)) for i in range(10)]#十次請求
  10.  
    await asyncio.wait(tasks)
  11.  
     
  12.  
    async def control_sem(sem, url, session):#限制信號量
  13.  
    async with sem:
  14.  
    await fetch(url, session)
  15.  
     
  16.  
    async def fetch(url, session):#開啟異步請求
  17.  
    async with session.get(url) as resp:
  18.  
    json = await resp.json()
  19.  
    print(json)
  20.  
     
  21.  
    loop = asyncio.get_event_loop()
  22.  
    loop.run_until_complete(main(pool= 2))

速度快的不要不要的

 

3.在URL中傳遞參數

 

我們經常需要通過 get 在url中傳遞一些參數,參數將會作為url問號后面的一部分發給服務器。在aiohttp的請求中,允許以dict的形式來表示問號后的參數。舉個例子,如果你想傳遞 key1=value1   key2=value2 到 httpbin.org/get 你可以使用下面的代碼:

 

  1.  
    params = {'key1': 'value1', 'key2': 'value2'}
  2.  
    async with session.get('http://httpbin.org/get',
  3.  
    params=params) as resp:
  4.  
    assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'

可以看到,代碼正確的執行了,說明參數被正確的傳遞了進去。不管是一個參數兩個參數,還是更多的參數,都可以通過這種方式來傳遞。除了這種方式之外,還有另外一個,使用一個 list 來傳遞(這種方式可以傳遞一些特殊的參數,例如下面兩個key是相等的也可以正確傳遞):

 

 

  1.  
    params = [('key', 'value1'), ('key', 'value2')]
  2.  
    async with session.get('http://httpbin.org/get',
  3.  
    params=params) as r:
  4.  
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

除了上面兩種,我們也可以直接通過傳遞字符串作為參數來傳遞,但是需要注意,通過字符串傳遞的特殊字符不會被編碼:

 

 

  1.  
    async with session.get('http://httpbin.org/get',
  2.  
    params='key=value+1') as r:
  3.  
    assert r.url == 'http://httpbin.org/get?key=value+1'

 

 

4.響應的內容

 

還是以GitHub的公共Time-line頁面為例,我們可以獲得頁面響應的內容:

 

  1.  
    async with session.get('https://api.github.com/events') as resp:
  2.  
    print(await resp.text())

運行之后,會打印出類似於如下的內容:

 

 

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

resp的text方法,會自動將服務器端返回的內容進行解碼--decode,當然我們也可以自定義編碼方式:

 

 

await resp.text(encoding='gb2312')

除了text方法可以返回解碼后的內容外,我們也可以得到類型是字節的內容:

 

 

print(await resp.read())

運行的結果是:

 

 

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...

 

gzip和deflate轉換編碼已經為你自動解碼。

小記:

text(),read()方法是把整個響應體讀入內存,如果你是獲取大量的數據,請考慮使用”字節流“(streaming response)

 

5.特殊響應內容:json

 

如果我們獲取的頁面的響應內容是json,aiohttp內置了更好的方法來處理json:

 

  1.  
    async with session.get('https://api.github.com/events') as resp:
  2.  
    print(await resp.json())

如果因為某種原因而導致resp.json()解析json失敗,例如返回不是json字符串等等,那么resp.json()將拋出一個錯誤,也可以給json()方法指定一個解碼方式:

 

 

print(await resp.json(
encoding='gb2312'

)) 或者傳遞一個函數進去:

 

 

print(await resp.json( lambda(x:x.replace('a','b')) ))

 

 

6.以字節流的方式讀取響應內容

 

雖然json(),text(),read()很方便的能把響應的數據讀入到內存,但是我們仍然應該謹慎的使用它們,因為它們是把整個的響應體全部讀入了內存。即使你只是想下載幾個字節大小的文件,但這些方法卻將在內存中加載所有的數據。所以我們可以通過控制字節數來控制讀入內存的響應內容:

 

  1.  
    async with session.get('https://api.github.com/events') as resp:
  2.  
    await resp.content.read(10) #讀取前10個字節

一般地,我們應該使用以下的模式來把讀取的字節流保存到文件中:

 

 

  1.  
    with open(filename, 'wb') as fd:
  2.  
    while True:
  3.  
    chunk = await resp.content.read(chunk_size)
  4.  
    if not chunk:
  5.  
    break
  6.  
    fd.write(chunk)

 

 

7.自定義請求頭

 

如果你想添加請求頭,可以像get添加參數那樣以dict的形式,作為get或者post的參數進行請求:

 

  1.  
    import json
  2.  
    url = 'https://api.github.com/some/endpoint'
  3.  
    payload = {'some': 'data'}
  4.  
    headers = {'content-type': 'application/json'}
  5.  
     
  6.  
    await session.post(url,
  7.  
    data=json.dumps(payload),
  8.  
    headers=headers)

 

 

8.自定義Cookie

 

給服務器發送cookie,可以通過給 ClientSession 傳遞一個cookie參數:

 

  1.  
    url = 'http://httpbin.org/cookies'
  2.  
    cookies = {'cookies_are': 'working'}
  3.  
    async with ClientSession(cookies=cookies) as session:
  4.  
    async with session.get(url) as resp:
  5.  
    assert await resp.json() == {
  6.  
    "cookies": {"cookies_are": "working"}}

 

可直接訪問鏈接 “httpbin.org/cookies”查看當前cookie,訪問session中的cookie請見第10節。

 

9.post數據的幾種方式

 

(1)模擬表單post數據

 

  1.  
    payload = {'key1': 'value1', 'key2': 'value2'}
  2.  
    async with session.post('http://httpbin.org/post',
  3.  
    data=payload) as resp:
  4.  
    print(await resp.text())

 

注意:data=dict的方式post的數據將被轉碼,和form提交數據是一樣的作用,如果你不想被轉碼,可以直接以字符串的形式 data=str 提交,這樣就不會被轉碼。

(2)post json

 

  1.  
    import json
  2.  
    url = 'https://api.github.com/some/endpoint'
  3.  
    payload = {'some': 'data'}
  4.  
     
  5.  
    async with session.post(url, data=json.dumps(payload)) as resp:
  6.  
    ...

其實json.dumps(payload)返回的也是一個字符串,只不過這個字符串可以被識別為json格式

 

(3)post 小文件

 

  1.  
    url = 'http://httpbin.org/post'
  2.  
    files = {'file': open('report.xls', 'rb')}
  3.  
     
  4.  
    await session.post(url, data=files)

可以設置好文件名和content-type:

 

 

  1.  
    url = 'http://httpbin.org/post'
  2.  
    data = FormData()
  3.  
    data.add_field('file',
  4.  
    open('report.xls', 'rb'),
  5.  
    filename='report.xls',
  6.  
    content_type='application/vnd.ms-excel')
  7.  
     
  8.  
    await session.post(url, data=data)

如果將文件對象設置為數據參數,aiohttp將自動以字節流的形式發送給服務器。

 

(4)post 大文件

aiohttp支持多種類型的文件以流媒體的形式上傳,所以我們可以在文件未讀入內存的情況下發送大文件。

 

 

  1.  
    @aiohttp.streamer
  2.  
    def file_sender(writer, file_name=None):
  3.  
    with open(file_name, 'rb') as f:
  4.  
    chunk = f.read(2**16)
  5.  
    while chunk:
  6.  
    yield from writer.write(chunk)
  7.  
    chunk = f.read(2**16)
  8.  
     
  9.  
    # Then you can use `file_sender` as a data provider:
  10.  
     
  11.  
    async with session.post('http://httpbin.org/post',
  12.  
    data=file_sender(file_name='huge_file')) as resp:
  13.  
    print(await resp.text())

同時我們可以從一個url獲取文件后,直接post給另一個url,並計算hash值:

 

 

  1.  
    async def feed_stream(resp, stream):
  2.  
    h = hashlib.sha256()
  3.  
     
  4.  
    while True:
  5.  
    chunk = await resp.content.readany()
  6.  
    if not chunk:
  7.  
    break
  8.  
    h.update(chunk)
  9.  
    stream.feed_data(chunk)
  10.  
     
  11.  
    return h.hexdigest()
  12.  
     
  13.  
    resp = session.get('http://httpbin.org/post')
  14.  
    stream = StreamReader()
  15.  
    loop.create_task(session.post('http://httpbin.org/post', data=stream))
  16.  
     
  17.  
    file_hash = await feed_stream(resp, stream)

因為響應內容類型是StreamReader,所以可以把get和post連接起來,同時進行post和get:

 

 

  1.  
    r = await session.get('http://python.org')
  2.  
    await session.post('http://httpbin.org/post',
  3.  
    data=r.content)

(5)post預壓縮數據

 

在通過aiohttp發送前就已經壓縮的數據, 調用壓縮函數的函數名(通常是deflate 或 zlib)作為content-encoding的值:

 

  1.  
    async def my_coroutine(session, headers, my_data):
  2.  
    data = zlib.compress(my_data)
  3.  
    headers = {'Content-Encoding': 'deflate'}
  4.  
    async with session.post('http://httpbin.org/post',
  5.  
    data=data,
  6.  
    headers=headers)
  7.  
    pass

 

 

10.keep-alive, 連接池,共享cookie

 

ClientSession 用於在多個連接之間共享cookie:

 

  1.  
    async with aiohttp.ClientSession() as session:
  2.  
    await session.get(
  3.  
    'http://httpbin.org/cookies/set?my_cookie=my_value')
  4.  
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
  5.  
    assert filtered['my_cookie'].value == 'my_value'
  6.  
    async with session.get('http://httpbin.org/cookies') as r:
  7.  
    json_body = await r.json()
  8.  
    assert json_body['cookies']['my_cookie'] == 'my_value'

也可以為所有的連接設置共同的請求頭:

 

 

  1.  
    async with aiohttp.ClientSession(
  2.  
    headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
  3.  
    async with session.get("http://httpbin.org/headers") as r:
  4.  
    json_body = await r.json()
  5.  
    assert json_body['headers']['Authorization'] == \
  6.  
    'Basic bG9naW46cGFzcw=='

ClientSession 還支持 keep-alive連接和連接池(connection pooling)

 

 

11.cookie安全性

 

默認ClientSession使用的是嚴格模式的 aiohttp.CookieJar. RFC 2109,明確的禁止接受url和ip地址產生的cookie,只能接受 DNS 解析IP產生的cookie。可以通過設置aiohttp.CookieJar 的 unsafe=True 來配置:

 

  1.  
    jar = aiohttp.CookieJar(unsafe=True)
  2.  
    session = aiohttp.ClientSession(cookie_jar=jar)

 

 

12.控制同時連接的數量(連接池)

 

也可以理解為同時請求的數量,為了限制同時打開的連接數量,我們可以將限制參數傳遞給連接器:

 

conn = aiohttp.TCPConnector(limit=30)#同時最大進行連接的連接數為30,默認是100,limit=0的時候是無限制

限制同時打開限制同時打開連接到同一端點的數量((host, port, is_ssl) 三的倍數),可以通過設置 limit_per_host 參數:

 

 

conn = aiohttp.TCPConnector(limit_per_host=30)#默認是0

 

 

13.自定義域名解析

 

我們可以指定域名服務器的 IP 對我們提供的get或post的url進行解析:

 

  1.  
    from aiohttp.resolver import AsyncResolver
  2.  
     
  3.  
    resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
  4.  
    conn = aiohttp.TCPConnector(resolver=resolver)

 

 

14.設置代理

 

aiohttp支持使用代理來訪問網頁:

 

  1.  
    async with aiohttp.ClientSession() as session:
  2.  
    async with session.get("http://python.org",
  3.  
    proxy="http://some.proxy.com") as resp:
  4.  
    print(resp.status)

當然也支持需要授權的頁面:

  1.  
    async with aiohttp.ClientSession() as session:
  2.  
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
  3.  
    async with session.get("http://python.org",
  4.  
    proxy="http://some.proxy.com",
  5.  
    proxy_auth=proxy_auth) as resp:
  6.  
    print(resp.status)

或者通過這種方式來驗證授權:

  1.  
    session.get("http://python.org",
  2.  
    proxy="http://user:pass@some.proxy.com")

 

 

15.響應狀態碼 response status code

 

可以通過 resp.status來檢查狀態碼是不是200:

 

  1.  
    async with session.get('http://httpbin.org/get') as resp:
  2.  
    assert resp.status == 200

 

 

16.響應頭

 

我們可以直接使用 resp.headers 來查看響應頭,得到的值類型是一個dict:

 

  1.  
    >>> resp.headers
  2.  
    {'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
  3.  
    'CONTENT-TYPE': 'application/json',
  4.  
    'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
  5.  
    'SERVER': 'gunicorn/18.0',
  6.  
    'CONTENT-LENGTH': '331',
  7.  
    'CONNECTION': 'keep-alive'}

或者我們可以查看原生的響應頭:

 

 

  1.  
    >>> resp.raw_headers
  2.  
    ((b'SERVER', b'nginx'),
  3.  
    (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
  4.  
    (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
  5.  
    (b'CONTENT-LENGTH', b'12150'),
  6.  
    (b'CONNECTION', b'keep-alive'))

 

 

17.查看cookie

 

 

  1.  
    url = 'http://example.com/some/cookie/setting/url'
  2.  
    async with session.get(url) as resp:
  3.  
    print(resp.cookies)

 

18.重定向的響應頭

 

 

如果一個請求被重定向了,我們依然可以查看被重定向之前的響應頭信息:

 

  1.  
    >>> resp = await session.get('http://example.com/some/redirect/')
  2.  
    >>> resp
  3.  
    <ClientResponse(http://example.com/some/other/url/) [200]>
  4.  
    >>> resp.history
  5.  
    ( <ClientResponse(http://example.com/some/redirect/) [301]>,)

 

 

19.超時處理

 

默認的IO操作都有5分鍾的響應時間 我們可以通過 timeout 進行重寫:

 

  1.  
    async with session.get('https://github.com', timeout=60) as r:
  2.  
    ...

如果 timeout=None 或者 timeout=0 將不進行超時檢查,也就是不限時長。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM