對於基於TCP開發的通訊程序,有個很重要的問題需要解決,就是封包和拆包.自從我從事網絡通訊編程工作以來(大概有三年的時間了),我一直在思索和改進封包和拆包的方法.下面就針對這個問題談談我的想法,拋磚引玉.若有不對,不妥之處,懇求大家指正.在此先謝過大家了.
一.為什么基於TCP的通訊程序需要進行封包和拆包.
TCP是個"流"協議,所謂流,就是沒有界限的一串數據.大家可以想想河里的流水,是連成一片的,其間是沒有分界線的.但一般通訊程序開發是需要定義一個 個相互獨立的數據包的,比如用於登陸的數據包,用於注銷的數據包.由於TCP"流"的特性以及網絡狀況,在進行數據傳輸時會出現以下幾種情況.
假設我們連續調用兩次send分別發送兩段數據data1和data2,在接收端有以下幾種接收情況(當然不止這幾種情況,這里只列出了有代表性的情況).
A.先接收到data1,然后接收到data2.
B.先接收到data1的部分數據,然后接收到data1余下的部分以及data2的全部.
C.先接收到了data1的全部數據和data1的部分數據,然后接收到了data2的余下的數據.
D.一次性接收到了data1和data2的全部數據.
對於A這種情況正是我們需要的,不再做討論.對於B,C,D的情況就是大家經常說的"粘包",就需要我們把接收到的數據進行拆包,拆成一個個獨立的數據包.為了拆包就必須在發送端進行封包.
另:對於UDP來說就不存在拆包的問題,因為UDP是個"數據包"協議,也就是兩段數據間是有界限的,在接收端要么接收不到數據要么就是接收一個完整的一段數據,不會少接收也不會多接收.
二.為什么會出現B.C.D的情況.
"粘包"可發生在發送端也可發生在接收端.
1.由Nagle算法造成的發送端的粘包:Nagle算法是一種改善網絡傳輸效率的算法.簡單的說,當我們提交一段數據給TCP發送時,TCP並不立刻發 送此段數據,而是等待一小段時間,看看在等待期間是否還有要發送的數據,若有則會一次把這兩段數據發送出去.這是對Nagle算法一個簡單的解釋,詳細的 請看相關書籍.象C和D的情況就有可能是Nagle算法造成的.
2.接收端接收不及時造成的接收端粘包:TCP會把接收到的數據存在自己的緩沖區中,然后通知應用層取數據.當應用層由於某些原因不能及時的把TCP的數據取出來,就會造成TCP緩沖區中存放了幾段數據.
三.怎樣封包和拆包.
最初遇到"粘包"的問題時,我是通過在兩次send之間調用sleep來休眠一小段時間來解決.這個解決方法的缺點是顯而易見的,使傳輸效率大大降低,而 且也並不可靠.后來就是通過應答的方式來解決,盡管在大多數時候是可行的,但是不能解決象B的那種情況,而且采用應答方式增加了通訊量,加重了網絡負荷. 再后來就是對數據包進行封包和拆包的操作.
封包:
封包就是給一段數據加上包頭,這樣一來數據包就分為包頭和包體兩部分內容了(以后講過濾非法包時封包會加入"包尾"內容).包頭其實上是個大小固定的結構 體,其中有個結構體成員變量表示包體的長度,這是個很重要的變量,其他的結構體成員可根據需要自己定義.根據包頭長度固定以及包頭中含有包體長度的變量就 能正確的拆分出一個完整的數據包.
對於拆包目前我最常用的是以下兩種方式.
1.動態緩沖區暫存方式.之所以說緩沖區是動態的是因為當需要緩沖的數據長度超出緩沖區的長度時會增大緩沖區長度.
大概過程描述如下:
A,為每一個連接動態分配一個緩沖區,同時把此緩沖區和SOCKET關聯,常用的是通過結構體關聯.
B,當接收到數據時首先把此段數據存放在緩沖區中.
C,判斷緩存區中的數據長度是否夠一個包頭的長度,如不夠,則不進行拆包操作.
D,根據包頭數據解析出里面代表包體長度的變量.
E,判斷緩存區中除包頭外的數據長度是否夠一個包體的長度,如不夠,則不進行拆包操作.
F,取出整個數據包.這里的"取"的意思是不光從緩沖區中拷貝出數據包,而且要把此數據包從緩存區中刪除掉.刪除的辦法就是把此包后面的數據移動到緩沖區的起始地址.
這種方法有兩個缺點.1.為每個連接動態分配一個緩沖區增大了內存的使用.2.有三個地方需要拷貝數據,一個地方是把數據存放在緩沖區,一個地方是把完整的數據包從緩沖區取出來,一個地方是把數據包從緩沖區中刪除.第二種拆包的方法會解決和完善這些缺點.
下面給出相關代碼.
前面提到過這種方法的缺點.下面給出一個改進辦法, 即采用環形緩沖.但是這種改進方法還是不能解決第一個缺點以及第一個數據拷貝,只能解決第三個地方的數據拷貝(這個地方是拷貝數據最多的地方).第2種拆包方式會解決這兩個問題.
環形緩沖實現方案是定義兩個指針,分別指向有效數據的頭和尾.在存放數據和刪除數據時只是進行頭尾指針的移動.
用代碼來說明.注:下面的代碼是采用一個開源的游戲服務器的代碼,我對此代碼有所修改. 2.利用底層的緩沖區來進行拆包
由於TCP也維護了一個緩沖區,所以我們完全可以利用TCP的緩沖區來緩存我們的數據,這樣一來就不需要為每一個連接分配一個緩沖區了.另一方面我們知道
recv或者wsarecv都有一個參數,用來表示我們要接收多長長度的數據.利用這兩個條件我們就可以對第一種方法進行優化.
對於阻塞SOCKET來說,我們可以利用一個循環來接收包頭長度的數據,然后解析出代表包體長度的那個變量,再用一個循環來接收包體長度的數據.
拆包一直是個硬傷呀,MLGB的,服務端各種亂數據,果斷整理下
拆包思路:設計一個網絡協議,一般都會分包,一個包就相當於一個邏輯上的命令。
1、如果我們用udp協議,省事的多,一次會收到一個完整的包,但UDP不可靠,順序也不能保證,當然像QQ對UDP封裝的很好,模擬了TCP的可靠性。網上也有一些封裝好的可靠的UDP組件,大家用的話可以找找。關於用什么協議好這個問題,本貼不討論。 2、如果我們用TCP協議不是長連接,像HTTP(不考慮KeepAlive)那樣,一個連接上只發送一個包,我們也會很清晰的區分出接受到的每一個包。 3、還有就是我們還用TCP長連接,但每次發送固定長度的包,如果要發送的數據長度不夠就用補齊,如果大於固定長度,就分成幾個發,這個也很簡單實用。 4、再有就是一個包有特定的開始和結尾,比如包頭是<bof>包尾是<eof>,我們在可以從頭讀到尾,並把一個一個的包放入隊列,由處理線程去處理。 5、再有一種就是每個包有固定長度的header,這個header里包含一個包的長度信息,我們可以先從頭里讀出長度信息,然后再借着讀這么長的數據,完了這就是一個包。 關於封包的幾種類型我就想到這么多,其中的利弊大家一看便知,我就不忽悠了,本文主要介紹最后一種方式,好多網絡協議用的都是這種,包括CMPP協議,我們自己設計協議的時候一般不用像CMPP協議那樣,因為二進制協議雖然雖然節省網絡流量,但可讀性不好。出問題,抓個包分析起來太麻煩。我們可以用.net自帶的序列功能把要發送的類序列化成XML字符串發送出去,這多好看呀。 由於Socket緩沖區設置及其他的原因,Socket在接受數據的時候有時候不能完整的收到一個包,就是你讀出包的長度后,可能不能一次就讀取這么多數據。而如果讀個半截兒的包就用UTF8Encoding等來解析,會解析出亂碼的,我們這里用Encoding.UTF8.GetDecoder()來對包進行成塊兒的解析,它就是用來做這種事情的。 下面就來看一下代碼,代碼的注釋很全,演示了一個包從發到接受、解析的全過程,其中接受的過程沒有一次收全所有的包,而是收了好幾次,但我們最終還是成功的解析了收到的包。
public static void UnPack() { //1、聲明通過socket發送的字符串 string toSendStringBySocket = "娃娃士大夫%#¥%My name is 蛙蛙王子!!"; //2、轉換成utf-8字節數組 byte[] bsInput = Encoding.UTF8.GetBytes(toSendStringBySocket); //3、計算要發送的字節數組的長度,並寫到第一塊兒字節數組的開頭 //一般協議設計里都有一個長度的Header,這里就是寫這個Header int inputBytesCount = bsInput.Length; byte[] bs1 = new byte[4 + 3]; //4是一個int的長度,3是底一塊字節數組除了Header剩余的大小 Buffer.BlockCopy(BitConverter.GetBytes(inputBytesCount), 0, bs1, 0, 4); //4、把要發送的字節數組拆分成3塊兒發出去,因為socket在接受字節數組的時候 //也可能半截半截兒的接收,我們就是要模擬這種效果下的拆包,因為第一塊包寫了 //一個4個字節的Header,而第一塊字節數組長度是7,所以再寫三個字節長度的數據 int offSet = 0; Buffer.BlockCopy(bsInput, offSet, bs1, 4, 3); offSet += bs1.Length - 4; //5、寫第二塊兒數據 byte[] bs2 = new byte[8]; Buffer.BlockCopy(bsInput, offSet, bs2, 0, bs2.Length); offSet += bs2.Length; //6、寫第三塊兒數據,我們這里模擬在最后一塊數據的末尾加一些亂七八糟的數據 //這些亂七八糟的數據有可能是下一個包的header。 byte[] bs3 = new byte[bsInput.Length - offSet + 4]; Buffer.BlockCopy(bsInput, offSet, bs3, 0, bsInput.Length - offSet); Buffer.BlockCopy(new byte[] { 1, 2, 3, 4 }, 0, bs3, bs3.Length - 4, 4); //7、Socket的接收方在執行BeginReceive函數,並回調函數里把收到的數據放入一個隊列里 //dotNet的隊列內部就是一個環形數組,這里直接就當環形緩沖區來用了。 Queue<byte[]> bufferPool = new Queue<byte[]>(); bufferPool.Enqueue(bs1); bufferPool.Enqueue(bs2); bufferPool.Enqueue(bs3); //8、初始化一些變量准備解包 //聲明一個字符串緩沖區,大小是你的協議里規定的最大的包體長度 char[] chars = new char[256]; //定義一個UTF-8的Decoder,它可以成塊的解包,內部自動維護解析狀態 //關於它的使用請參考MSDN或者《.net框架設計》 Decoder d = Encoding.UTF8.GetDecoder(); int charLen = 0; //定義每次解包返回的字符長度 int parseBytesCount = 0; //定義已解包的字節數 int LenghHeader = 0; //定義收到包的長度 bool needReadLengthHeader = true; //是否需要讀取長度的頭 int srcOffSet = 0; //定義要解析的數據塊的偏移量 byte[] tempBuffer; //9、當環形緩沖里有數據的時候就一直解析 while (bufferPool.Count > 0) { //10、讀取數據包的長度信息,LengthHeader //因為第一塊兒包包含長度信息,所以要先讀出來 //讀了長度包后,要把數據庫解析偏移量加4 if(needReadLengthHeader) { LenghHeader = BitConverter.ToInt32(bs1, parseBytesCount); needReadLengthHeader = false; srcOffSet = 4; } //11、從環形緩沖區取出一塊兒數據 tempBuffer = bufferPool.Dequeue(); parseBytesCount += tempBuffer.Length-srcOffSet; //更改已解析的字節數 //12、如果已解析的字節數大於數據的長度,那么只解需要解析的字節 if (parseBytesCount > LenghHeader) { parseBytesCount -= tempBuffer.Length; d.GetChars(tempBuffer, srcOffSet, inputBytesCount - parseBytesCount, chars, charLen); //這里記錄下當前的臨時緩沖區已解析到了什么位置,准備解析下一個包 srcOffSet = inputBytesCount - parseBytesCount; // break; } //13、解析這半拉包 charLen += d.GetChars(tempBuffer, srcOffSet, tempBuffer.Length-srcOffSet, chars, charLen); srcOffSet = 0; } string s = new string(chars); //14、通知包處理線程來處理這個包 Console.WriteLine(s); }
實際場景中應用
using UnityEngine; using System.Collections; using System; using System.Threading; using System.Text; using System.Net; using System.Net.Sockets; using System.Collections.Generic; using System.IO; using System.Runtime.InteropServices; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; public class JFSocket { //Socket客戶端對象 private Socket clientSocket; //JFPackage.WorldPackage是我封裝的結構體, //在與服務器交互的時候會傳遞這個結構體 //當客戶端接到到服務器返回的數據包時,我把結構體add存在鏈表中。 public List<JFPackage.WorldPackage> worldpackage; //單例模式 private static JFSocket instance; public static JFSocket GetInstance() { if (instance == null) { instance = new JFSocket(); } return instance; } //單例的構造函數 JFSocket() { //創建Socket對象, 這里我的連接類型是TCP clientSocket = new Socket (AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp); //服務器IP地址 IPAddress ipAddress = IPAddress.Parse ("192.168.1.100"); //服務器端口 IPEndPoint ipEndpoint = new IPEndPoint (ipAddress, 10060); //這是一個異步的建立連接,當連接建立成功時調用connectCallback方法 IAsyncResult result = clientSocket.BeginConnect (ipEndpoint,new AsyncCallback (connectCallback),clientSocket); //這里做一個超時的監測,當連接超過5秒還沒成功表示超時 bool success = result.AsyncWaitHandle.WaitOne( 5000, true ); if ( !success ) { //超時 Closed(); Debug.Log("connect Time Out"); }else { //與socket建立連接成功,開啟線程接受服務端數據。 worldpackage = new List<JFPackage.WorldPackage>(); Thread thread = new Thread(new ThreadStart(ReceiveSorket)); thread.IsBackground = true; thread.Start(); } } private void connectCallback(IAsyncResult asyncConnect) { Debug.Log("connectSuccess"); } private void ReceiveSorket() { //在這個線程中接受服務器返回的數據 while (true) { if(!clientSocket.Connected) { //與服務器斷開連接跳出循環 Debug.Log("Failed to clientSocket server."); clientSocket.Close(); break; } try { //接受數據保存至bytes當中 byte[] bytes = new byte[4096]; //Receive方法中會一直等待服務端回發消息 //如果沒有回發會一直在這里等着。 int i = clientSocket.Receive(bytes); if(i <= 0) { clientSocket.Close(); break; } //這里條件可根據你的情況來判斷。 //因為我目前的項目先要監測包頭長度, //我的包頭長度是2,所以我這里有一個判斷 if(bytes.Length > 2) { SplitPackage(bytes,0); }else { Debug.Log("length is not > 2"); } } catch (Exception e) { Debug.Log("Failed to clientSocket error." + e); clientSocket.Close(); break; } } } private void SplitPackage(byte[] bytes , int index) { //在這里進行拆包,因為一次返回的數據包的數量是不定的 //所以需要給數據包進行查分。 while(true) { //包頭是2個字節 byte[] head = new byte[2]; int headLengthIndex = index + 2; //把數據包的前兩個字節拷貝出來 Array.Copy(bytes,index,head,0,2); //計算包頭的長度 short length = BitConverter.ToInt16(head,0); //當包頭的長度大於0 那么需要依次把相同長度的byte數組拷貝出來 if(length > 0) { byte[] data = new byte[length]; //拷貝出這個包的全部字節數 Array.Copy(bytes,headLengthIndex,data,0,length); //把數據包中的字節數組強制轉換成數據包的結構體 //BytesToStruct()方法就是用來轉換的 //這里需要和你們的服務端程序商量, JFPackage.WorldPackage wp = new JFPackage.WorldPackage(); wp = (JFPackage.WorldPackage)BytesToStruct(data,wp.GetType()); //把每個包的結構體對象添加至鏈表中。 worldpackage.Add(wp); //將索引指向下一個包的包頭 index = headLengthIndex + length; }else { //如果包頭為0表示沒有包了,那么跳出循環 break; } } } //向服務端發送一條字符串 //一般不會發送字符串 應該是發送數據包 public void SendMessage(string str) { byte[] msg = Encoding.UTF8.GetBytes(str); if(!clientSocket.Connected) { clientSocket.Close(); return; } try { //int i = clientSocket.Send(msg); IAsyncResult asyncSend = clientSocket.BeginSend (msg,0,msg.Length,SocketFlags.None,new AsyncCallback (sendCallback),clientSocket); bool success = asyncSend.AsyncWaitHandle.WaitOne( 5000, true ); if ( !success ) { clientSocket.Close(); Debug.Log("Failed to SendMessage server."); } } catch { Debug.Log("send message error" ); } } //向服務端發送數據包,也就是一個結構體對象 public void SendMessage(object obj) { if(!clientSocket.Connected) { clientSocket.Close(); return; } try { //先得到數據包的長度 short size = (short)Marshal.SizeOf(obj); //把數據包的長度寫入byte數組中 byte [] head = BitConverter.GetBytes(size); //把結構體對象轉換成數據包,也就是字節數組 byte[] data = StructToBytes(obj); //此時就有了兩個字節數組,一個是標記數據包的長度字節數組, 一個是數據包字節數組, //同時把這兩個字節數組合並成一個字節數組 byte[] newByte = new byte[head.Length + data.Length]; Array.Copy(head,0,newByte,0,head.Length); Array.Copy(data,0,newByte,head.Length, data.Length); //計算出新的字節數組的長度 int length = Marshal.SizeOf(size) + Marshal.SizeOf(obj); //向服務端異步發送這個字節數組 IAsyncResult asyncSend = clientSocket.BeginSend (newByte,0,length,SocketFlags.None,new AsyncCallback (sendCallback),clientSocket); //監測超時 bool success = asyncSend.AsyncWaitHandle.WaitOne( 5000, true ); if ( !success ) { clientSocket.Close(); Debug.Log("Time Out !"); } } catch (Exception e) { Debug.Log("send message error: " + e ); } } //結構體轉字節數組 public byte[] StructToBytes(object structObj) { int size = Marshal.SizeOf(structObj); IntPtr buffer = Marshal.AllocHGlobal(size); try { Marshal.StructureToPtr(structObj,buffer,false); byte[] bytes = new byte[size]; Marshal.Copy(buffer, bytes,0,size); return bytes; } finally { Marshal.FreeHGlobal(buffer); } } //字節數組轉結構體 public object BytesToStruct(byte[] bytes, Type strcutType) { int size = Marshal.SizeOf(strcutType); IntPtr buffer = Marshal.AllocHGlobal(size); try { Marshal.Copy(bytes,0,buffer,size); return Marshal.PtrToStructure(buffer, strcutType); } finally { Marshal.FreeHGlobal(buffer); } } private void sendCallback (IAsyncResult asyncSend) { } //關閉Socket public void Closed() { if(clientSocket != null && clientSocket.Connected) { clientSocket.Shutdown(SocketShutdown.Both); clientSocket.Close(); } clientSocket = null; } }
為了與服務端達成默契,判斷數據包是否完成。我們需要在數據包中定義包頭 ,包頭一般是這個數據包的長度,也就是結構體對象的長度。正如代碼中我們把兩個數據類型 short 和 object 合並成一個新的字節數組。
然后是數據包結構體的定義,需要注意如果你在做IOS和Android的話數據包中不要包含數組,不然在結構體轉換byte數組的時候會出錯。
Marshal.StructureToPtr () error : Attempting to JIT compile method
JFPackage.cs
using UnityEngine; using System.Collections; using System.Runtime.InteropServices; public class JFPackage { //結構體序列化 [System.Serializable] //4字節對齊 iphone 和 android上可以1字節對齊 [StructLayout(LayoutKind.Sequential, Pack = 4)] public struct WorldPackage { public byte mEquipID; public byte mAnimationID; public byte mHP; public short mPosx; public short mPosy; public short mPosz; public short mRosx; public short mRosy; public short mRosz; public WorldPackage(short posx,short posy,short posz, short rosx, short rosy, short rosz,byte equipID,byte animationID,byte hp) { mPosx = posx; mPosy = posy; mPosz = posz; mRosx = rosx; mRosy = rosy; mRosz = rosz; mEquipID = equipID; mAnimationID = animationID; mHP = hp; } }; }
讓角色發生移動的時候,調用該方法向服務端發送數據。
oid SendPlayerWorldMessage() { //組成新的結構體對象,包括主角坐標旋轉等。 Vector3 PlayerTransform = transform.localPosition; Vector3 PlayerRotation = transform.localRotation.eulerAngles; //用short的話是2字節,為了節省包的長度。這里乘以100 避免使用float 4字節。當服務器接受到的時候小數點向前移動兩位就是真實的float數據 short px = (short)(PlayerTransform.x*100); short py = (short)(PlayerTransform.y*100); short pz = (short)(PlayerTransform.z*100); short rx = (short)(PlayerRotation.x*100); short ry = (short)(PlayerRotation.y*100); short rz = (short)(PlayerRotation.z*100); byte equipID = 1; byte animationID =9; byte hp = 2; JFPackage.WorldPackage wordPackage = new JFPackage.WorldPackage(px,py,pz,rx,ry,rz,equipID,animationID,hp); //通過Socket發送結構體對象 mJFsorket.SendMessage(wordPackage); }
接着就是客戶端同步服務器的數據,目前是測試階段所以寫的比較簡陋,不過原理都是一樣的
//上次同步時間 private float mSynchronous; void Update () { mSynchronous +=Time.deltaTime; //在Update中每0.5s的時候同步一次 if(mSynchronous > 0.5f) { int count = mJFsorket.worldpackage.Count; //當接受到的數據包長度大於0 開始同步 if(count > 0) { //遍歷數據包中 每個點的坐標 foreach(JFPackage.WorldPackage wp in mJFsorket.worldpackage) { float x = (float)(wp.mPosx / 100.0f); float y = (float)(wp.mPosy /100.0f); float z = (float)(wp.mPosz /100.0f); Debug.Log("x = " + x + " y = " + y+" z = " + z); //同步主角的新坐標 mPlayer.transform.position = new Vector3 (x,y,z); } //清空數據包鏈表 mJFsorket.worldpackage.Clear(); } mSynchronous = 0; } }
