- 前言
本項目核心在於實現通訊協議解析,目前例程僅實現了 一對一通訊的解決方案,多設備的(如 485通訊)從理論上是可以實現,后期有機會再從框架層去處理。
- 項目介紹
項目名稱為 ZhCun.SerialIO
一款串口通訊框架,更容易的處理協議解析,內部實現了粘包、分包、冗余錯誤包的處理實現; 它可更方便的業務邏輯的處理,將通訊、協議解析、業務邏輯 完全獨立開來,更容易擴展和修改代碼邏輯; 本項目參考了SuperSocket中協議解析的設計原理,可外部 命令(Command)類, 對應協議中命令字靈活實現邏輯。
例如: 協議格式:[2字節 固定頭 0xaa,0xbb] + [1字節 長度 len] [1字節 命令字] [1字節 設備號] [N字節 data] [2字節 CRC校驗碼]
命令數據: AA BB 09 01 05 00 01 2B 56
可以處理以下幾種(粘包、分包、容錯)情況:
1. AA BB 09 01 05 00 01 2B 56 AA BB 09 01 05 00 01 2B 56 AA BB 09 01 05 00 01 可以不考慮粘包的處理,會分成3條協議交給Command來處理(后面說明)
2. 00 00 00 AA BB 09 01 05 00 01 2B 56 00 00 00 標記為紅色的為錯誤數據,這些數據會被自動過濾掉
3. 連續收到(延時接收了)多個半包,串口緩存問題可能導致延時收到數據
AA BB 09 01 05
00 01 2B 56
這種情況會等待下次處理,如果之后再沒有收到正確的數據會丟棄前部分,后面正確的數據會正常處理
代碼目錄:
- 設計思路及實現
ISerialServer 是實現串口通訊的接口,接收數據、發送數據、打開與關閉串口的實現;SerialCore 是 ISerialServer 通訊的核心實現
public interface ISerialServer : IDisposable { /// <summary> /// 當連接狀態改變時觸發 /// </summary> event Action<ConnectChangedEvent> ConnectChanged; /// <summary> /// 當讀數據完成時觸發 /// </summary> event Action<ReadWriteEvent> DataReadOver; /// <summary> /// 當下發數據時觸發 /// </summary> event Action<ReadWriteEvent> DataWriteOver; /// <summary> /// 當前服務的配置 /// </summary> SerialOption Option { get; } void Write(byte[] data); void Write(byte[] data, int offset, int count); void Write(byte[] data, int offset, int count, int sendTimes, int interval); void Write(IWriteModel model); /// <summary> /// 開始監聽串口 /// </summary> void Start(SerialOption option); /// <summary> /// 默認參數及指定串口開始服務 /// </summary> void Start(string portName, int baudRate = 9600); }
SerialServerBase 繼承了 SerialCore ,它主要實現了 命令處理器 及 過濾處理器,這套框架的核心就是協議的解析及命令的處理;
它擴展了兩個重要屬性 :Filters 和 Commands 分別是協議過濾器和命令處理器,與 SerialCore 分開是為了滿足不需要過濾器和命令處理器的情況
構造函數中調用載入過濾器 LoadFilters() 與 載入命令處理器 的兩個方法,該方法應該由應用程序來實現子類並加入用戶自定義的 Filters 和 Commands
public SerialServerBase() { Filters = new List<IReceiveFilter>(); Commands = new List<ICommand>(); LoadFilters(); LoadCommands(); Filters.ForEach(s => s.OnFilterFinish = ReceiveFilterAction); } /// <summary> /// 過濾器 /// </summary> protected List<IReceiveFilter> Filters { get; } /// <summary> /// 命令處理器 /// </summary> protected List<ICommand> Commands { get; } /// <summary> /// 加載過濾器,子類需 Filters.Add /// </summary> protected virtual void LoadFilters() { } /// <summary> /// 加載命令處理器 /// </summary> protected virtual void LoadCommands() { }
SerialServerBase 重寫了 OnDataReadOver 和 ReceiveFilterAction,分別來處理協議解析和命令處理
/// <summary> /// 接收到數據后交給過濾器來處理協議 /// </summary> protected override void OnDataReadOver(byte[] data, int offset, int count) { foreach (var filter in Filters) { filter.Filter(this, data, offset, count, false, out _); } base.OnDataReadOver(data, offset, count); } /// <summary> /// 接收數據解析完成后觸發 /// </summary> protected virtual void ReceiveFilterAction(PackageInfo package) { if (Commands == null || Commands.Count == 0) return; var cmd = Commands.Find(s => s.Key == package.Key); if (cmd != null) { cmd.Execute(this, package); } }
IReceiveFilter 接收過濾器定義,它實現解析的核心功能,處理粘包、分包都是在 它的實現類 ReceiveBaseFilter 中,Filter 方法實現分包粘包的處理,代碼如下
/// <summary> /// 過濾協議,粘包、分包的處理 /// </summary> public virtual void Filter(IBufferReceive recBuffer, byte[] data, int offset, int count, bool isBuffer, out int rest) { if (!isBuffer && recBuffer.HasReceiveBuffer()) { recBuffer.SetReceiveBuffer(data, offset, count); Filter(recBuffer, recBuffer.ReceiveBuffer, 0, recBuffer.ReceiveOffset, true, out rest); return; } if (isBuffer && count < MinLength) { rest = 0; return; } if (!isBuffer && recBuffer.ReceiveOffset + count < MinLength) { //等下一次接收后處理 recBuffer.SetReceiveBuffer(data, offset, count); rest = 0; return; } rest = 0; if (!FindHead(data, offset, count, out int headOffset)) { //未找到包頭丟棄 recBuffer.RestReceiveBuffer(); FilterFinish(1, data, offset, count); return; } if (count - (headOffset - offset) < MinLength) { // 從包頭位置小於最小長度(半包情況),注意:解析了一半,不做解析完成處理 recBuffer.SetReceiveBuffer(data, headOffset, count - (headOffset - offset)); return; } int dataLen = GetDataLength(data, headOffset, count - (headOffset - offset)); if (dataLen <= 0) { //錯誤的長度 丟棄 recBuffer.RestReceiveBuffer(); FilterFinish(2, data, offset, count); return; } if (dataLen > count - (headOffset - offset)) { //半(分)包情況,等下次接收后合並 if (!isBuffer) recBuffer.SetReceiveBuffer(data, headOffset, count - (headOffset - offset)); return; } rest = count - (dataLen + (headOffset - offset)); FilterFinish(0, data, headOffset, dataLen); recBuffer.RestReceiveBuffer(); if (rest > 0) { Filter(recBuffer, data, headOffset + dataLen, rest, false, out rest); return; } }
核心解析先介紹這么多,下面舉例說明下如何應用及使用過程
以上介紹示例的協議來舉例
協議說明: [2字節 固定頭 0xaa,0xbb] + [1字節 長度 len] [1字節 命令字] [1字節 設備號] [N字節 data] [2字節 CRC校驗碼]
步驟:
1. 創建過濾器 ,應用層的過濾器只需要設置 包頭,獲取數據包長度、命令字的實現,簡單幾行代碼即可方便實現過濾的整個過程;
代碼如下:
public class FHDemoFilter : FixedHeadFilter { static byte[] Head = new byte[] { 0xaa, 0xbb }; public FHDemoFilter() : base(Head, 6) { } //[0xaa,0xbb] [len] [cmd] [DevId] [data] [crc-1,crc-2] protected override int GetDataLength(byte[] data, int offset, int count) { //數據包長度 第3個字節 return data[offset + 2]; } protected override int GetPackageKey(byte[] data, int offset, int count) { //命令字 第4個字節 return data[offset + 3]; } }
2. 創建命令處理器 ,命令處理器 由 ICommand 派生,需要指明 Key (即:GetPackageKey 獲取的命令字),然后一個 執行的邏輯方法 Execute ,這里將 接收到的數據包與發送的數據包封裝了實體對象,更方便處理data的解析及發送包的封裝;
定義一個抽象的 CmdBase 它的派生類來實現具體 命令 的業務邏輯,CmdBase 會將協議生成一個實體對象給派生類
public abstract class CmdBase<TReadModel> : ICommand where TReadModel : R_Base, new() { public abstract int Key { get; } public abstract string CmdName { get; } /// <summary> /// 執行響應邏輯 /// </summary> public abstract void ExecuteHandle(ISerialServer server, TReadModel rep); public virtual void Execute(ISerialServer server, PackageInfo package) { var rModel = new TReadModel(); var r = rModel.Analy(package.Body, package.BodyOffset, package.BodyCount); if (r == 0) { ExecuteHandle(server, rModel); } else { LogPrint.Print($"解析key={Key} 異常,error code: {r}"); } } }
CmdBase 將創建的 R_Base 實例 交給派生類處理,R_Base 封裝了解析數據包內容及校驗的實現,派生類只需要將 Data 數據再次解析即可
R_Base 使用了 BytesAnalyHelper 字節解析工具,它能更方便和靈活的來按順序解析協議;
public abstract class R_Base : BaseProtocol { /// <summary> /// 解析數據對象 /// </summary> protected BytesAnalyHelper AnalyObj { get; private set; } /// <summary> /// 解析消息體, 0 正常, 1 校驗碼 錯誤,2 解析異常 /// </summary> protected abstract int AnalyBody(BytesAnalyHelper analy); /// <summary> /// 解析協議, 0 成功 1 校驗碼錯誤 9 異常 /// </summary> public int Analy(byte[] data, int offset, int count) { try { var crc = GetCRC(data, offset, count - 2); //校驗碼方法內去除 var crcBytes = BitConverter.GetBytes(crc); if (crcBytes[0] != data[offset + count - 1] || crcBytes[1] != data[offset + count - 2]) { return 1; } //[0xaa,0xbb] [len] [cmd] [DevId] [data] [crc-1,crc-2] AnalyObj = new BytesAnalyHelper(data, offset, count, 4); // 跳過包頭部分 DevId = AnalyObj.GetByte(); // 取 DevId var r = AnalyBody(AnalyObj); return r; } catch { //LogHelper.LogObj.Error($"解析數據包發生異常.", ex); return 9; } } }
舉例解析 data 為一個文本的實現,只需要用哪種編碼轉換即可,直接賦值給 Text
public class R_Text : R_Base { public string Text { set; get; } protected override int AnalyBody(BytesAnalyHelper analy) { Text = analy.GetString(analy.NotAnalyCount - 2); return 0; } }
,然后CmdText 的命令處理器就可以得到這個對象,來進行對應的業務邏輯處理
public class CmdText : CmdBase<R_Text> { public override int Key => 0x02; public override string CmdName => "Text"; public override void ExecuteHandle(ISerialServer server, R_Text rep) { LogPrint.Print($"[Text]: {rep.Text}"); //回復消息 var w = new W_TextRe(); w.DevId = rep.DevId; server.Write(w); //.. to do something } }
以上就是實現的主要部分,最終調用 SerialServer 派生實例的 Start 方法即可;
最后附上,demo實現截圖
- 結束語
這個框架不太復雜,步驟有一些繁瑣,但代碼量很少,共享出來也希望給需要的人一些思路,同時也希望能提出一些建議,能更好的改進;
代碼已托管至:gitee