通過系列二 我們已經實現了socket的簡單通信 接下來我們測試一下,在時間應用的場景下,我們會快速且大量的傳輸數據的情況!
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 TCPListener tcp = new TCPListener();
6 TSocketClient client = new TSocketClient();
7 for (int i = 0; i < 10; i++)
8 {
9 client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));
10 }
11 Console.ReadLine();
12 }
13 }
我們通過測試代碼快速發送10條消息到服務器去,
我們看看運行結果

這樣不難看出,我們的客戶端發送了10條消息,但是服務器收到的時候變成了兩條消息,回復客戶端自然就變成兩次回復。
這是為什么呢?
我們修改一下程序一秒鍾發送一次消息試試
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 TCPListener tcp = new TCPListener();
6 TSocketClient client = new TSocketClient();
7 for (int i = 0; i < 5; i++)
8 {
9 Thread.Sleep(1000);
10 client.SendMsg(System.Text.UTF8Encoding.Default.GetBytes("Holle Server!"));
11 }
12 Console.ReadLine();
13 }
14 }
運行看看,

這次對了那么分析分析到底為什么呢?
這是socket的底層,做的手腳。因為我設置socket的發送和接受緩沖區
//10K的緩沖區空間 private int BufferSize = 10 * 1024; 10k的緩沖區,
且socket的底層 發送消息會有一定間隙,雖然這個時間很短,但是我們直接for循環發送的話,時間同意很快,
因為socket.send()方法並非真實的發送數據而是把數據壓入發送緩沖區。
那么我們就明白了為什么會出現上面的情況
出現了這樣的情況我們要怎么解決呢?
時間應用場景不可能1秒鍾才一條消息啥。
我們知道了導致這個問題的原因是因為消息發送是出現了快速壓入很多發送消息到待發送緩沖區里面一起發送導致的。這樣情況就是粘包了,那么我們是不是可以考慮給每一個消息加入包標識呢?
接下來我們修改一下發送包的數據代碼
創建消息的構造體 TSocketMessage
1 /// <summary>
2 /// 底層通信消息
3 /// </summary>
4 public class TSocketMessage : IDisposable
5 {
6 /// <summary>
7 /// 消息ID
8 /// </summary>
9 public int MsgID;
10 /// <summary>
11 /// 消息內容
12 /// </summary>
13 public byte[] MsgBuffer;
14
15 public TSocketMessage(int msgID, byte[] msg)
16 {
17 this.MsgID = msgID;
18 this.MsgBuffer = msg;
19 }
20
21 public void Dispose()
22 {
23 this.Dispose(true);
24 GC.SuppressFinalize(this);
25 }
26
27 protected virtual void Dispose(bool flag1)
28 {
29 if (flag1) { this.MsgBuffer = null; }
30 }
31 }
接下來我們創建消息包的封裝和拆分 MarshalEndian
1 public class MarshalEndian
2 {
3 //用於存儲剩余未解析的字節數
4 private List<byte> _LBuff = new List<byte>(2);
5 //默認是utf8的編碼格式
6 private UTF8Encoding utf8 = new UTF8Encoding();
7
8 //包頭1
9 const Int16 t1 = 0x55;
10 //包頭2
11 const Int16 t2 = 0xAA;
12 //字節數常量 兩個包頭4個字節,一個消息id4個字節,封裝消息長度 long 8個字節
13 const long ConstLenght = 12L;
14
15 public void Dispose()
16 {
17 this.Dispose(true);
18 GC.SuppressFinalize(this);
19 }
20
21 protected virtual void Dispose(bool flag1)
22 {
23 if (flag1)
24 {
25 IDisposable disposable2 = this.utf8 as IDisposable;
26 if (disposable2 != null) { disposable2.Dispose(); }
27 IDisposable disposable = this._LBuff as IDisposable;
28 if (disposable != null) { disposable.Dispose(); }
29 }
30 }
31
32 public byte[] Encode(TSocketMessage msg)
33 {
34 MemoryStream ms = new MemoryStream();
35 BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding());
36 byte[] msgBuffer = msg.MsgBuffer;
37
38 #region 封裝包頭
39 bw.Write((Int16)t1);
40 bw.Write((Int16)t2);
41 #endregion
42
43 #region 包協議
44 if (msgBuffer != null)
45 {
46 bw.Write((Int64)(msgBuffer.Length + 4));
47 bw.Write(msg.MsgID);
48 bw.Write(msgBuffer);
49 }
50 else { bw.Write((Int64)0); }
51 #endregion
52
53 bw.Close();
54 ms.Close();
55 bw.Dispose();
56 ms.Dispose();
57 return ms.ToArray();
58 }
59
60 public List<TSocketMessage> GetDcAppMess(byte[] buff, int len)
61 {
62 //拷貝本次的有效字節
63 byte[] _b = new byte[len];
64 Array.Copy(buff, 0, _b, 0, _b.Length);
65 buff = _b;
66 if (this._LBuff.Count > 0)
67 {
68 //拷貝之前遺留的字節
69 this._LBuff.AddRange(_b);
70 buff = this._LBuff.ToArray();
71 this._LBuff.Clear();
72 this._LBuff = new List<byte>(2);
73 }
74
75 List<TSocketMessage> list = new List<TSocketMessage>();
76 MemoryStream ms = new MemoryStream(buff);
77 BinaryReader buffers = new BinaryReader(ms, this.utf8);
78 try
79 {
80 byte[] _buff;
81 Label_0073:
82 //判斷本次解析的字節是否滿足常量字節數
83 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)
84 {
85 _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position)];
86 Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
87 this._LBuff.AddRange(_buff);
88 return list;
89 }
90 #region 包頭讀取
91 //循環讀取包頭
92 Label_00983:
93 Int16 tt1 = buffers.ReadInt16();
94 Int16 tt2 = buffers.ReadInt16();
95 if (!(tt1 == t1 && tt2 == t2))
96 {
97 long ttttt = buffers.BaseStream.Seek(-3, SeekOrigin.Current);
98 goto Label_00983;
99 }
100 #endregion
101
102 #region 包協議
103 long offset = buffers.ReadInt64();
104 #endregion
105
106 #region 包解析
107 //剩余字節數大於本次需要讀取的字節數
108 if (offset < (buffers.BaseStream.Length - buffers.BaseStream.Position))
109 {
110 int msgID = buffers.ReadInt32();
111 _buff = new byte[offset - 4];
112 Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
113 list.Add(new TSocketMessage(msgID, _buff));
114 //設置偏移量 然后繼續循環讀取
115 buffers.BaseStream.Seek(offset, SeekOrigin.Current);
116 goto Label_0073;
117 }
118 else if (offset == (buffers.BaseStream.Length - buffers.BaseStream.Position))
119 {
120 int msgID = buffers.ReadInt32();
121 //剩余字節數剛好等於本次讀取的字節數
122 _buff = new byte[offset - 4];
123 Array.Copy(buff, (int)buffers.BaseStream.Position, _buff, 0, _buff.Length);
124 list.Add(new TSocketMessage(msgID, _buff));
125 }
126 else
127 {
128 //剩余字節數剛好小於本次讀取的字節數 存起來,等待接受剩余字節數一起解析
129 _buff = new byte[(int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght)];
130 Array.Copy(buff, (int)(buffers.BaseStream.Position - ConstLenght), _buff, 0, _buff.Length);
131 buff = _buff;
132 this._LBuff.AddRange(_buff);
133 }
134 #endregion
135
136 }
137 catch { }
138 finally
139 {
140 if (buffers != null) { buffers.Dispose(); }
141 buffers.Close();
142 if (buffers != null) { buffers.Dispose(); }
143 ms.Close();
144 if (ms != null) { ms.Dispose(); }
145 }
146 return list;
147 }
148 }
接下來我們修改一下 TSocketBase 的 抽象方法
1 public abstract void Receive(TSocketMessage msg);
在修改接受消息回調函數
1 /// <summary>
2 /// 消息解析器
3 /// </summary>
4 MarshalEndian mersha = new MarshalEndian();
5
6 /// <summary>
7 /// 接收消息回調函數
8 /// </summary>
9 /// <param name="iar"></param>
10 private void ReceiveCallback(IAsyncResult iar)
11 {
12 if (!this.IsDispose)
13 {
14 try
15 {
16 //接受消息
17 ReceiveSize = _Socket.EndReceive(iar, out ReceiveError);
18 //檢查狀態碼
19 if (!CheckSocketError(ReceiveError) && SocketError.Success == ReceiveError)
20 {
21 //判斷接受的字節數
22 if (ReceiveSize > 0)
23 {
24 byte[] rbuff = new byte[ReceiveSize];
25 Array.Copy(this.Buffers, rbuff, ReceiveSize);
26 var msgs = mersha.GetDcAppMess(rbuff, ReceiveSize);
27 foreach (var msg in msgs)
28 {
29 this.Receive(msg);
30 }
31 //重置連續收到空字節數
32 ZeroCount = 0;
33 //繼續開始異步接受消息
34 ReceiveAsync();
35 }
36 else
37 {
38 ZeroCount++;
39 if (ZeroCount == 5) { this.Close("錯誤鏈接"); }
40 }
41 }
42 }
43 catch (System.Net.Sockets.SocketException) { this.Close("鏈接已經被關閉"); }
44 catch (System.ObjectDisposedException) { this.Close("鏈接已經被關閉"); }
45 }
46 }
這樣我們完成了在收到消息后對數據包的解析。
修改一下TSocketClient的 Receive 重寫方法
1 /// <summary>
2 /// 收到消息后
3 /// </summary>
4 /// <param name="rbuff"></param>
5 public override void Receive(TSocketMessage msg)
6 {
7 Console.WriteLine("Receive ID:" + msg.MsgID + " Msg:" + System.Text.UTF8Encoding.Default.GetString(msg.MsgBuffer));
8 if (isServer)
9 {
10 this.SendMsg(new TSocketMessage(msg.MsgID, System.Text.UTF8Encoding.Default.GetBytes("Holle Client!")));
11 }
12 }
修改測試代碼如下
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 TCPListener tcp = new TCPListener();
6 TSocketClient client = new TSocketClient();
7 for (int i = 1; i < 5; i++)
8 {
9 Thread.Sleep(1000);
10 client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));
11 }
12 Console.ReadLine();
13 }
14 }
運行結果

接受成功了,那么我們取消暫停狀態,快速發送消息試試
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 TCPListener tcp = new TCPListener();
6 TSocketClient client = new TSocketClient();
7 for (int i = 1; i < 5; i++)
8 {
9 client.SendMsg(new TSocketMessage(i, System.Text.UTF8Encoding.Default.GetBytes("Holle Server!")));
10 }
11 Console.ReadLine();
12 }
13 }
看看運行結果

瞬間完成了消息發送,也沒有再出現第一次運行的那樣~!
這樣完美的解決了socket通信 在傳輸上發送粘包問題
謝謝園友發現的問題,
問題是這樣的原本的解包和封包的測試代碼不夠嚴謹導致解析包出現錯誤
感謝園友發現問題,並提出問題。


附上最新的代碼
1 using System;
2 using System.Collections.Generic;
3 using System.IO;
4 using System.Linq;
5 using System.Text;
6 using System.Threading.Tasks;
7
8 namespace TSocket
9 {
10 public class MarshalEndian
11 {
12 //用於存儲剩余未解析的字節數
13 private List<byte> _LBuff = new List<byte>(2);
14 //默認是utf8的編碼格式
15 private UTF8Encoding utf8 = new UTF8Encoding();
16
17 //包頭1
18 const Int16 t1 = 0x55;
19 //包頭2
20 const Int16 t2 = 0xAA;
21 //字節數常量 兩個包頭4個字節,一個消息id4個字節,封裝消息長度 int32 4個字節
22 const Int32 ConstLenght = 8;
23
24 public void Dispose()
25 {
26 this.Dispose(true);
27 GC.SuppressFinalize(this);
28 }
29
30 protected virtual void Dispose(bool flag1)
31 {
32 if (flag1)
33 {
34 IDisposable disposable2 = this.utf8 as IDisposable;
35 if (disposable2 != null) { disposable2.Dispose(); }
36 IDisposable disposable = this._LBuff as IDisposable;
37 if (disposable != null) { disposable.Dispose(); }
38 }
39 }
40
41 public byte[] Encode(TSocketMessage msg)
42 {
43 MemoryStream ms = new MemoryStream();
44 BinaryWriter bw = new BinaryWriter(ms, new UTF8Encoding());
45 byte[] msgBuffer = msg.MsgBuffer;
46
47 #region 封裝包頭
48 bw.Write((Int16)t1);
49 bw.Write((Int16)t2);
50 #endregion
51
52 #region 包協議
53 if (msgBuffer != null)
54 {
55 bw.Write((Int32)(msgBuffer.Length + 4));
56 bw.Write(msg.MsgID);
57 bw.Write(msgBuffer);
58 }
59 else { bw.Write((Int32)0); }
60 #endregion
61
62 bw.Close();
63 ms.Close();
64 bw.Dispose();
65 ms.Dispose();
66 return ms.ToArray();
67 }
68
69 public List<TSocketMessage> GetDcAppMess(byte[] buff, int len)
70 {
71 //拷貝本次的有效字節
72 byte[] _b = new byte[len];
73 Array.Copy(buff, 0, _b, 0, _b.Length);
74 buff = _b;
75 if (this._LBuff.Count > 0)
76 {
77 //拷貝之前遺留的字節
78 this._LBuff.AddRange(_b);
79 buff = this._LBuff.ToArray();
80 this._LBuff.Clear();
81 this._LBuff = new List<byte>(2);
82 }
83
84 List<TSocketMessage> list = new List<TSocketMessage>();
85 MemoryStream ms = new MemoryStream(buff);
86 BinaryReader buffers = new BinaryReader(ms, this.utf8);
87 try
88 {
89 byte[] _buff;
90 Label_00983:
91
92 #region 包頭讀取
93 //循環讀取包頭
94 //判斷本次解析的字節是否滿足常量字節數
95 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) < ConstLenght)
96 {
97 _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position));
98 this._LBuff.AddRange(_buff);
99 return list;
100 }
101 Int16 tt1 = buffers.ReadInt16();
102 Int16 tt2 = buffers.ReadInt16();
103 if (!(tt1 == t1 && tt2 == t2))
104 {
105 long ttttt = buffers.BaseStream.Seek(-3, SeekOrigin.Current);
106 goto Label_00983;
107 }
108 #endregion
109
110 #region 包協議
111 int offset = buffers.ReadInt32();
112 #endregion
113
114 #region 包解析
115 //剩余字節數大於本次需要讀取的字節數
116 if (offset <= (buffers.BaseStream.Length - buffers.BaseStream.Position))
117 {
118 int msgID = buffers.ReadInt32();
119 _buff = buffers.ReadBytes(offset - 4);
120 list.Add(new TSocketMessage(msgID, _buff));
121 if ((buffers.BaseStream.Length - buffers.BaseStream.Position) > 0)
122 {
123 goto Label_00983;
124 }
125 }
126 else
127 {
128 //剩余字節數剛好小於本次讀取的字節數 存起來,等待接受剩余字節數一起解析
129 _buff = buffers.ReadBytes((int)(buffers.BaseStream.Length - buffers.BaseStream.Position + ConstLenght));
130 this._LBuff.AddRange(_buff);
131 }
132 #endregion
133 }
134 catch (Exception ex) { Console.WriteLine(ex); }
135 finally
136 {
137 if (buffers != null) { buffers.Dispose(); }
138 buffers.Close();
139 if (buffers != null) { buffers.Dispose(); }
140 ms.Close();
141 if (ms != null) { ms.Dispose(); }
142 }
143 return list;
144 }
145 }
146 }

