因為沒有文件上傳,沒有大的字節傳輸,數據過來就放到隊列,所以沒有用異步,使用同步方式接收數據。
原理:
1.前面四個字節是消息頭,存放消息體長度;
2.后面字節定義消息體;
3.服務端收到消息后,先獲取消息頭部,如果不夠繼續接收;如果夠了則根據頭部計算出消息體長度;
4.根據消息頭標記的長度獲取消息體,如果不夠,繼續接收;如果夠了或者有多余,重新獲取消息頭部,不停的循環;
private void Receive(Socket socket, string ip) { Task.Factory.StartNew(() => { var pack = new BytePkg(); while (true) { try { //如果socket已經斷開,結束循環 if (!socket.Connected) { _logger.Warn($"IP:{ip},socket已斷開,停止接收數據;"); break; } byte[] prevBytes = new byte[1024];//單次最多可接收的字節 int len = socket.Receive(prevBytes, prevBytes.Length, SocketFlags.None); var bytes = prevBytes.Take(len).ToArray();//實際接收的字節 this.RcvHeadData(pack, bytes); } catch (Exception ex) { _logger.Error($"IP:{ip},接收socket數據異常,message:{ex.Message},stackTrace:{ex.StackTrace};"); } } }); }
/// <summary> /// 接收消息頭 /// </summary> /// <param name="pack"></param> /// <param name="bytes"></param> private void RcvHeadData(BytePkg pack, byte[] bytes) { var len = bytes.Length; pack.headIndex += len; if (pack.headIndex < pack.headLen) { for (int x = 0; x < len; x++) { pack.headBuff[pack.headIndex - len + x] = bytes[x]; }; } else { var actualHeadLen = pack.headIndex - len;//head的實際長度 var skipHeadLen = pack.headLen - actualHeadLen;//需要補上的長度;head定義長度 - head的實際長度 = body需要跳過的長度 for (int x = 0; x < skipHeadLen; x++) { pack.headBuff[actualHeadLen + x] = bytes[x]; } //★★★★★開始處理消息體部分★★★★★ var bodyLen = len;//身體長度 if (skipHeadLen > 0) { bodyLen = len - skipHeadLen;//第一次,獲取剩余部分的長度 pack.InitBodyBuff();//第一次,需要初始化body } this.RcvBodyData(pack, bytes.Skip(skipHeadLen).Take(bodyLen).ToArray()); } } /// <summary> /// 接收消息體 /// </summary> /// <param name="pack"></param> /// <param name="bytes"></param> private void RcvBodyData(BytePkg pack, byte[] bytes) { var len = bytes.Length; pack.bodyIndex += len; if (pack.bodyIndex < pack.bodyLen) { for (int x = 0; x < len; x++) { pack.bodyBuff[pack.bodyIndex - len + x] = bytes[x]; }; } else { var actualBodyLen = pack.bodyIndex - len;//body的實際長度 var skipBodyLen = pack.bodyLen - actualBodyLen;//需要補上的長度;body定義長度 - body的實際長度 = 本次需要獲取的body長度 for (int x = 0; x < skipBodyLen; x++) { pack.bodyBuff[actualBodyLen + x] = bytes[x]; } //處理接收到的數據 NetMsg msg = ByteHelper.DeSerialize<NetMsg>(pack.bodyBuff); this.OnReceiveMsg(msg); //重置消息包 pack.ResetData(); //★★★★★開始處理消息頭部分★★★★★ this.RcvHeadData(pack, bytes.Skip(skipBodyLen).ToArray()); } }
如果不放心,可以在該加日志的地方加日志,觀察代碼的邏輯是否正確。說明:這是借鑒一個叫做PESocket的開源項目改的。
下面是用到的相關類:
/// <summary> /// 消息接收類 /// </summary> public class BytePkg { public int headLen = 4; public byte[] headBuff = null; public int headIndex = 0; public int bodyLen = 0; public byte[] bodyBuff = null; public int bodyIndex = 0; public BytePkg() { headBuff = new byte[4]; } public void InitBodyBuff() { bodyLen = BitConverter.ToInt32(headBuff, 0); bodyBuff = new byte[bodyLen]; } public void ResetData() { headIndex = 0; bodyLen = 0; bodyBuff = null; bodyIndex = 0; } }
/// <summary> /// 字節輔助類 /// </summary> public class ByteHelper { public static byte[] PackNetMsg<T>(T msg) where T : NetMsg { return PackLenInfo(Serialize(msg)); } public static byte[] PackLenInfo(byte[] data) { int len = data.Length; byte[] pkg = new byte[len + 4]; byte[] head = BitConverter.GetBytes(len); head.CopyTo(pkg, 0); data.CopyTo(pkg, 4); return pkg; } public static byte[] Serialize<T>(T pkg) where T : NetMsg { using (MemoryStream ms = new MemoryStream()) { BinaryFormatter bf = new BinaryFormatter(); bf.Serialize(ms, pkg); ms.Seek(0, SeekOrigin.Begin); return ms.ToArray(); } } public static T DeSerialize<T>(byte[] bs) where T : NetMsg { using (MemoryStream ms = new MemoryStream(bs)) { BinaryFormatter bf = new BinaryFormatter(); T pkg = (T)bf.Deserialize(ms); return pkg; } } }
/// <summary> /// 自定義請求數據格式 /// </summary> [Serializable] public class NetMsg { public string EPCstring; public DateTime Time; public string IP; public string CommandType; }
1.客戶端:先將實體序列化,轉為字節數組,然后同步發送即可,無需異步發送
2.服務端:
先想辦法獲取消息頭
如果消息頭一直沒滿,那就一直獲取
如果消息頭滿了,就計算出消息體長度,並向下獲取消息體
再想辦法獲取消息體
根據消息頭包含的消息體長度,獲取消息體
如果消息體一直沒滿,那就一直獲取
如果消息體滿了,那就重新獲取消息頭