前言
慣例練習歷史實驗,在編寫tcp
數據流粘包實驗的時候,發現一個奇怪的現象。當遠程執行的命令返回結果很短的時候可以正常執行,但返回結果很長時,就會發生json
解碼錯誤,故將排錯和解決方法記錄下來。
一個粘包實驗
服務端(用函數):
import socket
import json
import struct
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
def init_socket():
addr = ('127.0.0.1', 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print('start listening...')
return server
def handle(request):
command = request.decode('utf-8')
obj = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = obj.stdout.read() + obj.stderr.read()
# 如果是win還需要轉換編碼
if sys.platform == 'win32':
result = result.decode('gbk').encode('utf-8')
return result
def build_header(data_len):
dic = {
'cmd_type': 'shell',
'data_len': data_len,
}
return json.dumps(dic).encode('utf-8')
def send(conn, response):
data_len = len(response)
header = build_header(data_len)
header_len = len(header)
struct_bytes = struct.pack('i', header_len)
# 粘包發送
conn.send(struct_bytes)
conn.send(header)
conn.send(response)
def task(conn):
try:
while True: # 消息循環
request = conn.recv(1024)
if not request:
# 鏈接失效
raise ConnectionResetError
response = handle(request)
send(conn, response)
except ConnectionResetError:
msg = f'鏈接-{conn.getpeername()}失效'
conn.close()
return msg
def show_res(future):
result = future.result()
print(result)
if __name__ == '__main__':
max_thread = 5
futures = []
server = init_socket()
with ThreadPoolExecutor(max_thread) as pool:
while True: # 鏈接循環
conn, addr = server.accept()
print(f'一個客戶端上線{addr}')
future = pool.submit(task, conn)
future.add_done_callback(show_res)
futures.append(future)
客戶端(用類):
import socket
import struct
import time
import json
class Client(object):
addr = ('127.0.0.1', 8080)
def __init__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(self.addr)
print('連接上服務器')
def get_request(self):
while True:
request = input('>>>').strip()
if not request:
continue
return request
def recv(self):
# 拆包接收
struct_bytes = self.socket.recv(4)
header_len = struct.unpack('i', struct_bytes)[0]
header_bytes = self.socket.recv(header_len)
header = json.loads(header_bytes.decode('utf-8'))
data_len = header['data_len']
gap_abs = data_len % 1024
count = data_len // 1024
recv_data = b''
for i in range(count):
data = self.socket.recv(1024)
recv_data += data
recv_data += self.socket.recv(gap_abs)
print('recv data len is:', len(recv_data))
return recv_data
def run(self):
while True: # 消息循環
request = self.get_request()
self.socket.send(request.encode('utf-8'))
response = self.recv()
print(response.decode('utf-8'))
if __name__ == '__main__':
client = Client()
client.run()
執行結果
在執行dir/ipconfig
等命令時可以正常獲取結果,但是在執行tasklist
命令時,發現沒有獲取完整的執行結果,而且下一條命令將發生報錯:
Traceback (most recent call last):
File "F:/projects/hello/world.py", line 62, in <module>
client.run()
File "F:/projects/hello/world.py", line 57, in run
response = self.recv()
File "F:/projects/hello/world.py", line 35, in recv
header = json.loads(header_bytes.decode('utf-8'))
File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "C:\Users\zouliwei\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
排錯思路
1、錯誤明確指示是json
的解碼發生了錯誤,解碼錯誤應該是來自於解碼的數據編碼不正確或者讀取的數據不完整。
2、發生錯誤的函數在客戶端,錯誤在第6
行,摘出如下:
def recv(self):
# 拆包接收
struct_bytes = self.socket.recv(4)
header_len = struct.unpack('i', struct_bytes)[0]
header_bytes = self.socket.recv(header_len)
header = json.loads(header_bytes.decode('utf-8')) # 此行發生錯誤
data_len = header['data_len']
gap_abs = data_len % 1024
count = data_len // 1024
recv_data = b''
for i in range(count):
data = self.socket.recv(1024)
recv_data += data
recv_data += self.socket.recv(gap_abs)
print('recv data len is:', len(recv_data))
return recv_data
3、繼續思考,第6
行嘗試對接收到的頭部二進制數據進行json
解碼,而頭部二進制在服務器是通過UTF-8
編碼的,查看服務器端編碼代碼發現沒有錯誤,所以編碼錯誤被排除。剩下的應該就是接收的數據不完整問題。
4、按理說,通過struct
和header
來控制每一次讀取的字節流可以保證每次收取的時候是准確完整的收取一個消息的數據,但是這里卻發生了錯誤,我通過在下方的for
函數增加print
看一下依次循環讀取時的長度數據:
for i in range(count):
data = self.socket.recv(1024)
print('recv接收的長度是:', len(data)) # 增加此行查看每次循環讀取的長度是多少,按理應該是1024
recv_data += data
結果令我意外:
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400 # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400 # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 400 # 錯誤
recv接收的長度是: 1024
recv接收的長度是: 1024
recv data len is: 14121
按照邏輯,每一次循環應該都收取1024
字節,卻發現有3
次收取並不完整(每次執行時錯誤不完全一樣,但是都會發生錯誤),這就是導致最終數據不完整的原因。
因為執行tasklist
返回的結果很長,導致接收數據不完整,於是下一條執行命令就發生了粘包,json
解碼的數據就不是一個正常的數據,故報錯。
解決和總結
1、之所以會發生這種情況,我猜測應該是recv
函數的接收機制原因,recv
函數一旦被調用,就會嘗試獲取緩沖中的數據,只要有數據,就會直接返回,如果緩沖中的數據大於1024
,最多返回1024
字節,不過如果緩沖只有400
,也只會返回400
,這是recv
函數的讀取機制。
2、當客戶端需要讀取大量數據(執行tasklist
命令的返回就達到1w
字節以上)時,需要多次recv
,每一次recv
時,客戶端並不能保證緩沖中的數據量已經達到1024
字節(這可能有服務器和客戶端發送和接收速度不適配的問題),有可能某次緩沖只有400
字節,但是recv
依然讀取並返回。
3、最初嘗試解決的方法是,在recv
之前增加time.sleep(0.1)
來使得每次recv
之前都有一個充足的時間來等待緩沖區的數據大於1024
,此方法可以解決問題,不過這方法不是很好,因為如果服務器在遠程,就很難控制sleep
的秒數,因為你不知道網絡IO
會發生多長時間,一旦sleep
時間過長,就會長期阻塞線程浪費cpu
時間。
4、查看recv
函數源碼,發現是c
寫的,不過recv
的接口好像除了size
之外,還有一個flag
參數。翻看《python參考手冊》
查找recv
函數的說明,recv
函數的flag
參數可以有一個選項是:MSG_WAITALL
,書上說,這表示在接收的時候,函數一定會等待接收到指定size
之后才會返回。
5、最終使用如下方法解決:
for i in range(count):
# time.sleep(0.1)
data = self.socket.recv(1024, socket.MSG_WAITALL)
print('recv接收的長度是:', len(data))
recv_data += data
接收結果:
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv接收的長度是: 1024
recv data len is: 16039
6、以后應該還會學習到更好的解決方法,努力學習。