本文參考Python官方文檔:https://docs.python.org/zh-cn/3.8/library/asyncio-stream.html
本文參考Python官方文檔針對官方文檔示例進行解析,解析不完整只為了便於理解
流
流是用於處理網絡連接的高級async/await-ready原語。流允許發送和接收數據,而不需要使用回調或低級協議和傳輸。
Stream函數
下面的高級 asyncio 函數可以用來創建和處理流:coroutine asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)
-
建立網絡連接並返回一對
(reader, writer)
對象。返回的 reader 和 writer 對象是
StreamReader
和StreamWriter
類的實例。注意:使用ayncio.open_connection()方法創建和處理流時只有在await時才返回reader和writer對象
為了方便測試我們在本地搭建一個nginx服務器,首頁index.html內容為“Hello World”
示例:
import asyncio async def wget(host): connect = asyncio.open_connection(host,80) print(type(connect)) reader,writer = await connect print(type(reader),type(writer)) async def main(): # 獲取表頭主機列表 hosts = ['192.168.1.100'] # 根據主機列表獲取一個tasks列表 tasks = [asyncio.create_task(wget(host)) for host in hosts] # 等待任務列表執行結果 await asyncio.gather(*tasks) # 運行 asyncio.run(main()
運行輸出如下
<class 'coroutine'> <class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'> Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000236AA956F70> Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 116, in __del__ self.close() File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 108, in close self._loop.call_soon(self._call_connection_lost, None) File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 719, in call_soon self._check_closed() File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 508, in _check_closed raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed
解析:運行報錯是因為獲取了writer對象但是並沒有寫數據
打印類型可以看到
完善代碼使用writer對象發送請求至服務器,然后reader對象就可以收取服務器發送過來的數據
# asyncio.open_connection創建數據流 start import asyncio async def wget(host): connect = asyncio.open_connection(host,80) print(type(connect)) reader,writer = await connect print(type(reader),type(writer)) # 定義請求頭部,格式是固定格式 header = 'GET / HTTP/1.0\r\n Host:{0}\r\n\r\n'.format(host) # 通過writer對象往http服務器發送請求,請求是二進制格式的需要使用encode()方法編碼 writer.write(header.encode('utf-8')) # writer.write方法需要與drain()方法一起使用 await writer.drain() # 阻塞獲取服務器發送過來的所有數據,read()方法一次性獲取所有數據,數據多可以使用readline()方法一行行獲取 data = await reader.read() # 打印獲取的數據,獲取數據為二進制格式不加decode()解碼則打印原始數據 print(data.decode()) # 關閉writer需要和writer.wait_closed()一起使用,這里可以省略 writer.close() await writer.wait_closed() async def main(): hosts = ['192.168.1.100'] tasks = [asyncio.create_task(wget(host)) for host in hosts] await asyncio.gather(*tasks) asyncio.run(main()) # asyncio.open_connection創建數據流 end
輸出如下
<class 'coroutine'> <class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'> HTTP/1.1 200 OK Server: nginx/1.14.0 Date: Sat, 30 Oct 2021 09:37:17 GMT Content-Type: text/html Content-Length: 12 Last-Modified: Fri, 29 Oct 2021 07:41:00 GMT Connection: close ETag: "617ba58c-c" Accept-Ranges: bytes Hello World
本次代碼演示了連接http服務器並且向服務器發送一個GET請求,服務器收到GET請求以后把數據返回給客戶,然后通過reader對象獲取到服務器發送過來的數據。
注意:本次發送的是一個GET請求,格式是固定的
本次發送的完整數據為
'GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n'
對應關系如下圖
拆分解析如下
GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n GET #請求方法為GET # 空格 / # 請求URL為/即根目錄 HTTP/1.0 # 協議版本 \r\n # 回車符和換行符 Host # 頭部字段為Host : # 固定格式的符號: 192.168.1.100 # Host的值,即本次請求的主機值 \r\n # 請求頭部的回車符和換行符 \r\n # 最后的回車符和換行符
使用流的TCP回顯客戶端和服務器
使用流的TCP回顯客戶端
tcp_stream_client.py
import asyncio # 回顯客戶端協程函數,傳遞參數message發送給服務器端,服務器端接收信息原樣返回 async def tcp_echo_client(message): # 創建reader,writer對象分別用於接收和發送信息 reader,writer = await asyncio.open_connection('127.0.0.1',8888) print(f'Send:{message!r}') # 往服務器端寫信息,需要編碼后發送 writer.write(message.encode()) # await writer.drain() # 從服務器端讀取信息讀取100個字節 data = await reader.read(100) # 打印解碼后的信息 print(f'Received:{data.decode()!r}') print('Close the connection') # 關閉 writer.close() # await writer.wait_closed() asyncio.run(tcp_echo_client('Hello World!'))
注意:這里沒有使用writer.drain()和writer.wait_closed()也可以
使用流的TCP回顯服務器端
tcp_stream_server.py
import asyncio # 啟動服務后當客戶端建立新連接時調用該函數 # 接受參數為reader,writer # reader是類StreamReader的實例,而writer是類StreamWriter的實例 # 即客戶端和服務器端的reader和writer是一一對應的,分別用於接收對方數據流和往對方發送數據流 async def handle_echo(reader, writer): # 服務器從客戶端讀取信息 # 即客戶端通過writer往服務器寫的信息 data = await reader.read(100) # 信息解碼 message = data.decode() # 該方法獲取客戶端的ip地址信息 addr = writer.get_extra_info('peername') print(f"Received {message!r} from {addr!r}") print(f"Send: {message!r}") # 服務器端把從客戶端讀取的信息又發送給客戶端 writer.write(data) await writer.drain() # 關閉連接 print("Close the connection") writer.close() async def main(): # start_server()方法啟動套接字服務,返回一個server對象 # 當一個新的客戶端連接被建立時,回調函數會被調用。該函數會接收到一對參數(reader,writer) # reader是類StreamReader的實例,而writer是類StreamWriter的實例 # client_connected_cb 即可以是普通的可調用對象也可以是一個 協程函數; 如果它是一個協程函數,它將自動作為 Task 被調度。 server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888) # 以下方法可以獲取啟動的ip地址和端口信息返回一個元組其實即使start_server方法傳遞的ip和端口信息('127.0.0.1',8888) addr = server.sockets[0].getsockname() print(f'Serving on {addr}') # 啟動服務端 # Server對象是異步上下文管理器。當用於async with語句時,異步上下文管理器可以確保Server對象被關閉 # 並且在async with完成后不接受新的連接。 async with server: # server_forver()方法 # 開始接受連接,直到協程被取消。server_forever任務的取消將導致服務器被關閉 await server.serve_forever() asyncio.run(main())
打開兩個窗口,先啟動服務器端
服務器端開啟了8888端口等待客戶端連接
運行客戶端
運行客戶端的時候客戶端和服務器端建立了連接才啟動協程函數handle_echo
服務器端接收的信息為