python的socket.recv函數陷阱


前言

慣例練習歷史實驗,在編寫tcp數據流粘包實驗的時候,發現一個奇怪的現象。當遠程執行的命令返回結果很短的時候可以正常執行,但返回結果很長時,就會發生json解碼錯誤,故將排錯和解決方法記錄下來。

一個粘包實驗

服務端(用函數):

import socket
import json
import struct
import subprocess
import sys

from concurrent.futures import ThreadPoolExecutor

def init_socket():
    addr = ('127.0.0.1', 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(5)
    print('start listening...')
    return server


def handle(request):
    command = request.decode('utf-8')
    obj = subprocess.Popen(command,
                           shell=True,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    result = obj.stdout.read() + obj.stderr.read()
    # 如果是win還需要轉換編碼
    if sys.platform == 'win32':
        result = result.decode('gbk').encode('utf-8')
    return result


def build_header(data_len):
    dic = {
        'cmd_type': 'shell',
        'data_len': data_len,
    }
    return json.dumps(dic).encode('utf-8')


def send(conn, response):
    data_len = len(response)
    header = build_header(data_len)
    header_len = len(header)
    struct_bytes = struct.pack('i', header_len)

    # 粘包發送
    conn.send(struct_bytes)
    conn.send(header)
    conn.send(response)


def task(conn):
    try:
        while True:  # 消息循環
            request = conn.recv(1024)
            if not request:
                # 鏈接失效
                raise ConnectionResetError

            response = handle(request)
            send(conn, response)

    except ConnectionResetError:
        msg = f'鏈接-{conn.getpeername()}失效'
        conn.close()
        return msg


def show_res(future):
    result = future.result()
    print(result)


if __name__ == '__main__':
    max_thread = 5
    futures = []
    server = init_socket()

    with ThreadPoolExecutor(max_thread) as pool:
        while True:  # 鏈接循環
            conn, addr = server.accept()
            print(f'一個客戶端上線{addr}')

            future = pool.submit(task, conn)
            future.add_done_callback(show_res)
            futures.append(future)

客戶端(用類):

import socket
import struct
import time
import json

class Client(object):
    addr = ('127.0.0.1', 8080)

    def __init__(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect(self.addr)
        print('連接上服務器')

    def get_request(self):
        while True:
            request = input('>>>').strip()
            if not request:
                continue

            return request

    def recv(self):
        # 拆包接收
        struct_bytes = self.socket.recv(4)
        header_len = struct.unpack('i', struct_bytes)[0]
        header_bytes = self.socket.recv(header_len)
        header = json.loads(header_bytes.decode('utf-8'))
        data_len = header['data_len']

        gap_abs = data_len % 1024
        count = data_len // 1024
        recv_data = b''

        for i in range(count):
            data = self.socket.recv(1024)
            recv_data += data
        recv_data += self.socket.recv(gap_abs)

        print('recv data len is:', len(recv_data))
        return recv_data

    def run(self):
        while True:  # 消息循環
            request = self.get_request()
            self.socket.send(request.encode('utf-8'))
            response = self.recv()
            print(response.decode('utf-8'))


if __name__ == '__main__':
    client = Client()
    client.run()

執行結果

在執行dir/ipconfig等命令時可以正常獲取結果,但是在執行tasklist命令時,發現沒有獲取完整的執行結果,而且下一條命令將發生報錯:

Traceback (most recent call last):
  File "F:/projects/hello/world.py", line 62, in <module>
    client.run()
  File "F:/projects/hello/world.py", line 57, in run
    response = self.recv()
  File "F:/projects/hello/world.py", line 35, in recv
    header = json.loads(header_bytes.decode('utf-8'))
  File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

排錯思路

1、錯誤明確指示是json的解碼發生了錯誤,解碼錯誤應該是來自於解碼的數據編碼不正確或者讀取的數據不完整
2、發生錯誤的函數在客戶端,錯誤在第6行,摘出如下:

 def recv(self):
        # 拆包接收
        struct_bytes = self.socket.recv(4)
        header_len = struct.unpack('i', struct_bytes)[0]
        header_bytes = self.socket.recv(header_len)
        header = json.loads(header_bytes.decode('utf-8'))  # 此行發生錯誤
        data_len = header['data_len']

        gap_abs = data_len % 1024
        count = data_len // 1024
        recv_data = b''

        for i in range(count):
            data = self.socket.recv(1024)
            recv_data += data
        recv_data += self.socket.recv(gap_abs)

        print('recv data len is:', len(recv_data))
        return recv_data

3、繼續思考,第6行嘗試對接收到的頭部二進制數據進行json解碼,而頭部二進制在服務器是通過UTF-8編碼的,查看服務器端編碼代碼發現沒有錯誤,所以編碼錯誤被排除。剩下的應該就是接收的數據不完整問題。
4、按理說,通過structheader來控制每一次讀取的字節流可以保證每次收取的時候是准確完整的收取一個消息的數據,但是這里卻發生了錯誤,我通過在下方的for函數增加print看一下依次循環讀取時的長度數據:

for i in range(count):
    data = self.socket.recv(1024)
    print('recv接收的長度是:', len(data))  # 增加此行查看每次循環讀取的長度是多少,按理應該是1024
    recv_data += data

結果令我意外:

recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400  # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400  # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400  # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv data len is: 14121

按照邏輯,每一次循環應該都收取1024字節,卻發現有3次收取並不完整(每次執行時錯誤不完全一樣,但是都會發生錯誤),這就是導致最終數據不完整的原因。
因為執行tasklist返回的結果很長,導致接收數據不完整,於是下一條執行命令就發生了粘包,json解碼的數據就不是一個正常的數據,故報錯。

解決和總結

1、之所以會發生這種情況,我猜測應該是recv函數的接收機制原因,recv函數一旦被調用,就會嘗試獲取緩沖中的數據,只要有數據,就會直接返回,如果緩沖中的數據大於1024,最多返回1024字節,不過如果緩沖只有400,也只會返回400,這是recv函數的讀取機制。

2、當客戶端需要讀取大量數據(執行tasklist命令的返回就達到1w字節以上)時,需要多次recv,每一次recv時,客戶端並不能保證緩沖中的數據量已經達到1024字節(這可能有服務器和客戶端發送和接收速度不適配的問題),有可能某次緩沖只有400字節,但是recv依然讀取並返回。

3、最初嘗試解決的方法是,在recv之前增加time.sleep(0.1)來使得每次recv之前都有一個充足的時間來等待緩沖區的數據大於1024,此方法可以解決問題,不過這方法不是很好,因為如果服務器在遠程,就很難控制sleep的秒數,因為你不知道網絡IO會發生多長時間,一旦sleep時間過長,就會長期阻塞線程浪費cpu時間。

4、查看recv函數源碼,發現是c寫的,不過recv的接口好像除了size之外,還有一個flag參數。翻看《python參考手冊》查找recv函數的說明,recv函數的flag參數可以有一個選項是:MSG_WAITALL,書上說,這表示在接收的時候,函數一定會等待接收到指定size之后才會返回。

5、最終使用如下方法解決:

for i in range(count):
    # time.sleep(0.1)
    data = self.socket.recv(1024, socket.MSG_WAITALL)
    print('recv接收的長度是:', len(data))
    recv_data += data

接收結果:

recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv data len is: 16039

6、以后應該還會學習到更好的解決方法,努力學習。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM