通過系列二 我們已經實現了socket的簡單通信 接下來我們測試一下,在時間應用的場景下,我們會快速且大量的傳輸數據的情況!
class Program { static void Main(string[] args) { TCPListener tcp = new TCPListener(); TSocketClient client = new TSocketClient(); ; i < ; i++) { client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")); } Console.ReadLine(); } }
我們通過測試代碼快速發送10條消息到服務器去,
我們看看運行結果
這樣不難看出,我們的客戶端發送了10條消息,但是服務器收到的時候變成了兩條消息,回復客戶端自然就變成兩次回復。
這是為什么呢?
我們修改一下程序一秒鍾發送一次消息試試
class Program { static void Main(string[] args) { TCPListener tcp = new TCPListener(); TSocketClient client = new TSocketClient(); ; i < ; i++) { Thread.Sleep(); client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")); } Console.ReadLine(); } }
運行看看,
這次對了那么分析分析到底為什么呢?這是socket的底層,做的手腳。因為我設置socket的發送和接受緩沖區//10K的緩沖區空間 private int BufferSize = 10 * 1024; 10k的緩沖區,且socket的底層 發送消息會有一定間隙,雖然這個時間很短,但是我們直接for循環發送的話,時間同意很快,因為socket.send()方法並非真實的發送數據而是把數據壓入發送緩沖區。那么我們就明白了為什么會出現上面的情況出現了這樣的情況我們要怎么解決呢?時間應用場景不可能1秒鍾才一條消息啥。我們知道了導致這個問題的原因是因為消息發送是出現了快速壓入很多發送消息到待發送緩沖區里面一起發送導致的。這樣情況就是粘包了,那么我們是不是可以考慮給每一個消息加入包標識呢?
接下來我們修改一下發送包的數據代碼
創建消息的構造體 TSocketMessage
/// <summary> /// 底層通信消息 /// </summary> public class TSocketMessage : IDisposable { /// <summary> /// 消息ID /// </summary> public int MsgID; /// <summary> /// 消息內容 /// </summary> public byte[] MsgBuffer; public TSocketMessage(int msgID, byte[] msg) { this.MsgID = msgID; this.MsgBuffer = msg; } public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool flag1) { if (flag1) { this.MsgBuffer = null; } } }
接下來我們創建消息包的封裝和拆分 MarshalEndian
public class MarshalEndian { //用於存儲剩余未解析的字節數 ); //默認是utf8的編碼格式 private UTF8Encoding utf8 = new UTF8Encoding(); //包頭1 const Int16 t1 = 0x55; //包頭2 const Int16 t2 = 0xAA; //字節數常量 兩個包頭4個字節,一個消息id4個字節,封裝消息長度 long 8個字節 const long ConstLenght = 12L; public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool flag1) { if (flag1) { IDisposable disposable2 = this.utf8 as IDisposable; if (disposable2 != null) { disposable2.Dispose(); } IDisposable disposable = this._LBuff as IDisposable; if (disposable != null) { disposable.Dispose(); } } } public byte[] Encode(TSocketMessage msg) { MemoryStream ms = new MemoryStream(); BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding()); byte[] msgBuffer = msg.MsgBuffer; #region 封裝包頭 bw.Write((Int16)t1); bw.Write((Int16)t2); #endregion #region 包協議 if (msgBuffer != null) { bw.Write((Int64)(msgBuffer.Length + )); bw.Write(msg.MsgID); bw.Write(msgBuffer); } ); } #endregion bw.Close(); ms.Close(); bw.Dispose(); ms.Dispose(); return ms.ToArray(); } public List<TSocketMessage> GetDcAppMess(byte[] buff, int len) { //拷貝本次的有效字節 byte[] _b = new byte[len]; Array.Copy(buff, , _b, , _b.Length); buff = _b; ) { //拷貝之前遺留的字節 this._LBuff.AddRange(_b); buff = this._LBuff.ToArray(); this._LBuff.Clear(); ); } List<TSocketMessage> list = new List<TSocketMessage>(); MemoryStream ms = new MemoryStream(buff); BinaryReader buffers = new BinaryReader(ms, this.utf8); try { byte[] _buff; Label_0073: //判斷本次解析的字節是否滿足常量字節數 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght) { _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position)]; Array.Copy(buff, (, _buff.Length); this._LBuff.AddRange(_buff); return list; } #region 包頭讀取 //循環讀取包頭 Label_00983: Int16 tt1 = buffers.ReadInt16(); Int16 tt2 = buffers.ReadInt16(); if (!(tt1 == t1 && tt2 == t2)) { , SeekOrigin.Current); goto Label_00983; } #endregion #region 包協議 long offset = buffers.ReadInt64(); #endregion #region 包解析 //剩余字節數大於本次需要讀取的字節數 if (offset < (buffers.BaseStream.Length - buffers.BaseStream.Position)) { int msgID = buffers.ReadInt32(); _buff = ]; Array.Copy(buff, (, _buff.Length); list.Add(new TSocketMessage(msgID, _buff)); //設置偏移量 然后繼續循環讀取 buffers.BaseStream.Seek(offset, SeekOrigin.Current); goto Label_0073; } else if (offset == (buffers.BaseStream.Length - buffers.BaseStream.Position)) { int msgID = buffers.ReadInt32(); //剩余字節數剛好等於本次讀取的字節數 _buff = ]; Array.Copy(buff, (, _buff.Length); list.Add(new TSocketMessage(msgID, _buff)); } else { //剩余字節數剛好小於本次讀取的字節數 存起來,等待接受剩余字節數一起解析 _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght)]; Array.Copy(buff, (, _buff.Length); buff = _buff; this._LBuff.AddRange(_buff); } #endregion } catch { } finally { if (buffers != null) { buffers.Dispose(); } buffers.Close(); if (buffers != null) { buffers.Dispose(); } ms.Close(); if (ms != null) { ms.Dispose(); } } return list; } }
接下來我們修改一下 TSocketBase 的 抽象方法
public abstract void Receive(TSocketMessage msg);
在修改接受消息回調函數
/// <summary> /// 消息解析器 /// </summary> MarshalEndian mersha = new MarshalEndian(); /// <summary> /// 接收消息回調函數 /// </summary> /// <param name="iar"></param> private void ReceiveCallback(IAsyncResult iar) { if (!this.IsDispose) { try { //接受消息 ReceiveSize = _Socket.EndReceive(iar, out ReceiveError); //檢查狀態碼 if (!CheckSocketError(ReceiveError) && SocketError.Success == ReceiveError) { //判斷接受的字節數 ) { byte[] rbuff = new byte[ReceiveSize]; Array.Copy(this.Buffers, rbuff, ReceiveSize); var msgs = mersha.GetDcAppMess(rbuff, ReceiveSize); foreach (var msg in msgs) { this.Receive(msg); } //重置連續收到空字節數 ZeroCount = ; //繼續開始異步接受消息 ReceiveAsync(); } else { ZeroCount++; ) { this.Close("錯誤鏈接"); } } } } catch (System.Net.Sockets.SocketException) { this.Close("鏈接已經被關閉"); } catch (System.ObjectDisposedException) { this.Close("鏈接已經被關閉"); } } }
這樣我們完成了在收到消息后對數據包的解析。
修改一下TSocketClient的 Receive 重寫方法
/// <summary> /// 收到消息后 /// </summary> /// <param name="rbuff"></param> public override void Receive(TSocketMessage msg) { Console.WriteLine("Receive ID:" + msg.MsgID + " Msg:" + System.Text.UTF8Encoding.Default.GetString(msg.MsgBuffer)); if (isServer) { this.SendMsg(new TSocketMessage(msg.MsgID, System.Text.UTF8Encoding.Default.GetBytes("Holle Client!"))); } }
修改測試代碼如下
class Program { static void Main(string[] args) { TCPListener tcp = new TCPListener(); TSocketClient client = new TSocketClient(); ; i < ; i++) { Thread.Sleep(); client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"))); } Console.ReadLine(); } }
運行結果
接受成功了,那么我們取消暫停狀態,快速發送消息試試
class Program { static void Main(string[] args) { TCPListener tcp = new TCPListener(); TSocketClient client = new TSocketClient(); for(i=1 ; i < 5; i++) { client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"))); } Console.ReadLine(); } }
看看運行結果
瞬間完成了消息發送,也沒有再出現第一次運行的那樣~!
這樣完美的解決了socket通信 在傳輸上發送粘包問題
謝謝園友發現的問題,
- 問題是這樣的原本的解包和封包的測試代碼不夠嚴謹導致解析包出現錯誤感謝園友發現問題,並提出問題。
附上最新的代碼
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; namespace TSocket { public class MarshalEndian { //用於存儲剩余未解析的字節數//默認是utf8的編碼格式 private UTF8Encoding utf8 = new UTF8Encoding(); //包頭1 const Int16 t1 = 0x55; //包頭2 const Int16 t2 = 0xAA; //字節數常量 兩個包頭4個字節,一個消息id4個字節,封裝消息長度 int32 4個字節public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool flag1) { if (flag1) { IDisposable disposable2 = this.utf8 as IDisposable; if (disposable2 != null) { disposable2.Dispose(); } IDisposable disposable = this._LBuff as IDisposable; if (disposable != null) { disposable.Dispose(); } } } public byte[] Encode(TSocketMessage msg) { MemoryStream ms = new MemoryStream(); BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding()); byte[] msgBuffer = msg.MsgBuffer; #region 封裝包頭 bw.Write((Int16)t1); bw.Write((Int16)t2); #endregion #region 包協議 if (msgBuffer != null) { bw.Write((Int32)(msgBuffer.Length + )); bw.Write(msg.MsgID); bw.Write(msgBuffer); } ); } #endregion bw.Close(); ms.Close(); bw.Dispose(); ms.Dispose(); return ms.ToArray(); } public List<TSocketMessage> GetDcAppMess(byte[] buff, int len) { //拷貝本次的有效字節 byte[] _b = new byte[len]; Array.Copy(buff, , _b, , _b.Length); buff = _b; ) { //拷貝之前遺留的字節 this._LBuff.AddRange(_b); buff = this._LBuff.ToArray(); this._LBuff.Clear(); ); } List<TSocketMessage> list = new List<TSocketMessage>(); MemoryStream ms = new MemoryStream(buff); BinaryReader buffers = new BinaryReader(ms, this.utf8); try { byte[] _buff; Label_00983: #region 包頭讀取 //循環讀取包頭 //判斷本次解析的字節是否滿足常量字節數 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght) { _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position)); this._LBuff.AddRange(_buff); return list; } Int16 tt1 = buffers.ReadInt16(); Int16 tt2 = buffers.ReadInt16(); if (!(tt1 == t1 && tt2 == t2)) { , SeekOrigin.Current); goto Label_00983; } #endregion #region 包協議 int offset = buffers.ReadInt32(); #endregion #region 包解析 //剩余字節數大於本次需要讀取的字節數 if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position)) { int msgID = buffers.ReadInt32(); _buff = buffers.ReadBytes(offset - ); list.Add(new TSocketMessage(msgID, _buff)); ) { goto Label_00983; } } else { //剩余字節數剛好小於本次讀取的字節數 存起來,等待接受剩余字節數一起解析 _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght)); this._LBuff.AddRange(_buff); } #endregion } catch (Exception ex) { Console.WriteLine(ex); } finally { if (buffers != null) { buffers.Dispose(); } buffers.Close(); if (buffers != null) { buffers.Dispose(); } ms.Close(); if (ms != null) { ms.Dispose(); } } return list; } } }