前言
在本章節中,我們將探討TCP協議基於流式傳輸的最大一個問題,即粘包問題。本章主要介紹TCP粘包的原理與其三種解決粘包的方案。並且還會介紹為什么UDP協議不會產生粘包。
基於TCP協議的socket實現遠程命令輸入
我們准備做一個可以在Client端遠程執行Server端shell
命令並拿到其執行結果的程序,而涉及到網絡通信就必然會出現socket
模塊,關於如何抉擇傳輸層協議的選擇?我們選擇使用TCP協議,因為它是可靠傳輸協議且數據量支持比UDP協議要大
Server端代碼如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Server ==== import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0",6666)) # 放在遠程填入0.0.0.0,放在本地填入127.0.0.1 server.listen(5) while 1: # 鏈接循環 conn,client_addr = server.accept() while 1: # 通信循環 try: # 防止Windows平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 cmd = conn.recv(1024) if not cmd: # 防止類Unix平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) stdout_res = res.stdout.read() # 正確結果 stderr_res = res.stderr.read() # 錯誤結果 # subprocess模塊拿到的是bytes類型,所以直接發送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因為兩個結果只有一個有信息,所以我們只拿到有結果的那個 conn.send(cmd_res) except Exception: break conn.close() # 由於client端鏈接異常,故關閉鏈接循環
Client端代碼如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Client ==== from socket import * client = socket(AF_INET,SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx",6666)) # 填入Server端公網IP while 1: cmd = input("請輸入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) cmd_res = client.recv(1024) # 本次接收1024字節數據 print(cmd_res.decode("utf-8")) # 如果Server端是Windows則用gbk解碼,類Unix用utf-8解碼 client.close()
測試結果:
粘包問題及其原理
上面的測試一切看起來都非常完美,但是是有一個BUG的。當我們如果讀取一條非常長的命令實際上是會出問題的,比如:
這種現象被稱之為粘包,那么為何會產生這樣的現象呢?
這是由於
recv()
沒有一次性讀取完整個內核緩沖區的內容導致的。其實歸根結底還是怪TCP是字節流方式傳輸數據。
我們來解析一下這種現象產生的原因:
由於我們的recv()
只是按照固定的1024去讀取數據,那么一旦整體內核緩沖區中所存儲的整體數據大於1024,就會產生粘包現象。所謂粘包問題主要還是因為接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所造成的。
這里我還畫了一幅圖,可以方便讀者理解:
那么我們可以通過不斷的增大recv()
中的讀取范圍來解決這個問題嗎?就像對應上圖中的,一次性把快遞櫃包裹全取完,答案是不可以!你再大你也不可能大過內核緩沖區,這個東西都是有一個一定的閾值。一旦超出了這個閾值就會引發異常或者干脆無效。那么有什么好的辦法呢?哈,下面會教給你一些解決辦法的。不過在此之前我們要先看一個TCP協議特有的Nagle
算法。
Nagle算法與粘包
基於TCP協議的socket通信有一個特點,即:一方的
send()
與另一方的recv()
可以沒有任何關系,即:一方send()
三次,另一方recv()
一次就可以將數據全部取出來。
TCP協議的發送方有一個特征。他會進行組包,如果一次發送的數據量很小,比如第一次發送10個字節,第二次發生2個字節,第三次發生3個字節。他可能會將這15個字節湊到一塊發送出去,這是采用了Nagle
算法來進行的,這么做有一個弊端就是接收方想要將這個大的數據包按照發送方的發送次數精確無誤的接收拆分成10 2 3必須要有發送方提供的拆包機制才行。
如下圖組所示
發送方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 back_log = 5 server = socket(AF_INET,SOCK_STREAM) server.bind(ip_port) server.listen(back_log) conn,addr = server.accept() conn.send("hello,".encode("utf-8")) # 第一次發送是6Bytes的數據 conn.send("world,".encode("utf-8")) # 第二次也是6Bytes的數據 conn.send("yunyaGG!!".encode("utf-8")) # 第三次是9Bytes的數據
接收方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 client = socket(AF_INET,SOCK_STREAM) client.connect(ip_port) data_1 = client.recv(buffer_size) # 我們讀取數據時統一用設定的 buffer_size 來讀取 print("這是第一次的數據包:",data_1.decode("utf-8")) data_2 = client.recv(buffer_size) print("這是第二次的數據包:",data_2.decode("utf-8")) data_3 = client.recv(buffer_size) print("這是第三次的數據包:",data_3.decode("utf-8"))
接收結果:
# ==== 執行結果 ==== """ 這是第一次的數據包: hello, 這是第二次的數據包: world,yunyaGG!! 這是第三次的數據包: """
和預想的有點不太一樣哈,居然把第二次和第三次組成了一個大的數據包發送過來了。這就是Nagle
算法,這樣的組包策略很容易就會產生粘包。我不知道你是以什么樣的方式發過來的,所以我recv()
就只能按照自己設定的方式去接收。
現在思考一下粘包的思路,我們的發送方需要將切分解包的規則告訴給接收方。
我們嘗試改一下每一次的buffer_size
接收大小:
接收方:
from socket import * ip_port = ("127.0.0.1",12306) buffer_size = 1024 client = socket(AF_INET,SOCK_STREAM) client.connect(ip_port) data_1 = client.recv(6) # 我們手動的按照對方發送時的規則來進行拆包 print("這是第一次的數據包:",data_1.decode("utf-8")) data_2 = client.recv(6) print("這是第二次的數據包:",data_2.decode("utf-8")) data_3 = client.recv(9) print("這是第三次的數據包:",data_3.decode("utf-8"))
接收結果:
# ==== 執行結果 ==== """ 這是第一次的數據包: hello, 這是第二次的數據包: world, 這是第三次的數據包: yunyaGG!! """
粘包被我們手動的計算字節數來精確的分割數據接受量的大小給解決了,但是這樣做是不現實的..我們不可能知道對方發送的數據到底是怎么樣的,更不用說手動計算。所以有沒有更好的解決方案呢?
解決方案1:預先發送消息長度
好了,其實上面關於解決粘包的思路已經出來了。我們需要做的就是讓接收方知道本次發送內容的大小,接收方才能夠精確的將所有數據全部提取出來不產生遺漏。其實實現方式很簡單,可以嘗試以下思路:
1.發送方發送一個此次數據固定的長度
2.接收方接收到該數據長度並且回應
3.發送方收到回應並且發送真正的數據
4.接收方不斷的用默認的
buffer_size
值接收新的數據並存儲起來直到超出整個數據的長度,代表此處數據全部接收完畢
Server端:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Server ==== import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0", 6666)) # 放在遠程填入0.0.0.0 放在本地測試填入127.0.0.1 server.listen(5) while 1: # 鏈接循環 conn, client_addr = server.accept() while 1: # 通信循環 try: # 防止Windows平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 cmd = conn.recv(1024) if not cmd: # 防止類Unix平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout_res = res.stdout.read() # 正確結果 stderr_res = res.stderr.read() # 錯誤結果 # subprocess模塊拿到的是bytes類型,所以直接發送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因為兩個結果只有一個有信息,所以我們只拿到有結果的那個 msg_length = len(cmd_res) # 本次數據的長度 conn.send(str(msg_length).encode("utf-8")) # 先將要發的整體內容長度發送過去 if conn.recv(1024) == b"ready": # 如果接收方回應了ready則開始發送真正的數據體 conn.send(cmd_res) except Exception: break conn.close() # 由於client端鏈接異常,故關閉鏈接循環
Client端:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Client ==== from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx", 6666)) # 填入Server端公網IP while 1: cmd = input("請輸入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) msg_length = int(client.recv(1024).decode("utf-8")) # 接收到此次發送內容的整體長度 recv_length = 0 # 代表已接收的內容長度 cmd_res = b"" client.send(b"ready") # 發送給Server端,代表自己已經接收到此次內容長度,可以發送真正的數據啦 while recv_length < msg_length: cmd_res += client.recv(1024) # 本次接收1024字節數據,可能是一小節數據 recv_length += len(cmd_res) # 添加上本次讀取的長度,當全部讀取完后應該 recv_length == msg_length else: print(cmd_res.decode("utf-8")) # 如果Server端是Windows則用gbk解碼,類Unix用utf-8解碼 client.close()
結果如下:
解決方案2:json+struct方案
其實上面的解決方案還是有一些弊端,因為Server端是發送了2次send()
,第1次發送數據整體長度,第2次發送數據內容主體,這樣其實是不太好的(Server端可能同時處理多個鏈接,所以send()
次數越少越好),而且如果Server端傳的是一個文件的話那么局限性就太強了。因為我們只能將整體的消息長度發送過去而諸如文件名,文件大小之內的信息就發送不過去。
所以我們需要一個更加完美的解決方案,即Server端發送一次send()
就將本次的數據整體長度發送過去(還可以包括文件姓名,文件大小等信息。)
struct
模塊使用介紹
struct
模塊可以將其某一種數據格式序列化為固定長度的Bytes
類型,其中最重要的兩個方法就是pack()
、unpack()
。
pack(fmt,*args)
: 根據格式將其轉換為Bytes
類型
unpack(fmt,string)
:根據格式將Bytes
類型數據反解為其原本的形式
格式 | C語言類型 | Python類型 | 字節數大小 |
---|---|---|---|
x | 填充字節 | 沒有值 | |
c | char | 字節長度為1 | 1 |
b | signed char | 整數 | 1 |
B | unsigned char | 整數 | 1 |
? | _Bool | bool | 1 |
h | short | 整數 | 2 |
H | unsigned short | 整數 | 2 |
i | int | 整數 | 4 |
I | unsigned int | 整數 | 4 |
l | long | 整數 | 4 |
L | unsigned long | 整數 | 4 |
q | long long | 整數 | 8 |
Q | unsigned long long | 整數 | 8 |
n | ssize_t | 整數 | |
N | size_t | 整數 | |
f | float | 浮點數 | 4 |
d | double | 浮點數 | 8 |
s | char[] | 字節 | |
p | char[] | 字節 | |
P | void * | 整數 |
使用演示:
>>> import struct >>> b1 = struct.pack("i",12) # 嘗試將 int類型的12進行序列化,得到一個4字節的對象 >>> b1 b'\x0c\x00\x00\x00' >>> struct.unpack("i",b1) # 嘗試將12的序列化對象字節進行反解,得出元組,第1位就是需要的數據。 (12,) >>>
好了,了解到這里我們就可以開始進行改寫了。
Server端代碼如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Server ==== import json import struct import subprocess from socket import * server = socket(AF_INET, SOCK_STREAM) server.bind(("0.0.0.0", 6666)) # 放在遠程填入0.0.0.0 放在本地測試填入127.0.0.1 server.listen(5) while 1: # 鏈接循環 conn, client_addr = server.accept() while 1: # 通信循環 try: # 防止Windows平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 cmd = conn.recv(1024) if not cmd: # 防止類Unix平台下Client端異常關閉導致雙向鏈接崩塌Server端異常的情況發生 break res = subprocess.Popen(cmd.decode("utf-8"), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout_res = res.stdout.read() # 正確結果 stderr_res = res.stderr.read() # 錯誤結果 # subprocess模塊拿到的是bytes類型,所以直接發送即可 cmd_res = stdout_res if stdout_res else stderr_res # 因為兩個結果只有一個有信息,所以我們只拿到有結果的那個 # 解決粘包:構建字典,包含數據主體長度,這個就相當於其頭部信息 head_msg = { "msg_length": len(cmd_res), # 包含數據主體部分的長度 # 如果是文件,還可以添加file_name,file_size等屬性。 } # 序列化成json格式,並且統計其頭部的長度 head_data = json.dumps(head_msg).encode("utf-8") head_length = struct.pack("i", len(head_data)) # 得到4字節的頭部信息,里面包含頭部的長度 # 發送頭部長度信息,頭部數據,與真實數據部分 conn.send(head_length + head_data + cmd_res) except Exception: break conn.close() # 由於client端鏈接異常,故關閉鏈接循環
Client端代碼如下:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # ==== 基於TCP協議的socket實現遠程命令輸入之Client ==== import json import struct from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("xxx.xxx.xxx.xxx", 6666)) # 填入Server端公網IP while 1: cmd = input("請輸入命令>>>:").strip() if not cmd: continue if cmd == "quit": break client.send(cmd.encode("utf-8")) # 發送終端命令 # 解決粘包 head_length = struct.unpack("i", client.recv(4))[0] # 接收到頭部的長度信息 head_data = json.loads(client.recv(head_length)) # 接收到真實的頭部信息 msg_length = head_data["msg_length"] # 獲取到數據主體的長度信息 recv_length = 0 # 代表已接收的內容長度 cmd_res = b"" # 開始獲取真正的數據主體信息 while recv_length < msg_length: cmd_res += client.recv(1024) # 本次接收1024字節數據,可能是一小節數據 recv_length += len(cmd_res) # 添加上本次讀取的長度,當全部讀取完后應該 recv_length == msg_length else: print(cmd_res.decode("utf-8")) # 如果Server端是Windows則用gbk解碼,類Unix用utf-8解碼 client.close()
思想如下:
1.Server端構建自身的數據頭部分,其中包含數據體整體長度,如果傳輸的是文件的話還可以包含文件名,文件大小等信息
2.將數據頭部分
json
序列化后再轉換為Bytes
類型3.使用
struct.pack()
模塊獲取數據頭的長度,得到一個長度為4的Bytes
類型4.Server端將 數據頭長度 + 數據頭部分 + 數據體部分 全部發送給Client端
5. Client端
recv()
接收值改為4,拿到數據頭長度Bytes類型6. Client端使用
struct.unpack(
數據頭長度Bytes類型)
模塊反解出數據頭真實的長度7. Client端使用
recv()
接收值為數據頭真實的長度拿到真正的數據頭8. 通過
json
反序列化出真正的數據頭,在到其中取出數據體的長度9. 開始
while
循環不斷的讀取真實的數據體數據
解決方案3:iter()與偏函數(失敗案例)
上面那么做看似完美但還是美中不足。因為內存緩沖區本來就是只能取一次值,和迭代器很像,只能迭代一次便不能繼續迭代了。基於這一點我們來做一個終極優化:
還記得iter()
方法嗎?iter()
方法除開創建迭代器外實際上還有一個參數:
def iter(source, sentinel=None): # known special case of iter """ iter(iterable) -> iterator iter(callable, sentinel) -> iterator Get an iterator from an object. In the first form, the argument must supply its own iterator, or be a sequence. In the second form, the callable is called until it returns the sentinel. """ pass
我們來試試這個參數做什么用的。
li = [1, 2, 3, 4] def my_iter(): return li.pop() res = iter(my_iter, 2) # 代表這個迭代器沒__next__一下就會執行my_iter函數,並且該函數返回值如果是2則終止迭代 print(res.__next__()) # 4 print(res.__next__()) # 3 print(res.__next__()) # StopIteration
第二個參數看來可以設置迭代的終點。
那么偏函數是什么呢?偏函數可以設定一個固定的參數給第一個位置的值
效果如下:
from functools import partial # 導入偏函數 def add(x, y): return x + y func = partial(add, 1) # 設置辨寒暑綁定的第一個參數的值 print(func(1)) # 2 print(func(5)) # 6
現在我們仔細回想,當緩沖區的消息接收完畢后為空的狀態是會變成 b""
的形式。那么這個時候我們可以使用iter()
方法設置為不斷的取出緩存中的值直到出現b""
,而偏函數可以對recv()
函數進行設置讓它始終取一個值,最后通過join來拼接出取出的所有值即可。
可以使用 "".join(iter(partial(tcp_clien.recv,back_log)),b"")
我們嘗試用函數來查看一下效果:
from functools import partial # 導入偏函數 li = [b"","1","2","3","4","5"] # 模擬內核緩沖區 def test(buffer_size): if buffer_size: # 模擬recv的數據大小 return li.pop() print("buffer_size必須為一個int類型的值") res = "".join(iter(partial(test,1024),b"")) print(res) # 54321 # join()方法會不斷的調用iter()下的__next__,每調用一次就執行一次偏函數。知道出現b""停止
最后我們發現,這樣的做法是會產生recv()
阻塞的,總體來說還是不能夠成功。因為join()
方法會不斷的執行,即使內核緩沖區的數據被recv()
讀完了也不會終止迭代而是繼續阻塞下次的recv()
,故這種方式宣告失敗。(還是iter()
的第二個參數導致的,或許讀取完后內核緩沖區中的數據並不是b""
)
測試的Server端代碼如下:
from socket import * import subprocess import struct ip_port=('127.0.0.1',8080) back_log=5 buffer_size=1024 tcp_server=socket(AF_INET,SOCK_STREAM) tcp_server.bind(ip_port) tcp_server.listen(back_log) while True: conn,addr=tcp_server.accept() print('新的Client鏈接',addr) while True: #收 try: cmd=conn.recv(buffer_size) if not cmd:break print('收到Client的命令',cmd) #執行命令,得到命令的運行結果cmd_res res=subprocess.Popen(cmd.decode('utf-8'),shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, stdin=subprocess.PIPE) err=res.stderr.read() if err: cmd_res=err else: cmd_res=res.stdout.read() #發 if not cmd_res: cmd_res='執行成功'.encode('gbk') length=len(cmd_res) data_length=struct.pack('i',length) conn.send(data_length) conn.send(cmd_res) except Exception as e: print(e) break
測試的Client代碼如下:
from socket import * import struct from functools import partial #偏函數 ip_port=('127.0.0.1',8080) back_log=5 buffer_size=1024 tcp_client=socket(AF_INET,SOCK_STREAM) tcp_client.connect(ip_port) while True: cmd=input('>>: ').strip() if not cmd:continue if cmd == 'quit':break tcp_client.send(cmd.encode('utf-8')) #解決粘包 length_data=tcp_client.recv(4) length=struct.unpack('i',length_data)[0] #第一種方法 recv_size=0 recv_msg=b'' while recv_size < length: #為何recv里是buffer_size,不是length,因為length如果為24G,系統內存沒有那么大 #所以每次buffer_size,當recv_size < length時,循環接收,直到recv_size =length,退出循環 recv_msg += tcp_client.recv(buffer_size) recv_size=len(recv_msg) #1024 #第二種方法 失敗版本,會引發recv()的阻塞,而不會終止迭代。因為join()方法會不斷的調用其iter()方法產生的迭代器,也就是調用其__next__方法,所以第二次沒消息的recv()會阻塞住。 #recv_msg=''.join(iter(partial(tcp_client.recv, buffer_size), b'')) print('命令的執行結果是 ',recv_msg.decode('gbk')) tcp_client.close()
UDP協議為何不會產生粘包
UDP協議是面向消息的協議,每一次的
sendto()
與recvfrom()
必須一一對應,否則就會收不到消息。
UDP是面向消息的協議,每個UDP段都是一條消息,每sendto()
一次就是發送一次消息,而不管接收方有沒有收到消息發送方只管自己的發送任務,這也是UDP被稱為不可靠傳輸協議的由來。接收端的套接字緩沖區采用了鏈式的結構來記錄每一個到達的UDP包,在每一個UDP包中都有了消息頭,包括端口,消息源等等..於是UDP就能夠去區分出一個明確的消息定義,即面向消息的通信是有消息邊界的,所以UDP的傳輸叫做數據報的形式。
並且每一次recvform()
的buffer_size
最大值如果不夠獲取完全部的內核緩沖區里的數據的話,那么只會收夠指定的最大字節數量(即buffer_size
的設定值),剩余的就不要了。所以UDP不會存在粘包,多么干脆利落...
我們還是用一個快遞員的那個圖來進行演示:
還有一點需要注意一下。使用UDP協議進行通信的時候不管首先啟動哪一方都不會報錯,因為它只管發,不管有沒有人接收。
所以,這也是我稱UDP協議比較隨便的原因。