Python之數據流(stream)


  本文參考Python官方文檔:https://docs.python.org/zh-cn/3.8/library/asyncio-stream.html

  本文參考Python官方文檔針對官方文檔示例進行解析,解析不完整只為了便於理解

  流

  流是用於處理網絡連接的高級async/await-ready原語。流允許發送和接收數據,而不需要使用回調或低級協議和傳輸。

  Stream函數

  下面的高級 asyncio 函數可以用來創建和處理流:coroutine asyncio.open_connection(host=Noneport=None*loop=Nonelimit=Nonessl=Nonefamily=0proto=0flags=0sock=Nonelocal_addr=Noneserver_hostname=Nonessl_handshake_timeout=None)

建立網絡連接並返回一對 (reader, writer) 對象。

返回的 reader 和 writer 對象是 StreamReader 和 StreamWriter 類的實例。

注意:使用ayncio.open_connection()方法創建和處理流時只有在await時才返回reader和writer對象

為了方便測試我們在本地搭建一個nginx服務器,首頁index.html內容為“Hello World”

 

 示例:

import asyncio
async def wget(host):
    connect = asyncio.open_connection(host,80)
    print(type(connect))
    reader,writer = await connect
    print(type(reader),type(writer))
       
async def main():
    # 獲取表頭主機列表
    hosts = ['192.168.1.100']
   # 根據主機列表獲取一個tasks列表
    tasks = [asyncio.create_task(wget(host)) for host in hosts]
    # 等待任務列表執行結果
    await asyncio.gather(*tasks)

# 運行
asyncio.run(main() 

  

運行輸出如下

<class 'coroutine'>
<class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'>
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000236AA956F70>
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "C:\ProgramData\Anaconda3\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 719, in call_soon
    self._check_closed()
  File "C:\ProgramData\Anaconda3\lib\asyncio\base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

  

解析:運行報錯是因為獲取了writer對象但是並沒有寫數據

打印類型可以看到

 

 完善代碼使用writer對象發送請求至服務器,然后reader對象就可以收取服務器發送過來的數據

# asyncio.open_connection創建數據流 start
import asyncio
async def wget(host):
    connect = asyncio.open_connection(host,80)
    print(type(connect))
    reader,writer = await connect
    print(type(reader),type(writer))
    # 定義請求頭部,格式是固定格式
    header = 'GET / HTTP/1.0\r\n Host:{0}\r\n\r\n'.format(host)
    # 通過writer對象往http服務器發送請求,請求是二進制格式的需要使用encode()方法編碼
    writer.write(header.encode('utf-8'))
    # writer.write方法需要與drain()方法一起使用
    await writer.drain()
    # 阻塞獲取服務器發送過來的所有數據,read()方法一次性獲取所有數據,數據多可以使用readline()方法一行行獲取
    data = await reader.read()
    # 打印獲取的數據,獲取數據為二進制格式不加decode()解碼則打印原始數據
    print(data.decode())
    # 關閉writer需要和writer.wait_closed()一起使用,這里可以省略
    writer.close()
    await writer.wait_closed()
       
async def main():
    hosts = ['192.168.1.100']
    tasks = [asyncio.create_task(wget(host)) for host in hosts]
    await asyncio.gather(*tasks)

asyncio.run(main())
# asyncio.open_connection創建數據流 end

  輸出如下

<class 'coroutine'>
<class 'asyncio.streams.StreamReader'> <class 'asyncio.streams.StreamWriter'>
HTTP/1.1 200 OK
Server: nginx/1.14.0
Date: Sat, 30 Oct 2021 09:37:17 GMT
Content-Type: text/html
Content-Length: 12
Last-Modified: Fri, 29 Oct 2021 07:41:00 GMT
Connection: close
ETag: "617ba58c-c"
Accept-Ranges: bytes

Hello World

  本次代碼演示了連接http服務器並且向服務器發送一個GET請求,服務器收到GET請求以后把數據返回給客戶,然后通過reader對象獲取到服務器發送過來的數據。

  注意:本次發送的是一個GET請求,格式是固定的

 

 本次發送的完整數據為

'GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n'

  對應關系如下圖

 

 拆分解析如下

GET / HTTP/1.0\r\n Host:192.168.1.100\r\n\r\n
GET #請求方法為GET 
 # 空格
/ # 請求URL為/即根目錄
HTTP/1.0 # 協議版本
\r\n # 回車符和換行符
Host # 頭部字段為Host
: # 固定格式的符號:
192.168.1.100 # Host的值,即本次請求的主機值
\r\n # 請求頭部的回車符和換行符
\r\n # 最后的回車符和換行符

  使用流的TCP回顯客戶端和服務器

使用流的TCP回顯客戶端

tcp_stream_client.py

import asyncio

# 回顯客戶端協程函數,傳遞參數message發送給服務器端,服務器端接收信息原樣返回
async def tcp_echo_client(message):
    # 創建reader,writer對象分別用於接收和發送信息
    reader,writer = await asyncio.open_connection('127.0.0.1',8888)
    print(f'Send:{message!r}')
    # 往服務器端寫信息,需要編碼后發送
    writer.write(message.encode())
    # await writer.drain()
    # 從服務器端讀取信息讀取100個字節
    data = await reader.read(100)
    # 打印解碼后的信息
    print(f'Received:{data.decode()!r}')
    print('Close the connection')
    # 關閉
    writer.close()
    # await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

  注意:這里沒有使用writer.drain()和writer.wait_closed()也可以

  

使用流的TCP回顯服務器端

  tcp_stream_server.py

import asyncio

# 啟動服務后當客戶端建立新連接時調用該函數
# 接受參數為reader,writer
# reader是類StreamReader的實例,而writer是類StreamWriter的實例
# 即客戶端和服務器端的reader和writer是一一對應的,分別用於接收對方數據流和往對方發送數據流
async def handle_echo(reader, writer):
    # 服務器從客戶端讀取信息
    # 即客戶端通過writer往服務器寫的信息
    data = await reader.read(100)
    # 信息解碼
    message = data.decode()
    # 該方法獲取客戶端的ip地址信息
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    # 服務器端把從客戶端讀取的信息又發送給客戶端
    writer.write(data)
    await writer.drain()
    # 關閉連接
    print("Close the connection")
    writer.close()

async def main():
    # start_server()方法啟動套接字服務,返回一個server對象
    # 當一個新的客戶端連接被建立時,回調函數會被調用。該函數會接收到一對參數(reader,writer)
    # reader是類StreamReader的實例,而writer是類StreamWriter的實例
    # client_connected_cb 即可以是普通的可調用對象也可以是一個 協程函數; 如果它是一個協程函數,它將自動作為 Task 被調度。
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    # 以下方法可以獲取啟動的ip地址和端口信息返回一個元組其實即使start_server方法傳遞的ip和端口信息('127.0.0.1',8888)
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    # 啟動服務端
    # Server對象是異步上下文管理器。當用於async with語句時,異步上下文管理器可以確保Server對象被關閉
    # 並且在async with完成后不接受新的連接。
    async with server:
        # server_forver()方法
        # 開始接受連接,直到協程被取消。server_forever任務的取消將導致服務器被關閉
        await server.serve_forever()

asyncio.run(main())

  

  打開兩個窗口,先啟動服務器端

 

 

 服務器端開啟了8888端口等待客戶端連接

運行客戶端

運行客戶端的時候客戶端和服務器端建立了連接才啟動協程函數handle_echo

 

 服務器端接收的信息為

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM