粘包現象與解決方案


簡單遠程執行命令程序開發

是時候用戶socket干點正事呀,我們來寫一個遠程執行命令的程序,寫一個socket client端在windows端發送指令,一個socket server在Linux端執行命令並返回結果給客戶端

執行命令的話,肯定是用我們學過的subprocess模塊啦,但注意注意注意:

 res = subprocess.Popen(cmd.decode('utf-8'),shell=True,stderr=subprocess.PIPE,stdout=subprocess.PIPE) 

命令結果的編碼是以當前所在的系統為准的,如果是windows,那么res.stdout.read()讀出的就是GBK編碼的,在接收端需要用GBK解碼,且只能從管道里讀一次結果

ssh server

import socket
import subprocess

ip_port = ('127.0.0.1', 8080)


tcp_socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

while True:
    conn, addr = tcp_socket_server.accept()
    print('客戶端', addr)

    while True:
        cmd = conn.recv(1024)
        if len(cmd) == 0: break
        print("recv cmd",cmd)
        res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
                               stdout=subprocess.PIPE,
                               stdin=subprocess.PIPE,
                               stderr=subprocess.PIPE)

        stderr = res.stderr.read()
        stdout = res.stdout.read()
        print("res length",len(stdout))
        conn.send(stderr)
        conn.send(stdout)
View Code

ssh client

import socket
ip_port = ('127.0.0.1', 8080)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
res = s.connect_ex(ip_port)

while True:
    msg = input('>>: ').strip()
    if len(msg) == 0: continue
    if msg == 'quit': break

    s.send(msg.encode('utf-8'))
    act_res = s.recv(1024)

    print(act_res.decode('utf-8'), end='')
View Code

嘗試執行ls、pwd命令,你驚喜的發現,拿到了正確的結果!

but 莫開心太早,此時執行一個結果比較長的命令,比如top -bn 1, 你發現依然可以拿到結果,但如果再執行一條df -h的話,就發現,你拿到並不是df命令的結果,而是上一條top命令的部分結果。為啥捏?為啥捏?為啥捏?

是因為,top命令的結果比較長,但客戶端只recv(1024), 可結果比1024長呀,那怎么辦,只好在服務器端的IO緩沖區里把客戶端還沒收走的暫時存下來,等客戶端下次再來收,所以當客戶端第2次調用recv(1024)就會首先把上次沒收完的數據先收下來,再收df命令的結果。

那怎么解決呢? 有些同學說,直接把recv(1024)改大不就好了,改成5000\10000或whatever. 可我的親,這么干的話,並不能解決實際問題,因為你不可能提前知道對方返回的結果數據大下,無論你改成多大,對方的結果都有可能比你設置的大,另外這個recv並不是真的可以隨便改特別大的,有關部門建議的不要超過8192,再大反而會出現影響收發速度和不穩定的情況

同志們,這個現象叫做粘包,就是指兩次結果粘到一起了。它的發生主要是因為socket緩沖區導致的,來看一下

 

你的程序實際上無權直接操作網卡的,你操作網卡都是通過操作系統給用戶程序暴露出來的接口,那每次你的程序要給遠程發數據時,其實是先把數據從用戶態copy到內核態,這樣的操作是耗資源和時間的,頻繁的在內核態和用戶態之前交換數據勢必會導致發送效率降低, 因此socket 為提高傳輸效率,發送方往往要收集到足夠多的數據后才發送一次數據給對方。若連續幾次需要send的數據都很少,通常TCP socket 會根據優化算法把這些數據合成一個TCP段后一次發送出去,這樣接收方就收到了粘包數據。

粘包問題只存在於TCP中,Not UDP

還是看上圖,發送端可以是一K一K地發送數據,而接收端的應用程序可以兩K兩K地提走數據,當然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據,也就是說,應用程序所看到的數據是一個整體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,因此TCP協議是面向流的協議,這也是容易出現粘包問題的原因。而UDP是面向消息的協議,每個UDP段都是一條消息,應用程序必須以消息為單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不同的。怎樣定義消息呢?可以認為對方一次性write/send的數據為一個消息,需要明白的是當對方send一條信息的時候,無論底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成后才呈現在內核緩沖區。

例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束

所謂粘包問題主要還是因為接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所造成的。

總結

  1. TCP(transport control protocol,傳輸控制協議)是面向連接的,面向流的,提供高可靠性服務。收發兩端(客戶端和服務器端)都要有一一成對的socket,因此,發送端為了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通信是無消息保護邊界的。
  2. UDP(user datagram protocol,用戶數據報協議)是無連接的,面向消息的,提供高效率服務。不會使用塊的合並優化算法,, 由於UDP支持的是一對多的模式,所以接收端的skbuff(套接字緩沖區)采用了鏈式結構來記錄每一個到達的UDP包,在每個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來說,就容易進行區分處理了。 即面向消息的通信是有消息保護邊界的。
  3. tcp是基於數據流的,於是收發的消息不能為空,這就需要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即便是你輸入的是空內容(直接回車),那也不是空消息,udp協議會幫你封裝上消息頭,實驗略

 

基於UDP的命令執行程序

上面說了,udp不存在粘包問題,我們看一下實例

udp server

import socket
import subprocess

ip_port = ('127.0.0.1', 9003)
bufsize = 1024

udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_server.bind(ip_port)

while True:
    # 收消息
    cmd, addr = udp_server.recvfrom(bufsize)
    print('用戶命令----->', cmd,addr)

    # 邏輯處理
    res = subprocess.Popen(cmd.decode('utf-8'), shell=True, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE)
    stderr = res.stderr.read()
    stdout = res.stdout.read()

    # 發消息
    udp_server.sendto(stdout + stderr, addr)

udp_server.close()
View Code

udp client

from socket import *

import time

ip_port = ('127.0.0.1', 9003)
bufsize = 1024

udp_client = socket(AF_INET, SOCK_DGRAM)

while True:
    msg = input('>>: ').strip()
    if len(msg) == 0:
        continue

    udp_client.sendto(msg.encode('utf-8'), ip_port)
    data, addr = udp_client.recvfrom(bufsize)
    print(data.decode('utf-8'), end='')
View Code

粘包的解決辦法

問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,所以解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把自己將要發送的字節流總大小讓接收端知曉,然后接收端來一個死循環接收完所有數據

普通青年版

服務器端

import socket,subprocess
ip_port=('127.0.0.1',8080)
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

s.bind(ip_port)
s.listen(5)

while True:
    conn,addr=s.accept()
    print('客戶端',addr)
    while True:
        msg=conn.recv(1024)
        if not msg:break
        res=subprocess.Popen(msg.decode('utf-8'),shell=True,\
                            stdin=subprocess.PIPE,\
                         stderr=subprocess.PIPE,\
                         stdout=subprocess.PIPE)
        err=res.stderr.read()
        if err:
            ret=err
        else:
            ret=res.stdout.read()
        data_length=len(ret)
        conn.send(str(data_length).encode('utf-8'))
        data=conn.recv(1024).decode('utf-8')
        if data == 'recv_ready':
            conn.sendall(ret)
    conn.close()
View Code

客戶端

import socket,time
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(('127.0.0.1',8080))

while True:
    msg=input('>>: ').strip()
    if len(msg) == 0:continue
    if msg == 'quit':break

    s.send(msg.encode('utf-8'))
    length=int(s.recv(1024).decode('utf-8'))
    s.send('recv_ready'.encode('utf-8'))
    send_size=0
    recv_size=0
    data=b''
    while recv_size < length:
        data+=s.recv(1024)
        recv_size+=len(data) #為什么不直接寫1024?


    print(data.decode('utf-8'))
View Code

為何low?

程序的運行速度遠快於網絡傳輸速度,所以在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗

剛才上面 在發送消息之前需先發送消息長度給對端,還必須要等對端返回一個ready收消息的確認,不等到對端確認就直接發消息的話,還是會產生粘包問題(承載消息長度的那條消息和消息本身粘在一起)。 有沒有優化的好辦法么?

文藝青年版一

思考一個問題,為什么不能在發送了消息長度(稱為消息頭head吧)給對端后,立刻發消息內容(稱為body吧),是因為怕head 和body 粘在一起,所以通過等對端返回確認來把兩條消息中斷開。

可不可以直接發head + body,但又能讓對端區分出哪個是head,哪個是body呢?我靠、我靠,感覺智商要漲了。

想到了,把head設置成定長的呀,這樣對端只要收消息時,先固定收定長的數據,head里寫好,后面還有多少是屬於這條消息的數據,然后直接寫個循環收下來不就完了嘛!唉呀媽呀,我真機智。

可是、可是如何制作定長的消息頭呢?假設你有2條消息要發送,第一條消息長度是 3000個字節,第2條消息是200字節。如果消息頭只包含消息長度的話,那兩個消息的消息頭分別是

 len(msg1) = 4000 = 4字節 len(msg2) = 200 = 3字節 

你的服務端如何完整的收到這個消息頭呢?是recv(3)還是recv(4)服務器端怎么知道?用盡我所有知識,我只能想到拼接字符串的辦法了,打比方就是設置消息頭固定100字節長,不夠的拿空字符串去拼接。

server

import socket,json
import subprocess

ip_port = ('127.0.0.1', 8080)

tcp_socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket_server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #一行代碼搞定,寫在bind之前
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

def pack_msg_header(header,size):
    bytes_header = bytes(json.dumps(header),encoding="utf-8")
    fill_up_size = size -  len(bytes_header)
    print("need to fill up ",fill_up_size)

    header['fill'] = header['fill'].zfill(fill_up_size)
    print("new header",header)
    bytes_new_header = bytes(bytes(json.dumps(header),encoding="utf-8"))
    return bytes_new_header

while True:
    conn, addr = tcp_socket_server.accept()
    print('客戶端', addr)

    while True:
        cmd = conn.recv(1024)
        if len(cmd) == 0: break
        print("recv cmd",cmd)
        res = subprocess.Popen(cmd.decode('utf-8'), shell=True,
                               stdout=subprocess.PIPE,
                               stdin=subprocess.PIPE,
                               stderr=subprocess.PIPE)

        stderr = res.stderr.read()
        stdout = res.stdout.read()
        print("res length",len(stdout))

        msg_header = {
            'length':len(stdout + stderr),
            'fill':''
        }
        packed_header = pack_msg_header(msg_header,100)
        print("packed header size",packed_header,len(packed_header))
        conn.send(packed_header)
        conn.send(stdout + stderr)
View Code

client

import socket
import json

ip_port = ('127.0.0.1', 8080)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
res = s.connect_ex(ip_port)

while True:
    msg = input('>>: ').strip()
    if len(msg) == 0: continue
    if msg == 'quit': break

    s.send(msg.encode('utf-8'))
    response_msg_header = s.recv(100).decode("utf-8")

    response_msg_header_data = json.loads(response_msg_header)
    msg_size = response_msg_header_data['length']

    res = s.recv(msg_size)
    print("received res size ",len(res))
    print(res.decode('utf-8'), end='')
View Code

文藝青年版二

為字節流加上自定義固定長度報頭也可以借助於第三方模塊struct,用法為

import json,struct
#假設通過客戶端上傳1T:1073741824000的文件a.txt

#為避免粘包,必須自定制報頭
header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值

#為了該報頭能傳送,需要序列化並且轉為bytes
head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸

#為了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節
head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節里只包含了一個數字,該數字是報頭的長度

#客戶端開始發送
conn.send(head_len_bytes) #先發報頭的長度,4個bytes
conn.send(head_bytes) #再發報頭的字節格式
conn.sendall(文件內容) #然后發真實內容的字節格式

#服務端開始接收
head_len_bytes=s.recv(4) #先收報頭4個bytes,得到報頭長度的字節格式
x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度

head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式
header=json.loads(json.dumps(header)) #提取報頭

#最后根據報頭的內容提取真實的數據,比如
real_data_len=s.recv(header['file_size'])
s.recv(real_data_len)
View Code

使用struct模塊實現方式如下

server

import socket,struct,json
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加

phone.bind(('127.0.0.1',8080))

phone.listen(5)

while True:
    conn,addr=phone.accept()
    while True:
        cmd=conn.recv(1024)
        if not cmd:break
        print('cmd: %s' %cmd)

        res=subprocess.Popen(cmd.decode('utf-8'),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        err=res.stderr.read()
        print(err)
        if err:
            back_msg=err
        else:
            back_msg=res.stdout.read()

        headers={'data_size':len(back_msg)}
        head_json=json.dumps(headers)
        head_json_bytes=bytes(head_json,encoding='utf-8')

        conn.send(struct.pack('i',len(head_json_bytes))) #先發報頭的長度
        conn.send(head_json_bytes) #再發報頭
        conn.sendall(back_msg) #在發真實的內容

    conn.close()
View Code

client

from socket import *
import struct,json

ip_port=('127.0.0.1',8080)
client=socket(AF_INET,SOCK_STREAM)
client.connect(ip_port)

while True:
    cmd=input('>>: ')
    if not cmd:continue
    client.send(bytes(cmd,encoding='utf-8'))

    head=client.recv(4) #先收4個bytes,這里4個bytes里包含了報頭的長度
    head_json_len=struct.unpack('i',head)[0] #解出報頭的長度
    head_json=json.loads(client.recv(head_json_len).decode('utf-8')) #拿到報頭
    data_len=head_json['data_size'] #取出報頭內包含的信息

    #開始收數據
    recv_size=0
    recv_data=b''
    while recv_size < data_len:
        recv_data+=client.recv(1024)
        recv_size+=len(recv_data)

    print(recv_data.decode('utf-8'))
    #print(recv_data.decode('gbk')) #windows默認gbk編碼
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM