隨着智能硬件越來越流行,很多后端開發人員都有可能接觸到socket編程。而很多情況下,服務器與端上需要保證數據的有序,穩定到達,自然而然就會選擇基於tcp/ip協議的socekt開發。開發過程中,經常會遇到tcp粘包,拆包的問題,本文將從產生原因,和解決方案以及workerman是如何處理粘包拆包問題的,這幾個層面來說明這個問題。
什么是粘包拆包
對於什么是粘包、拆包問題,我想先舉兩個簡單的應用場景:
-
客戶端和服務器建立一個連接,客戶端發送一條消息,客戶端關閉與服務端的連接。
-
客戶端和服務器簡歷一個連接,客戶端連續發送兩條消息,客戶端關閉與服務端的連接。
對於第一種情況,服務端的處理流程可以是這樣的:當客戶端與服務端的連接建立成功之后,服務端不斷讀取客戶端發送過來的數據,當客戶端與服務端連接斷開之后,服務端知道已經讀完了一條消息,然后進行解碼和后續處理...。對於第二種情況,如果按照上面相同的處理邏輯來處理,那就有問題了,我們來看看第二種情況下客戶端發送的兩條消息遞交到服務端有可能出現的情況:
第一種情況:
服務端一共讀到兩個數據包,第一個包包含客戶端發出的第一條消息的完整信息,第二個包包含客戶端發出的第二條消息,那這種情況比較好處理,服務器只需要簡單的從網絡緩沖區去讀就好了,第一次讀到第一條消息的完整信息,消費完再從網絡緩沖區將第二條完整消息讀出來消費。
沒有發生粘包、拆包示意圖
第二種情況:
服務端一共就讀到一個數據包,這個數據包包含客戶端發出的兩條消息的完整信息,這個時候基於之前邏輯實現的服務端就蒙了,因為服務端不知道第一條消息從哪兒結束和第二條消息從哪兒開始,這種情況其實是發生了TCP粘包。
TCP粘包示意圖
第三種情況:
服務端一共收到了兩個數據包,第一個數據包只包含了第一條消息的一部分,第一條消息的后半部分和第二條消息都在第二個數據包中,或者是第一個數據包包含了第一條消息的完整信息和第二條消息的一部分信息,第二個數據包包含了第二條消息的剩下部分,這種情況其實是發送了TCP拆,因為發生了一條消息被拆分在兩個包里面發送了,同樣上面的服務器邏輯對於這種情況是不好處理的。
TCP拆包示意圖
產生tcp粘包和拆包的原因
我們知道tcp是以流動的方式傳輸數據,傳輸的最小單位為一個報文段(segment)。tcp Header中有個Options標識位,常見的標識為mss(Maximum Segment Size)指的是,連接層每次傳輸的數據有個最大限制MTU(Maximum Transmission Unit),一般是1500比特,超過這個量要分成多個報文段,mss則是這個最大限制減去TCP的header,光是要傳輸的數據的大小,一般為1460比特。換算成字節,也就是180多字節。
tcp為提高性能,發送端會將需要發送的數據發送到緩沖區,等待緩沖區滿了之后,再將緩沖中的數據發送到接收方。同理,接收方也有緩沖區這樣的機制,來接收數據。
發生TCP粘包、拆包主要是由於下面一些原因:
-
應用程序寫入的數據大於套接字緩沖區大小,這將會發生拆包。
-
應用程序寫入數據小於套接字緩沖區大小,網卡將應用多次寫入的數據發送到網絡上,這將會發生粘包。
-
進行mss(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>mss的時候將發生拆包。
-
接收方法不及時讀取套接字緩沖區數據,這將發生粘包。
-
……
如何解決拆包粘包
既然知道了tcp是無界的數據流,且協議本身無法避免粘包,拆包的發生,那我們只能在應用層數據協議上,加以控制。通常在制定傳輸數據時,可以使用如下方法:
-
使用帶消息頭的協議、消息頭存儲消息開始標識及消息長度信息,服務端獲取消息頭的時候解析出消息長度,然后向后讀取該長度的內容。
-
設置定長消息,服務端每次讀取既定長度的內容作為一條完整消息。
-
設置消息邊界,服務端從網絡流中按消息編輯分離出消息內容。
a)先基於第三種方法,假設區分數據邊界的標識為換行符"\n"(注意請求數據本身內部不能包含換行符),數據格式為Json,例如下面是一個符合這個規則的請求包。
{"type":"message","content":"hello"}\n
注意上面的請求數據末尾有一個換行字符(在PHP中用雙引號字符串"\n"表示),代表一個請求的結束。
b)基於第一種方法,可以制定,首部固定10個字節長度用來保存整個數據包長度,位數不夠補0的數據協議
0000000036{"type":"message","content":"hello"}
c)基於第一種方法,可以制定,首部4字節網絡字節序unsigned int,標記整個包的長度
****{"type":"message","content":"hello all"}
其中首部四字節*號代表一個網絡字節序的unsigned int數據,為不可見字符,緊接着是Json的數據格式的包體數據。
基於workerman的解決方案
制定了數據協議,那我們下面來通過代碼具體分析一下,php中workerman,是如何解決上述問題的。為了便於理解,可以看下下面的流程圖
workerman是基於策略模式來設計處理tcp粘包,拆包問題的。具體數據協議的制定在應用目錄Applications/YourApp/Protocols目錄下,實現則是在框架目錄Workerman/Connection/TcpConnection.php中。這樣的好處就是用戶可以隨意定制自己的數據協議格式,而框架代碼都能處理。
我們現在Applications/YourApp/Protocols目錄下,建一個jsonNL.php,來實現自己制定自己定義的數據協議。
JsonNL.php的實現
-
namespace Protocols;
-
class JsonNL
-
{
-
/**
-
* 檢查包的完整性
-
* 如果能夠得到包長,則返回包的在buffer中的長度,否則返回0繼續等待數據
-
* 如果協議有問題,則可以返回false,當前客戶端連接會因此斷開
-
* @param string $buffer
-
* @return int
-
*/
-
public static function input($buffer)
-
{
-
// 獲得換行字符"\n"位置
-
$pos = strpos($buffer, "\n");
-
// 沒有換行符,無法得知包長,返回0繼續等待數據
-
if($pos === false)
-
{
-
return 0;
-
}
-
// 有換行符,返回當前包長(包含換行符)
-
return $pos+1;
-
}
-
-
/**
-
* 打包,當向客戶端發送數據的時候會自動調用
-
* @param string $buffer
-
* @return string
-
*/
-
public static function encode($buffer)
-
{
-
// json序列化,並加上換行符作為請求結束的標記
-
return json_encode($buffer)."\n";
-
}
-
-
/**
-
* 解包,當接收到的數據字節數等於input返回的值(大於0的值)自動調用
-
* 並傳遞給onMessage回調函數的$data參數
-
* @param string $buffer
-
* @return string
-
*/
-
public static function decode($buffer)
-
{
-
// 去掉換行,還原成數組
-
return json_decode(trim($buffer), true);
-
}
-
}
再看下TcpConnection.php中,接收數據時,如何處理。
-
public function baseRead($socket, $check_eof = true)
-
{
-
$buffer = fread($socket, self::READ_BUFFER_SIZE);
-
-
// Check connection closed.
-
if ($buffer === '' || $buffer === false) {
-
if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
-
$this->destroy();
-
return;
-
}
-
} else {
-
$this->_recvBuffer .= $buffer;
-
}
-
-
// If the application layer protocol has been set up.
-
if ($this->protocol) {
-
$parser = $this->protocol;
-
while ($this->_recvBuffer !== '' && !$this->_isPaused) {
-
// The current packet length is known.
-
if ($this->_currentPackageLength) {
-
// Data is not enough for a package.
-
if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
-
break;
-
}
-
} else {
-
// Get current package length.
-
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
-
// The packet length is unknown.
-
if ($this->_currentPackageLength === 0) {
-
break;
-
} elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) {
-
// Data is not enough for a package.
-
if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
-
break;
-
}
-
} // Wrong package.
-
else {
-
echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
-
$this->destroy();
-
return;
-
}
-
}
-
-
// The data is enough for a packet.
-
self::$statistics['total_request']++;
-
// The current packet length is equal to the length of the buffer.
-
if (strlen($this->_recvBuffer) === $this->_currentPackageLength) {
-
$one_request_buffer = $this->_recvBuffer;
-
$this->_recvBuffer = '';
-
} else {
-
// Get a full package from the buffer.
-
$one_request_buffer = substr( $this->_recvBuffer, 0, $this->_currentPackageLength);
-
// Remove the current package from the receive buffer.
-
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
-
}
-
// Reset the current packet length to 0.
-
$this->_currentPackageLength = 0;
-
if (!$this->onMessage) {
-
continue;
-
}
-
try {
-
// Decode request buffer before Emitting onMessage callback.
-
call_user_func( $this->onMessage, $this, $parser::decode($one_request_buffer, $this));
-
} catch (\Exception $e) {
-
Worker::log($e);
-
exit(250);
-
} catch (\Error $e) {
-
Worker::log($e);
-
exit(250);
-
}
-
}
-
return;
-
}
-
-
if ($this->_recvBuffer === '' || $this->_isPaused) {
-
return;
-
}
-
-
// Applications protocol is not set.
-
self::$statistics['total_request']++;
-
if (!$this->onMessage) {
-
$this->_recvBuffer = '';
-
return;
-
}
-
try {
-
call_user_func( $this->onMessage, $this, $this->_recvBuffer);
-
} catch (\Exception $e) {
-
Worker::log($e);
-
exit(250);
-
} catch (\Error $e) {
-
Worker::log($e);
-
exit(250);
-
}
-
// Clean receive buffer.
-
$this->_recvBuffer = '';
-
}
上面的代碼比較多,不需要細讀,幾個關鍵的地方可以看出處理的思路,先把接收的數據包追加到_recvBuffer變量中,然后調用用戶自己定義的數據協議中的input方法。input方法則會判斷數據中是否包含邊界符,如果不包含則返回0,包含則返回當前數據包的大小。框架中接收到input的返回值后,如果接收值為0,則跳出循環不做處理,如果接收值不為0,則將截取的數據包賦值給one_request_buffer,並且重置_recvBuffer
-
// Get a full package from the buffer.
-
$one_request_buffer = substr( $this->_recvBuffer, 0, $this->_currentPackageLength);
-
// Remove the current package from the receive buffer.
-
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
最后:tcp雖然是個強大的協議,能保證數據的穩定性,一致性,但在實際開發中,我們還需要根據實際的數據協議,來控制每次獲取的包是客戶端發過來的一個完整的可以解析的包。