aiohttp: a module for making network requests asynchronously
1. A simple asynchronous coroutine crawl
- read()
- text(encoding=...), e.g.: await r.text(encoding="utf-8")
import asyncio
import aiohttp

async def request(url):
    print("Current url:", url)
    # Send the request with aiohttp.request()
    async with aiohttp.request("GET", url) as r:
        # r.read() takes no arguments; it reads the body directly and returns bytes
        response = await r.read()
        print("Response:", response)

urls = [
    'https://www.baidu.com',
    'https://www.sogou.com',
    'https://www.qq.com',
]

# Task list holding the task objects
stasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    stasks.append(task)

loop = asyncio.get_event_loop()
# The task list has to be wrapped in asyncio.wait()
loop.run_until_complete(asyncio.wait(stasks))
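The bullets above also mention text(); a minimal sketch of the same crawl that decodes the body as text instead of reading raw bytes (the helper name request_text is my own, and encoding="utf-8" assumes the page is UTF-8):

import asyncio
import aiohttp

async def request_text(url):
    async with aiohttp.request("GET", url) as r:
        # text() decodes the body; the encoding here is an assumption
        html = await r.text(encoding="utf-8")
        print(url, len(html))

loop = asyncio.get_event_loop()
loop.run_until_complete(request_text('https://www.baidu.com'))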
2. Making requests with a session
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Xu Junkai
"""
import asyncio
import time
import aiohttp

start_time = time.time()
urls = [
    'https://blog.csdn.net/',
    'https://www.sogou.com',
    'http://www.renren.com/',
]

async def get_page(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)    # response status code
            print(res.charset)   # page encoding
            response = await res.text()  # response body as text
            print(response)

tasks = []
for url in urls:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print('Total time:', end_time - start_time)
- session.put
async with session.put(url, data=b"data") as res:
    ...
Note:
Do not create a new session for every request. Normally, create a single session and use it to perform all requests (see the sketch after this note).
Each session object holds an internal connection pool, and connection keep-alive/reuse (enabled by default) speeds up overall performance.
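A minimal sketch of that advice: one shared session reused across all requests. The helper names fetch and main, and the use of asyncio.gather, are my own choices, not from the original:

import asyncio
import aiohttp

async def fetch(session, url):
    # Reuse the shared session; its connection pool handles keep-alive
    async with session.get(url) as res:
        return await res.text()

async def main(urls):
    # One session for all requests, created once
    async with aiohttp.ClientSession() as session:
        pages = await asyncio.gather(*(fetch(session, u) for u in urls))
        for url, page in zip(urls, pages):
            print(url, len(page))

loop = asyncio.get_event_loop()
loop.run_until_complete(main(['https://www.baidu.com', 'https://www.sogou.com']))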
3. Passing parameters in the URL
import asyncio
import time
import aiohttp

start_time = time.time()
urls = [
    'https://blog.csdn.net/',
    'https://www.sogou.com',
    'http://www.renren.com/',
]
data = {"name": "foo"}

async def get_page(url, data):  # the coroutine function can take several parameters
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=data) as res:
            print(res.status)
            # Fetch the response body (this is a blocking, time-consuming step,
            # so we await it to let other coroutines run)
            response = await res.text()
            print(response)
            print(res.charset)

tasks = []
for url in urls:
    c = get_page(url, data)  # passes the arguments but does not run the coroutine
    task = asyncio.ensure_future(c)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
print('Total time:', end_time - start_time)
Note
res.text() and res.read() fetch the response body, which is a blocking, time-consuming step, so we use await to let other coroutines run in the meantime.
Correct usage:
await res.text()
await res.read()   # returns bytes
await res.json()   # an encoding and a custom decoder function can be passed
Note:
res.json() is a built-in JSON decoder, like the one in Requests.
Use res.json() only when the response body is JSON; if the body is not JSON, res.json() raises an error (see the sketch below).
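A small sketch of that json() behaviour; the URL is only a placeholder for an endpoint that returns JSON, and the except clause shows what catching the non-JSON error looks like:

import asyncio
import aiohttp

async def get_json(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            try:
                data = await res.json()  # raises if the body is not JSON
                print(data)
            except aiohttp.ContentTypeError:
                print("response body is not JSON")

loop = asyncio.get_event_loop()
loop.run_until_complete(get_json('https://httpbin.org/get'))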
4. StreamResponse
- text() and read() load the entire response body into memory; if you are fetching a large amount of data, consider reading the body as a byte stream (StreamResponse) instead.
# Fetching data as a byte stream
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            # Print the first 100 bytes of the body
            print(await res.content.read(100))

c = get_page(urls)                # coroutine object
task = asyncio.ensure_future(c)   # wrap it in a task
loop = asyncio.get_event_loop()   # create the event loop
loop.run_until_complete(task)
# Reads 100 bytes of data
- Reading the byte stream and saving it to a file
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            with open("cnds.text", "wb") as fp:
                # Loop, reading the body into the file 100 bytes at a time
                while True:
                    chunk = await res.content.read(100)
                    if not chunk:
                        break
                    fp.write(chunk)

c = get_page(urls)
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
Note
async with session.get(url) as res:    # asynchronous context manager
with open("cnds.text", "wb") as fp:    # ordinary context manager
An asynchronous context manager can suspend execution in its enter and exit methods.
To make this possible, two new methods were introduced, __aenter__ and __aexit__, both of which must return an awaitable (see the toy example below).
For details, including asynchronous iterators, see:
https://www.jb51.net/article/163540.htm
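To illustrate the two methods, here is a minimal, self-contained async context manager. The Timer class is a toy of my own, not part of aiohttp:

import asyncio

class Timer:
    # __aenter__/__aexit__ make this usable with "async with";
    # both are coroutines, which is what allows execution to pause here
    async def __aenter__(self):
        self.start = asyncio.get_event_loop().time()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("elapsed:", asyncio.get_event_loop().time() - self.start)

async def main():
    async with Timer():
        await asyncio.sleep(0.1)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())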
5. Custom request headers
# Same as with requests; headers most commonly carry a User-Agent.
async def get_page(url):
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type': 'text/html; charset=utf-8'}
        async with session.get(url, headers=headers) as res:
            with open("cnds.text", "wb") as fp:
                # Loop, reading the body into the file 100 bytes at a time
                while True:
                    chunk = await res.content.read(100)
                    if not chunk:
                        break
                    fp.write(chunk)
6. Custom cookies
- Note: custom cookies must be set via ClientSession(cookies=your_cookie_dict), not passed to session.get().
# From the aiohttp source
class ClientSession:
    """First-class interface for making HTTP requests."""

    ATTRS = frozenset([
        '_source_traceback', '_connector',
        'requote_redirect_url', '_loop', '_cookie_jar',
        '_connector_owner', '_default_auth',
        '_version', '_json_serialize',
        '_requote_redirect_url',
        '_timeout', '_raise_for_status', '_auto_decompress',
        '_trust_env', '_default_headers', '_skip_auto_headers',
        '_request_class', '_response_class',
        '_ws_response_class', '_trace_configs'])

    _source_traceback = None
    _connector = None

    def __init__(self, *, connector: Optional[BaseConnector]=None,
                 loop: Optional[asyncio.AbstractEventLoop]=None,
                 cookies: Optional[LooseCookies]=None,
                 headers: Optional[LooseHeaders]=None,
                 skip_auto_headers: Optional[Iterable[str]]=None,
                 auth: Optional[BasicAuth]=None,
                 json_serialize: JSONEncoder=json.dumps,
                 request_class: Type[ClientRequest]=ClientRequest,
                 response_class: Type[ClientResponse]=ClientResponse,
                 ws_response_class: Type[ClientWebSocketResponse]=ClientWebSocketResponse,  # noqa
                 version: HttpVersion=http.HttpVersion11,
                 cookie_jar: Optional[AbstractCookieJar]=None,
                 connector_owner: bool=True,
                 raise_for_status: bool=False,
                 read_timeout: Union[float, object]=sentinel,
                 conn_timeout: Optional[float]=None,
                 timeout: Union[object, ClientTimeout]=sentinel,
                 auto_decompress: bool=True,
                 trust_env: bool=False,
                 requote_redirect_url: bool=True,
                 trace_configs: Optional[List[TraceConfig]]=None) -> None:
- Usage
cookies = {"cookies": "xxxxxxxxxx"}
async with aiohttp.ClientSession(cookies=cookies) as session:
    ...
7. Getting the response status code
- res.status
async with session.get(url) as res:
    print(res.status)
8. Inspecting response headers
- res.headers: the response headers; the value is a dict-like object
- res.raw_headers: the raw response headers, as bytes
import asyncio
import aiohttp

urls = 'https://blog.csdn.net/'

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        headers = {'Content-Type': 'text/html; charset=utf-8'}
        async with session.get(url, headers=headers) as res:
            for item, values in res.headers.items():
                print(item, "*******", values)

c = get_page(urls)
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
9. Inspecting the redirect history
- res.history
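A small sketch of reading res.history; the URL is only an example that is expected to redirect (http to https), and the helper name show_history is my own:

import asyncio
import aiohttp

async def show_history(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            # res.history is a sequence of responses, one per redirect hop
            for hop in res.history:
                print(hop.status, hop.url)
            print("final:", res.status, res.url)

loop = asyncio.get_event_loop()
loop.run_until_complete(show_history('http://www.baidu.com'))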
10. Timeout handling
- By default, I/O operations time out after 5 minutes, which is quite long; you can set your own timeout.
- With timeout=None or timeout=0 no timeout check is performed, i.e. there is no time limit.
async with session.get("https://baidu.com", timeout=60) as res:
    pass
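As a supplementary sketch (not from the original text): newer aiohttp versions (3.3+) also provide an aiohttp.ClientTimeout object, which can be set once on the whole session:

import asyncio
import aiohttp

async def main():
    # total=60 limits each whole request to 60 seconds
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://baidu.com") as res:
            print(res.status)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())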
11. ClientSession shares cookies across multiple connections (to the same site).
import aiohttp
import asyncio

async def request():
    # Set a cookie of our own
    cookies = {"my_cookie": "my_set_cookies"}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get("https://www.csdn.net/") as res:
            print(session.cookie_jar.filter_cookies("https://www.csdn.net/nav/python"))
            print("*******************************************")
        async with session.get("https://www.csdn.net/") as res:
            print(session.cookie_jar.filter_cookies("https://www.csdn.net/nav/java"))

c = request()
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

# Output:
# Set-Cookie: dc_session_id=10_1562499942692.566280
# Set-Cookie: my_cookie=my_set_cookies
# Set-Cookie: uuid_tt_dd=10_20709428800-1562499942692-906566
# *******************************************
# Set-Cookie: dc_session_id=10_1562499942692.566280
# Set-Cookie: my_cookie=my_set_cookies
# Set-Cookie: uuid_tt_dd=10_20709428800-1562499942692-906566
- It is best to use session.cookie_jar.filter_cookies() to read a site's cookies. Unlike in the requests module, res.cookies may return some cookies, but it does not appear to contain all of them.
- Summary
1. res.cookies only contains the cookies set on the current url; it does not maintain cookies for the whole site.
2. session.cookie_jar.filter_cookies(url) keeps every cookie set for that site, including the ones we set for the session, and updates them as responses come in. This is what we want.
3. Cookies of our own must likewise be set via aiohttp.ClientSession(cookies=cookies).
4. ClientSession also supports custom request headers, keep-alive connections, and connection pooling.
12. Cookie safety
- By default, ClientSession uses a strict aiohttp.CookieJar. RFC 2109 explicitly forbids accepting cookies from URLs that use bare IP addresses; only cookies produced for DNS hostnames are accepted. This can be changed by constructing aiohttp.CookieJar with unsafe=True:
jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)
13. Limiting the number of connections
- TCPConnector maintains the connection pool and limits the total number of parallel connections; when the pool is full, a new request is admitted only after an existing one finishes.
async def request():
    cookies = {"my_cookies": "my_cookies"}
    # Limit the number of parallel connections
    conn = aiohttp.TCPConnector(limit=5)
    async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:
        pass

c = request()
task = asyncio.ensure_future(c)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
- To limit the number of simultaneous connections to the same endpoint, set the limit_per_host parameter (the two limits can be combined, as sketched below):
limit_per_host: the maximum number of connections to one endpoint, where an endpoint is a unique (host, port, is_ssl) triple.
conn = aiohttp.TCPConnector(limit_per_host=30)  # default is 0 (no per-host limit)
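A minimal sketch combining both limits on one connector (the values are arbitrary, chosen only for illustration):

import asyncio
import aiohttp

async def main():
    # At most 10 connections overall, at most 2 per (host, port, is_ssl)
    conn = aiohttp.TCPConnector(limit=10, limit_per_host=2)
    async with aiohttp.ClientSession(connector=conn) as session:
        async with session.get('https://www.baidu.com') as res:
            print(res.status)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())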
14. A small example
import asyncio
import aiohttp

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36",
}

def callback(task):
    # The callback could parse the page; for brevity we just print the length
    print(len(task.result()))

async def res(url):
    async with aiohttp.request('GET', url, headers=headers) as fp:
        response = await fp.read()
        # The three sites use different encodings, so decode them uniformly
        # (ISO-8859-1 maps every byte value)
        response = response.decode('iso-8859-1')
        # Return the page to the callback
        return response

urls = [
    'https://www.baidu.com',
    'https://www.sogou.com',
    'https://www.qq.com',
]
# proxy = "http://some.proxy.com"

if __name__ == '__main__':
    stasks = []
    for url in urls:
        # Create a coroutine object
        c = res(url)
        # Wrap it in a task object
        task = asyncio.ensure_future(c)
        # Bind the callback to the task
        task.add_done_callback(callback)
        # Add it to the list
        stasks.append(task)
    # Create the event loop
    loop = asyncio.get_event_loop()
    # Register the task list with the event loop and run it
    loop.run_until_complete(asyncio.wait(stasks))
- Original article: https://www.jb51.net/article/163537.htm