在這個文章里面我將用一個實際的案例來分享如何來構建一個能夠接受3000+個連接的GPRS通訊服務器軟件。在這里,我將分享GPRS通訊服務器設計過程中面臨的問題,分享通訊協議的設計,分享基於異步事件的設計,分享避免內存泄露的解決方案,分享軟件的發布與升級方法,分享通訊協議的單元測試構建等。
1 GPRS通訊服務器軟件介紹
首先我們來看一下這個通訊服務器軟件,如下圖所示。通訊服務器軟件的作用是遵循國家能耗平台技術導則的數據傳輸導則,與GPRS硬件進行通訊,實現數據的遠程傳輸和遠程實時控制。
這個軟件的主要功能有:
(1)接收GPRS采集器的連接,實現對采集器的控制;
(2)實現能耗A~D類數據庫的管理。
下面我來介紹通訊服務器的設計方法和思路,接着再介紹如何實現。
2 通訊服務器的設計模型
2.1 通訊服務器架構
通訊服務器架構采用的是異步事件 + 分層的體系結構。通過異步事件實現不同職責代碼的分離,利用分層將相同職責的代碼組織到同一個層次。整體設計如下所示。
通訊服務器使用EventDispatcher實現不同事件類型的異步路由。通訊服務器是整個系統的中心,它接收來自硬件層GPRS的連接(實際是HTTP的連接),為每一個連接創建一個會話(CommProtocol),每一個會話使用一個線程(也支持線程池)進行通訊,HttpCommServer實現會話的管理,此外,與領域層實現事件傳遞。領域層實現與上層應用的通訊,包括:(1)將數據結果存儲到數據庫;(2)通過消息隊列接受來自硬件的通訊指令;(3)與通訊服務器打交道。通訊協議層實現與不同連接的采集器進行通訊,它是整個系統的難點。
2.2 通訊協議層的設計
通訊協議層核心類為CommProtocol,它使用線程來與硬件通訊,與硬件的通訊過程被拆分一個個的對話,每一個對話用一個RoundTrip類來表示,CommProtocol使用一個RoundTrip來存儲所有的對話,利用線程不停的輪詢存儲的對話,然后一個個的按順序/按優先級來執行對話。下圖是通訊協議層的設計模型。
在與硬件的通訊過程中,通訊以對話作為單位的,以消息作為對話的基石,一次對話實現一組消息的傳遞。通訊協議中,消息有兩種類型:(1)服務器發送給硬件的消息稱為主動消息;(2)硬件發送給服務器的消息稱為被動消息。對話則有三種類型:(1)服務器發送消息給硬件,然后等待硬件的回復消息或者不等待回復,我們稱之為主動對話;(2)服務器等待硬件發送的數據,收到數據后給硬件回復或者不回復,我們稱之為被動對話;(3)以上二者的組合,來實現一組控制指令的傳遞,我們稱之為組合對話。在這個模型中,服務器需要來控制硬件時,會調用CommProtocol的一個方法,比如QueryConfig方法,用於查詢硬件的配置信息,此時,將創建一個主動會話,然后發送到對話隊列中,對話處理線程將從對話隊列中按序取出對話,並執行;當對話隊列為空時,對話處理線程將會使用被動對話類型注冊表,嘗試從通訊鏈路獲取一條完整消息,然后創建一個被動對話並執行。在對話處理線程處理一個主動對話時,它通常是:(1)使用消息適配器發送一個消息,如果失敗后,會重試幾次;接着使用消息適配器來獲取一條響應或者直接返回,當消息發送時會拋出OnMessageSend事件,當對話成功時會發出OnCompleted事件,當失敗時拋出OnError事件。類似的,被動對話的設計也相似,不同的是,其消息已經提前收到了。下面我們就來看看通訊協議層詳細的設計。
2.3 通訊協議層詳細設計
2.3.1 消息的設計
首先我們先來看看公共建築數據傳輸規范里面的消息定義方式。
下面我們來看看消息類型的設計
在上述的消息定義中,MessageBase表示所有消息的基類由消息頭、消息體組成,它們都從MessagePart派生。每一個消息頭由MessageHeader,它定義了能耗建築的建築物ID、采集器ID和消息類型。MessageSerializer消息序列化靜態類用於實現消息的解析與反解析。
以下XML格式是服務器配置數據采集器時的消息格式。通訊時,服務器發送一個period類型的消息,用於配置采集器定時上報數據的間隔,然后數據采集器響應一條period_ack消息。
<?xml version="1.0" encoding="utf-8" ?> <root> <!-- 通用部分 --> <!-- building_id:樓棟編號 gateway_id:采集器編號 type:配置信息數據包的類型 --> <common> <building_id>XXXXXX</building_id > <gateway_id>XXX</gateway_id > <type>以2種操作類型之一</type> </common> <!-- 配置信息 --> <!--操作有2種類型 period:表示服務器對采集器采集周期的配置,period子元素有效 period_ack:表示采集器對服務器采集周期配置信息的應答 --> <config operation="period/period_ack"> <period>15</period> </config> </root>
根據規范的消息格式,我們定義的配置消息由主動消息體、主動消息、被動消息體和被動消息四個類構成。
主動消息體定義如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Xml.Serialization; namespace UIShell.EcmCommServerService.Protocol.Message { [XmlRoot("config", Namespace = "", IsNullable = false)] public class ConfigActiveMessageBody : MessagePart { [XmlAttribute("operation")] public string Operation { get; set; } [XmlElement("period")] public int Period { get; set; } public ConfigActiveMessageBody() { Operation = "period"; } } }
主動消息定義如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Xml.Serialization; using UIShell.EcmCommServerService.Utility; namespace UIShell.EcmCommServerService.Protocol.Message { [XmlRoot("root", Namespace = "", IsNullable = false)] public class ConfigActiveMessage : MessageBase { public static ConfigActiveMessage New(string buildingId, string gatewayId, int period) { var message = new ConfigActiveMessage(); message.Header.BuildingId = buildingId; message.Header.GatewayId = gatewayId; message.Body.Period = period; return message; } [XmlElement("config")] public ConfigActiveMessageBody Body { get; set; } public ConfigActiveMessage() : base(StringEnum.GetStringValue(MessageType.Config_Period)) { Body = new ConfigActiveMessageBody(); } } }
被動消息體定義如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Xml.Serialization; namespace UIShell.EcmCommServerService.Protocol.Message { [XmlRoot("config", Namespace = "", IsNullable = false)] public class ConfigAckPassiveMessageBody : MessagePart { [XmlAttribute("operation")] public string Operation { get; set; } public ConfigAckPassiveMessageBody() { Operation = "period_ack"; } } }
被動消息定義如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Xml.Serialization; using UIShell.EcmCommServerService.Utility; namespace UIShell.EcmCommServerService.Protocol.Message { [XmlRoot("root", Namespace = "", IsNullable = false)] public class ConfigAckPassiveMessage : MessageBase { public static ConfigAckPassiveMessage New(string buildingId, string gatewayId) { var message = new ConfigAckPassiveMessage(); message.Header.BuildingId = buildingId; message.Header.GatewayId = gatewayId; return message; } [XmlElement("config")] public ConfigAckPassiveMessageBody Body { get; set; } public ConfigAckPassiveMessage() : base(StringEnum.GetStringValue(MessageType.Config_Period_Ack)) { Body = new ConfigAckPassiveMessageBody(); } } }
根據以上的模式,我們為能耗平台定義的所有消息如下。
2.3.2 RoundTrip的設計
RoundTrip表示一次對話,由一組消息的交換來實現。RoundTrip有三種類型,其設計如下所示。RoundTripBase是對話的基類,它定義了OnCompleted、OnError異步事件、Start方法和其它基本屬性;ActiveRoundTripBase表示主動對話基類,表示服務器發送給采集器消息,然后等待或者不等待消息,這個類在RoundTripBase基礎上定義了OnMessageSend異步事件;PassiveRoundTripBase表示被動對話基類,定義了OnMessageReceived事件,表示已經從采集器接收到消息。這些基類都與領域知識無關,只是定義了對話基類所需的方法、屬性、事件。
ActiveRoundTrip則是針對能耗平台定義的所有主動消息的基類,它定義了領域相關的屬性,實現了Start方法,並定義了相關抽象類。我們來看一下Start方法,它首先調用Send方法來發送消息,然后拋出OnMessageSend異步事件,接着調用ReceiveResponseMessage嘗試從采集器收取消息然后拋出OnCompleted異步事件,這個過程如果失敗了,能夠重試,不過如果重試也失敗,則拋出OnError事件。
public override void Start() { Trace(string.Format("開始與集中器{0}會話。", ToBeSentMessage.Header.GatewayId)); // 如果發送失敗,則重試。 // 嘗試次數為: 1 + 失敗時重復次數 for (int i = 0; i <= MessageConstants.RetryTimesOnTimeout; i++) { try { Send(); OnRoundTripMessageSend(new RoundTripEventArgs() { RoundTrip = this }); try { Trace(string.Format("開始第{0}次消息接收。", i + 1)); ReceiveResponseMessages(); OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this }); break; } catch (Exception ex) { Trace(string.Format("第{0}次接收消息失敗,因為:{1},繼續嘗試。", i + 1, CommProtocol.GetErrorMessage(ex))); throw; } } catch (Exception ex) { Trace(string.Format("第{0}次發送命令失敗,因為:{1},繼續嘗試。", i + 1, CommProtocol.GetErrorMessage(ex))); if (i == MessageConstants.RetryTimesOnTimeout) { Trace(string.Format("第{0}次發送命令失敗,因為:{1},停止嘗試。", i + 1, CommProtocol.GetErrorMessage(ex))); OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex }); throw; } } } Completed = true; if (ReceivedResponseMessages != null) { Trace(string.Format("當前會話'{0}'接收了{1}個響應消息,詳細如下:", RoundTripDescription, ReceivedResponseMessages.Length)); foreach (var message in ReceivedResponseMessages) { //Trace("響應命令內容:" + ProtocolUtility.BytesToHexString(message.ToContent())); Trace("響應消息:" + message.ToString()); } } else { Trace("當前會話接收的響應消息為空。"); } Trace(string.Format("與集中器{0}會話成功。", ToBeSentMessage.Header.GatewayId)); }
發送消息Send方法實現如下,它使用StreamAdapter來發送一條原始消息。
public override void Send() { StreamAdapter.SendRawMessage(ToBeSentMessage.ToContent(), ToBeSentMessage.ToXmlContent()); }
而ReceiveResponseMessages方法則是一個抽象方法。
public abstract void ReceiveResponseMessages();
同理,PassiveRoundTrip則是針對能耗平台定義的所有被動消息的基類,它定義了領域相關的屬性,實現了Start方法和相應的抽象方法。
public abstract TResponseMessage CreateResponseMessage(); public override void Receive() { if (ReceivedMessage == null) { MessageHeader header; var receivedMessageContent = StreamAdapter.ReceiveRawMessage(BuildingId, GatewayId, ReceivedMessageType, out header); try { ReceivedMessage = MessageSerialiser.DeserializeRaw<TReceivedMessage>(receivedMessageContent); } catch (Exception ex) { throw new ReceiveMessageException("Parse the received message failed.", ex) { ErrorStatus = ReceiveMessageStatus.Failed }; } } } public void SendResponseMessage() { ResponsedMessage = CreateResponseMessage(); if (ResponsedMessage != null) { //Trace("開始發送的響應消息內容:" + ProtocolUtility.BytesToHexString(ResponsedMessage.ToContent())); Trace("開始發送的響應消息:" + ResponsedMessage.ToXmlContent()); StreamAdapter.SendRawMessage(ResponsedMessage.ToContent(), ResponsedMessage.ToString()); } else { Trace("不發送響應消息。"); } } public override void Start() { Trace(string.Format("開始嘗試與集中器{0}進行被動式會話。", GatewayId)); try { Receive(); OnRoundTripMessageReceived(new RoundTripEventArgs() { RoundTrip = this }); try { //Trace("接收到消息內容:" + ProtocolUtility.BytesToHexString(ReceivedMessage.ToContent())); Trace("接收到消息:" + ReceivedMessage.ToXmlContent()); SendResponseMessage(); Completed = true; OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this }); } catch (Exception ex) { Trace(string.Format("嘗試發送響應消息到集中器{0}失敗,因為:{1}。", GatewayId, CommProtocol.GetErrorMessage(ex))); throw; } } catch (Exception ex) { Trace(string.Format("嘗試從集中器{0}接收消息失敗,因為:{1}。", GatewayId, CommProtocol.GetErrorMessage(ex))); OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex }); throw; } Trace(string.Format("與集中器{0}進行被動式會話成功。", GatewayId)); }
組合對話CompositeRoundTrip是根據能耗平台設計的,它比較簡單,主要是控制每條對話的執行時序,默認的實現就是按順序來執行每一個對話。
public override void Start() { int i = 1; int roundTripsCount = RoundTrips.Count; Trace(string.Format("開始組合會話,由{0}個子會話組成。", roundTripsCount)); RoundTripBase roundTrip; while (RoundTrips.Count > 0) { roundTrip = RoundTrips.Dequeue(); try { Trace(string.Format("開始執行第{0}個子會話。", i)); roundTrip.Start(); Trace(string.Format("第{0}個子會話執行完成。", i)); } catch (Exception ex) { Trace(string.Format("組合會話失敗,第{0}個子會話執行失敗。", i)); OnRoundTripError(new RoundTripEventArgs() { RoundTrip = roundTrip, Exception = ex }); throw; } finally { roundTrip.Dispose(); } i++; } Trace(string.Format("組合會話完成,由{0}個子會話組成。", roundTripsCount)); OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this }); }
接下來我們看看一個主動對話的實現,以ConfigActiveRoundTrip為例。
using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Text; using UIShell.EcmCommServerService.Protocol.Message; using UIShell.EcmCommServerService.Utility; namespace UIShell.EcmCommServerService.Protocol.RoundTrip.Active { public class ConfigActiveRoundTrip : ActiveRoundTrip<ConfigActiveMessage, ConfigAckPassiveMessage> { public ConfigActiveRoundTrip( string buildingId, string gatewayId, int period, MessageConstants messageConstants, TcpClient client) : base(buildingId, gatewayId, StringEnum.GetStringValue(MessageType.Config_Period_Ack), ConfigActiveMessage.New(buildingId, gatewayId, period), messageConstants, client) { } public override void ReceiveResponseMessages() { MessageHeader header; var messageContent = ReceiveRawMessage(out header); var message = MessageSerialiser.DeserializeRaw<ConfigAckPassiveMessage>(messageContent); ReceivedResponseMessages = new ConfigAckPassiveMessage[] { message }; } } }
下面再看看被動對話的實現,這是一條心跳檢測消息,由采集器定時發送給服務器來保持通訊鏈路。
using System; using System.Collections.Generic; using System.Linq; using System.Net.Sockets; using System.Text; using UIShell.EcmCommServerService.Protocol.Message; using UIShell.EcmCommServerService.Utility; namespace UIShell.EcmCommServerService.Protocol.RoundTrip.Passive { public class HeartBeatPassiveRoundTrip : PassiveRoundTrip<HeartBeatNotifyPassiveMessage, HeartBeatTimeActiveMessage> { public HeartBeatPassiveRoundTrip( string buildingId, string gatewayId, MessageConstants messageConstants, TcpClient client) : base(buildingId, gatewayId, StringEnum.GetStringValue(MessageType.HeartBeat_Notify), messageConstants, client) { IsKeepAliveRoundTrip = true; } public HeartBeatPassiveRoundTrip( HeartBeatNotifyPassiveMessage receiveMessage, MessageConstants messageConstants, TcpClient client) : this(receiveMessage.Header.BuildingId, receiveMessage.Header.GatewayId, messageConstants, client) { ReceivedMessage = receiveMessage; } public override HeartBeatTimeActiveMessage CreateResponseMessage() { return HeartBeatTimeActiveMessage.New(BuildingId, GatewayId, DateTime.Now); } } }
下面我們看看一個比較復雜的對話,文件傳輸。文件傳輸的過程為:(1)將文件分包,然后一包一包傳輸;(2)查詢缺包情況;(3)如果有缺包,則繼續發送缺失的包,直至成功;如果沒有缺包,則傳輸完成。
public override void Start() { // 檢查離線存儲區是否存在未傳輸完成的文件 var item = ContinuousFileStorage.GetNotCompletedFile(BuildingId, GatewayId); if (item == null && Content == null) // 說明被調用的是protected的構造函數,用於檢測是否需要進行斷點續傳。 { Trace(string.Format("集中器{0}不需要進行文件斷點續傳。", GatewayId)); return; } try { if (item == null) // 從頭開始傳輸文件 { Trace(string.Format("集中器{0}開始進行文件傳輸。", GatewayId)); Trace(string.Format("文件名稱:{0},文件長度:{1},包大小:{2},包數:{3}。", FileName, Content.Length, PackageSize, PackageCount)); // 創建離線存儲區 ContinuousFileStorage.StartFileTransfer(BuildingId, GatewayId, FileType, FileName, Content, PackageSize); List<int> indexes = new List<int>(); for (int index = 1; index <= PackageCount; index++) { indexes.Add(index); } SendFilePackage(indexes); } else // 開始斷點續傳 { Trace(string.Format("集中器{0}上次文件傳輸未完成,繼續進行斷點續傳。", GatewayId)); Trace(string.Format("斷點續傳的文件名稱:{0},文件長度:{1},包大小:{2},包數:{3}。", FileName, Content.Length, PackageSize, PackageCount)); } // 查詢丟失的包並重傳 if (QueryLostPackageAndResend()) { // 刪除離線存儲區 ContinuousFileStorage.EndFileTransfer(BuildingId, GatewayId, FileType, FileName, PackageSize); Trace(string.Format("集中器{0}文件傳輸成功。", GatewayId)); OnRoundTripCompleted(new RoundTripEventArgs() { RoundTrip = this }); } else { Trace(string.Format("集中器{0}文件傳輸未完成。", GatewayId)); OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = new Exception(string.Format("集中器{0}文件傳輸未完成。", GatewayId)) }); ContinuousFileStorage.IncrementFileTransferFailedCount(BuildingId, GatewayId); } } catch(Exception ex) { OnRoundTripError(new RoundTripEventArgs() { RoundTrip = this, Exception = ex }); ContinuousFileStorage.IncrementFileTransferFailedCount(BuildingId, GatewayId); } }
最后我們看一下能耗平台的對話類型,它由主動、被動和組合對話構成。
2.3.3 通訊協議類的設計
通訊協議類是系統的一個核心類,它為每一個通訊連接創建了一個獨立的通訊線程和對話隊列,並在隊列空閑的時候一直嘗試從鏈路中獲取被動消息,一旦有被動消息獲取,則創建被動對話,然后發送到隊列中。
以下方法是實現的核心,通訊線程首先從對話隊列中獲取對話,然后運行該對話,如果對話拋出了CommStreamException,說明鏈路關閉,則需要停止當前通訊協議;如果拋出了ThreadAboutException,說明被終止,則需要直接拋出異常;另外,如果對話隊列為空時,則嘗試檢查被動對話。
public bool Start() { if (_started) { return true; } FireOnStarting(); _commThread = new Thread(() => { RoundTripBase roundTrip; while (!_exited) { Monitor.Enter(_queue.SyncRoot); roundTrip = Dequeue(); if (roundTrip != null) { try { try { Monitor.Exit(_queue.SyncRoot); OnRoundTripStartingHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip }); roundTrip.Start(); } catch (ThreadAbortException) { Trace("通訊線程被終止。"); throw; } catch (CommStreamException ex) // 無法獲取Stream的時候,直接退出??需要加一個標志位 // 需要拋出事件,通知后續處理,如將RoundTrip另存 { _exited = true; roundTrip.Trace("會話失敗,因為:鏈路已經關閉。"); _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex); } catch (Exception ex) { string error = GetErrorMessage(ex); roundTrip.Trace(string.Format("會話失敗,因為:{0}。", error)); _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex); } if (!_exited) { roundTrip.Trace(Environment.NewLine); OnRoundTripStartedHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip }); } else { // 1 將當前失敗的RoundTrip保存入隊 FailedRoundTrips.Enqueue(roundTrip); // 2 保存其它沒有處理的RoundTrip do { roundTrip = _queue.Dequeue(); if (roundTrip != null) { FailedRoundTrips.Enqueue(roundTrip); } } while (roundTrip != null); // 3 停止當前協議 Stop(); } // 執行完RoundTrip后,開始清理資源 roundTrip.Dispose(); } catch (ThreadAbortException) { Trace("通訊線程被終止。"); throw; } catch (Exception ex) { _log.Error("Unhandled exception in CommProtocol.", ex); } } else { Monitor.Exit(_queue.SyncRoot); OnIdleHandler(this, new RoundTripEventArgs()); ContinuousFileTransfer(); try { CheckPassiveRoundTripAvailableAndEnqueue(); if (_queue.Count == 0) { Thread.Sleep((int)MessageConstants.PassiveRoundTripCheckInterval.TotalMilliseconds); } } catch (ThreadAbortException) { Trace("通訊線程被終止。"); throw; } catch (CommStreamException ex) // 無法獲取Stream的時候,直接退出??需要加一個標志位 // 需要拋出事件,通知后續處理,如將RoundTrip另存 { _exited = true; Trace("檢查被動消息失敗,因為:鏈路已經關閉。"); _log.Error("Check the passive message error.", ex); } catch (Exception ex) { string error = GetErrorMessage(ex); Trace(string.Format("檢查被動消息失敗,因為:{0}。", error)); _log.Error("Check the passive message error.", ex); } //_autoResetEvent.WaitOne(); } } }); _commThread.Start(); _started = true; FireOnStarted(); return true; }
檢查被動對話的方法實現如下,首先檢查鏈路是否關閉,如果關閉,則直接停止協議;接着從共享緩存獲取被動消息,如果查找到被動消息,則創建被動對話,然后加入對話隊列;最后,嘗試從鏈路中讀取一條被動消息。
public void CheckPassiveRoundTripAvailableAndEnqueue() { Trace("開始檢查被動通訊。"); if (!NetworkUtility.IsConnected(Client)) { Trace("被動通訊檢測時,鏈路已經關閉,關閉會話。"); Stop(); return; } // 1 Check ShardInputBuffer UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> tuple; foreach (var pair in _passiveRoundTripFactoryRegistry) { while ((tuple = SharedInputBuffer.FindAndThenRemove(BuildingId, GatewayId, pair.Key)) != null) { Trace(string.Format("從共享緩沖區獲取到一條被動消息,消息頭為:'{0}'。", pair.Key)); Enqueue(pair.Value(tuple.Item1, tuple.Item2)); } } // 2 Check StreamAdapter if (Client.Available == 0) { Trace("通訊鏈路沒有可用數據。"); if (!NetworkUtility.IsConnected(Client)) { Trace("被動通訊檢測時,鏈路已經關閉,關閉會話。"); Stop(); return; } } else { while (Client.Available > 0) { MessageHeader header; var content = CommStreamAdapter.ReceiveOneRawMessage(out header); if (content != null && content.Length > 0 && header != null) { // 1 如果當前消息在被動檢測時收到,但不屬於注冊的被動消息,則放棄該消息。 if (header.BuildingId.Equals(BuildingId) && header.GatewayId.Equals(GatewayId) && !_passiveRoundTripFactoryRegistry.ContainsKey(header.MessageType)) { Trace(string.Format("從通訊鏈路獲取到一條被動消息,該消息不是注冊的被動消息,忽略它,被忽略的消息頭為:'{0}'。", header)); continue; } // 2 如果當前消息在被動檢測時收到,並不屬於當前通訊線程處理的范圍,則添加到共享緩沖區。 // TODO: 這可能會產生一個Bug,如果接收到其它線程的消息時怎么辦? else if (!header.BuildingId.Equals(BuildingId) || !header.GatewayId.Equals(GatewayId) || !_passiveRoundTripFactoryRegistry.ContainsKey(header.MessageType)) { Trace(string.Format("從通訊鏈路獲取到一條被動消息,添加到共享緩沖區,消息頭為:'{0}'。", header)); SharedInputBuffer.AddSharedBufferItem(new OSGi.Utility.Tuple<MessageHeader, byte[]> { Item1 = header, Item2 = content }); } else // 3 如果是當前可以處理的被動消息,則創建一個被動RoundTrip { CreateRoundTripDelegate createRoundTrip; if (_passiveRoundTripFactoryRegistry.TryGetValue(header.MessageType, out createRoundTrip)) { Trace(string.Format("從通訊鏈路獲取到一條被動消息,添加到通訊隊列,消息頭為:'{0}'。", header)); Enqueue(createRoundTrip(header, content)); } else { Trace(string.Format("從通訊鏈路獲取到一條被動消息,添加到共享緩沖區,消息頭為:'{0}'。", header)); SharedInputBuffer.AddSharedBufferItem(new OSGi.Utility.Tuple<MessageHeader, byte[]> { Item1 = header, Item2 = content }); } } } } if (!NetworkUtility.IsConnected(Client)) { Trace("被動通訊檢測時,鏈路已經關閉,關閉會話。"); Stop(); return; } } Trace("檢查被動通訊完成。"); }
通訊協議使用RoundTripQueue來保存所有的對話,它是一個線程安全類,以下是Enqueue方法的實現。
public RoundTripQueue FailedRoundTrips = new RoundTripQueue(); public void Enqueue(RoundTripBase roundTrip) { if (!_started) { throw new Exception("The protocol is not started yet or exited."); } _queue.Enqueue(roundTrip); OnRoundTripEnquedHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip }); if (!(roundTrip.IsKeepAliveRoundTrip) || MessageConstants.ShowKeepAliveMessage) { roundTrip.OnTraceMessageAdded += DispatchAsyncTraceMessageAddedEvent; } try { _autoResetEvent.Set(); } catch { } }
上述3個方法實現了整個通訊模型。對於主動對話,我們還會為通訊協議創建一個相應的方法,並將對話加入到隊列中。下面是CommProtocol通訊協議類中Config的方法實現,Public方法為向領域層暴露的功能,而Internal方法則為了內部的單元測試,其實現非常簡單。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using UIShell.EcmCommServerService.Protocol.RoundTrip.Active; namespace UIShell.EcmCommServerService.Protocol { public partial class CommProtocol { public void Config( int period, EventHandler<RoundTripEventArgs> onMessageSend, EventHandler<RoundTripEventArgs> onCompleted, EventHandler<RoundTripEventArgs> onError) { ConfigActiveRoundTrip roundTrip; Config(period, onMessageSend, onCompleted, onError, out roundTrip); } internal void Config( int period, EventHandler<RoundTripEventArgs> onMessageSend, EventHandler<RoundTripEventArgs> onCompleted, EventHandler<RoundTripEventArgs> onError, out ConfigActiveRoundTrip roundTrip) { var configRoundTrip = new ConfigActiveRoundTrip(BuildingId, GatewayId, period, MessageConstants, Client); if (onMessageSend != null) { configRoundTrip.OnMessageSend += onMessageSend; } if (onCompleted != null) { configRoundTrip.OnCompleted += onCompleted; } if (onError != null) { configRoundTrip.OnError += onError; } Enqueue(configRoundTrip); roundTrip = configRoundTrip; } } }
2.3.4 通訊服務的實現
通訊服務器HttpCommServer用於打開一個TCP端口,接受TCP連接,當連接登錄成功后,為每一個連接創建一個會話,並與領域層業務邏輯關連。下面我們看一下它的實現,其核心方法為ListenGprsRequest,在該方法中,首先為每一個連接進行一次身份驗證,驗證通過后,創建一個會話,然后添加到會話列表中。
public partial class HttpCommServer : TrackableBase { public MessageConstants MessageConstants { get; private set; } public string IPAddressString { get; private set; } public int Port { get; private set; } public ThreadSafeList<CommProtocol> Sessions { get; private set; } private Thread _listenerThread; private TcpListener _listener; private volatile bool _exited; private ILog _log; private object _syncRoot = new object(); public HttpCommServer(string ipaddress, int port, MessageConstants messageConstants) { IPAddressString = ipaddress; Port = port; Sessions = new ThreadSafeList<CommProtocol>(); MessageConstants = messageConstants; _log = BundleActivator.LogService.CreateLog(BundleActivator.Bundle, GetType()); RegisterDomainHandlerCreationDelegates(); } public CommProtocol GetSession(string buildingId, string gatewayId) { return Sessions.Find(s => s.BuildingId.Equals(buildingId) && s.GatewayId.Equals(gatewayId)); } public void Start() { lock (_syncRoot) { IPEndPoint local = new IPEndPoint(IPAddress.Parse(IPAddressString), Port); _listener = new TcpListener(local); _listener.Start(); _listenerThread = new Thread(new ThreadStart(ListenGprsRequest)); _listenerThread.Start(); } OnSessionChanged += OnSessionChangedForDomain; } private void ListenGprsRequest() { while (!_exited) { // 接受一次連接 if (!_exited) { try { TcpClient tcpClient = null; try { tcpClient = _listener.AcceptTcpClient(); Trace(string.Format("接收到來自IP地址'{0}'的連接。", (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address)); _log.Info(string.Format("Accept new connection from ip '{0}'.", (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address)); lock (_syncRoot) { if (!_exited) { var loginRoundTrip = new LoginCompositeRoundTrip(MessageConstants, tcpClient); loginRoundTrip.ParentTracker = this; loginRoundTrip.Start(); loginRoundTrip.Dispose(); var session = new CommProtocol(loginRoundTrip.BuildingId, loginRoundTrip.GatewayId, MessageConstants, tcpClient); session.ParentTracker = this; session.Start(); AddSession(session); // 清空離線存儲區 ContinuousDataStorage.Reset(loginRoundTrip.GatewayId); _log.Info(string.Format("Start the session for gateway '{0}' of the building '{1}'.", session.GatewayId, session.BuildingId)); } } } catch (ThreadAbortException) { throw; } catch (Exception ex) { try { if (tcpClient != null) // 登錄失敗,斷開連接 { tcpClient.Close(); } } catch { } // Trace(string.Format("登錄失敗,失敗IP地址為'{0}'。", tcpClient != null ? (tcpClient.Client.RemoteEndPoint as IPEndPoint).Address.ToString() : "N/A")); _log.Error("The connection login failed.", ex); } } catch (ThreadAbortException) { throw; } catch (Exception ex) { _log.Error("Can not listen any more.", ex); break; } } } } public void Stop() { if (_exited) { return; } _log.Info("The server is stopping."); lock (_syncRoot) { _log.Info("The sessions are stopping."); // 不能使用 Sessions.ForEach(s => s.Stop()),這是因為s.Stop將會刪除Sessions // 從而改變ForEach的行為,造成Session泄露。 var sesions = Sessions.ToArray(); foreach (var session in sesions) { session.Stop(); } OnSessionChanged -= OnSessionChangedForDomain; _log.Info("The sessions are stopped and cleared."); _listener.Stop(); _log.Info("The listener is stopped."); Thread.Sleep(1000); try { _listenerThread.Abort(); } catch { } _log.Info("The listener thread is stopped."); _exited = true; } _log.Info("The server is stopped."); SharedInputBuffer.ClearSharedBuffer(); } public event EventHandler<SessionChangedEventArgs> OnSessionChanged; public void AddSession(CommProtocol session) { var oldSession = Sessions.Find(p => p.BuildingId.Equals(session.BuildingId) && p.GatewayId.Equals(session.GatewayId)); if (oldSession != null) { RemoveSession(oldSession); _log.Info(string.Format("The session for gateway '{0}' of building '{1}' already existed, it will be deleted first.", session.GatewayId, session.BuildingId)); } Sessions.Add(session); session.OnStopped += OnSessionStopped; _log.Info(string.Format("Add the session for gateway '{0}' of building '{1}'.", session.GatewayId, session.BuildingId)); Trace(string.Format("為采集器'{0}'創建通訊會話,目前會話數目為'{1}'。", session.GatewayId, SessionNumber)); if (OnSessionChanged != null) { OnSessionChanged(this, new SessionChangedEventArgs() { ChangedAction = CollectionChangedAction.Add, BuildingId = session.BuildingId, GatewayId = session.GatewayId, Session = session }); } } private void OnSessionStopped(object sender, EventArgs e) { RemoveSession(sender as CommProtocol); } public void RemoveSession(CommProtocol session) { session.OnStopped -= OnSessionStopped; Sessions.Remove(session); _log.Info(string.Format("Remove the session for gateway '{0}' of building '{1}'.", session.GatewayId, session.BuildingId)); Trace(string.Format("集中器'{0}'通訊會話已經斷開,目前會話數目為'{1}'。", session.GatewayId, SessionNumber)); if (OnSessionChanged != null) { OnSessionChanged(this, new SessionChangedEventArgs() { ChangedAction = CollectionChangedAction.Remove, BuildingId = session.BuildingId, GatewayId = session.GatewayId, Session = session }); } } }
關於通訊服務器核心的實現已經介紹完成了,下面我們來看看領域層的實現。
3 領域層的實現
本系統的核心設計是基於事件 + 分層的體系結構。通訊服務器與數據采集器硬件的通訊都與領域相關邏輯有關。為了使程序設計更加簡單化,引入事件對各個層次的代碼解耦,通過事件來關聯領域知識與硬件的通訊過程,這樣也方便通訊協議層的測試。這里HttpCommServer管理了所有的通訊會話實例和對話-領域處理器管理。
以下是領域邏輯關聯的代碼。它的作用為:1 監聽SessionChanged事件,為每一個Session的OnRoundTripEnqueued創建領域處理事件; 2 在領域處理事件中,為RoundTrip關聯相應的領域處理類,領域處理類訂閱了RoundTrip的OnCompleted和OnError事件,在里面進行相應處理。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using UIShell.EcmCommServerService.Domain; using UIShell.EcmCommServerService.Protocol; using UIShell.EcmCommServerService.Protocol.RoundTrip.Passive; using UIShell.OSGi.Utility; namespace UIShell.EcmCommServerService.Server { public partial class HttpCommServer { /// <summary> /// 跟蹤每一個Session的RoundTripEnqueued事件,當有RoundTrip注冊時,便注冊事件,處理領域知識。 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void OnSessionChangedForDomain(object sender, SessionChangedEventArgs e) { if (e.ChangedAction == OSGi.CollectionChangedAction.Add) { e.Session.OnRoundTripEnqued += OnSessionRoundTripEnqued; } else { e.Session.OnRoundTripEnqued -= OnSessionRoundTripEnqued; } } private Dictionary<Type, CreateDomainHandlerDelegate> _handlers = new Dictionary<Type, CreateDomainHandlerDelegate>(); private void RegisterDomainHandlerCreationDelegates() { _handlers.Add(typeof(DataReportPassiveRoundTrip), roundTrip => new DataReportDomainHandler() { RoundTrip = roundTrip }); } private void OnSessionRoundTripEnqued(object sender, RoundTripEventArgs e) { CreateDomainHandlerDelegate del; if (_handlers.TryGetValue(e.RoundTrip.GetType(), out del)) { del(e.RoundTrip); _log.Info(string.Format("Create handler for RoundTrip '{0}' completed.", e.RoundTrip.GetType().FullName)); } else { _log.Info(string.Format("The handler for RoundTrip '{0}' not found.", e.RoundTrip.GetType().FullName)); } } } }
下面是領域處理類RoundTripDomainHandler的基類。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using UIShell.EcmCommServerService.Protocol; using UIShell.OSGi.Utility; namespace UIShell.EcmCommServerService.Domain { /// <summary> /// RoundTrip領域處理器,當RoundTrip操作成功時,將相應結果保存到數據庫; /// 相反,如果操作失敗,則需要做異常處理。 /// </summary> /// <typeparam name="TRoundTrip">RoundTrip類型</typeparam> public abstract class RoundTripDomainHandler { private RoundTripBase _roundTrip; public RoundTripBase RoundTrip { get { return _roundTrip; } set { if (_roundTrip == null) { AssertUtility.NotNull(value); _roundTrip = value; _roundTrip.OnCompleted += OnCompleted; _roundTrip.OnError += OnError; } } } public abstract void OnCompleted(object sender, RoundTripEventArgs e); public abstract void OnError(object sender, RoundTripEventArgs e); } public delegate RoundTripDomainHandler CreateDomainHandlerDelegate(RoundTripBase roundTrip); }
以下則是數據上報對話的相關領域處理。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using UIShell.EcmCommServerService.Protocol; using UIShell.EcmCommServerService.Protocol.RoundTrip.Passive; using UIShell.EcmCommServerService.Server; namespace UIShell.EcmCommServerService.Domain { public class DataReportDomainHandler : RoundTripDomainHandler { public override void OnCompleted(object sender, RoundTripEventArgs e) { var dataReportRoundTrip = RoundTrip as DataReportPassiveRoundTrip; e.RoundTrip.Trace("開始將會話結果持久化到數據存儲。"); // ... e.RoundTrip.Trace("將會話結果持久化到數據存儲成功。"); } public override void OnError(object sender, RoundTripEventArgs e) { // 為采集器關聯的Building創建一條失敗記錄 // ... } } }
領域處理類將會調用數據訪問模型來操作數據庫。整個通訊服務器的大致實現已經介紹完成,接下來我將介紹一些非常有意思的技術細節。
4 通訊服務器有意思的技術細節
4.1 共享緩存
按照我的理解,GPRS通訊服務器指定端口的網絡存儲保存了與所有硬件設備的通訊數據,因此,我們需要來區分數據是由哪個采集器發送過來的;此外,在通訊過程中,我們需要處理好通訊的時序,就是說服務器向采集器發送配置主動消息時,期望采集器響應一條結果,此時返回的消息可能是其它消息,因為整個通訊是雙工的,采集器也可以主動向服務器發送消息。因此,我們使用一個SharedInputBuffer來處理上述兩個問題。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using UIShell.OSGi.Collection; namespace UIShell.EcmCommServerService.Protocol { /// <summary> /// 添加共享輸入緩沖區的原因如下: /// 1 對於GPRS服務器,所有的會話都將從同一個網絡數據緩沖區中讀取數據; /// 2 每一個集中器對應一個通訊會話; /// 3 這樣集中器A從網絡數據緩沖區讀取數據時,可能讀取到來自集中器B的數據, /// 因此,我們需要使用緩沖區將集中器B的數據緩存起來,並繼續讀取直到讀取到 /// A的數據或者讀取失敗; /// 4 此外,每一個集中器讀取數據時,都先嘗試從共享緩沖區讀取數據,然后再 /// 嘗試從網絡數據緩沖區讀取。 /// </summary> public static class SharedInputBuffer { private static ThreadSafeList<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> _sharedBuffer = new ThreadSafeList<OSGi.Utility.Tuple<MessageHeader, byte[]>>(); public static ThreadSafeList<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> SharedBuffer { get { return _sharedBuffer; } } public static int Count { get { using (var locker = SharedBuffer.Lock()) { return locker.Count; } } } public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemoveByMessageType(string type) { return FindAndThenRemove(p => p.Item1.MessageType.Equals(type)); } public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemove(string buildingId, string gatewayId, string type) { return FindAndThenRemove(p => p.Item1.BuildingId.Equals(buildingId) && p.Item1.GatewayId.Equals(gatewayId) && p.Item1.MessageType.Equals(type)); } public static UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> FindAndThenRemove(Predicate<UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]>> predicate) { using (var locker = SharedBuffer.Lock()) { if (locker.Count > 0) { // 查找緩沖項 var item = SharedBuffer.Find(predicate); if (item != null) { // 刪除並返回 RemoveSharedBufferItem(item); return item; } } return null; } } public static void AddSharedBufferItem(UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> item) { SharedBuffer.Add(item); } public static void RemoveSharedBufferItem(UIShell.OSGi.Utility.Tuple<MessageHeader, byte[]> item) { SharedBuffer.Remove(item); } public static void ClearSharedBuffer() { SharedBuffer.Clear(); } } }
4.2 內存泄露
在整個通訊服務器中,整個通訊過程中,創建了大量的RoundTrip實例,通訊服務器的運行是7 × 24小時 × 365天不間斷的運行,如何保證這個通訊服務器在持久的運行中,內存不會持續增加/CPU不會持續增長,從而保證系統不會崩潰,是必須解決的一個問題。在系統運行初期,我們很快就面臨這個問題的威脅,就是在系統運行中,內存一直在增長。因此,在初期,我們使用CLR Profiler來調試系統的內存和CPU使用情況,初步的調優記錄如下所示。
經過分析,發現新建的RoundTrip實例在對話執行完成后,並沒有被CLR回收,從而導致與RoundTrip關聯的類型都一直存留在內存中。了解.NET GC垃圾回收原理的同志應該知道,GC的條件是引用計數為0。經過分析,發現RoundTrip沒有被釋放的原因在於,我們的UI訂閱了每一個新建的RoundTrip的OnMessageSend/OnCompleted/OnError事件,用於打印通訊過程中的交互的所有消息,這些事件在RoundTrip執行完成后,沒有釋放,從而導致RoundTrip的引用計數始終不是0。
因此,我們為RoundTrip實現了IDisposable接口,在其實現中,來釋放所有的事件句柄。
private List<EventHandler<RoundTripEventArgs>> _onCompletedEventHandlers = new List<EventHandler<RoundTripEventArgs>>(); /// <summary> /// 這是一個異步事件,避免在處理事件時,阻塞其它RoundTrip的運行。 /// </summary> public event EventHandler<RoundTripEventArgs> OnCompleted { add { _onCompletedEventHandlers.Add(value); } remove { _onCompletedEventHandlers.Remove(value); } } public override void Dispose() { _onCompletedEventHandlers.Clear(); _onErrorEventHandlers.Clear(); base.Dispose(); }
在CommProtocol通訊協議類中,每一個RoundTrip執行完成后,都將調用Dispose方法。
try { try { Monitor.Exit(_queue.SyncRoot); OnRoundTripStartingHandler(this, new RoundTripEventArgs() { RoundTrip = roundTrip }); roundTrip.Start(); } catch (ThreadAbortException) { Trace("通訊線程被終止。"); throw; } catch (CommStreamException ex) // 無法獲取Stream的時候,直接退出??需要加一個標志位 // 需要拋出事件,通知后續處理,如將RoundTrip另存 { _exited = true; roundTrip.Trace("會話失敗,因為:鏈路已經關閉。"); _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex); } catch (Exception ex) { string error = GetErrorMessage(ex); roundTrip.Trace(string.Format("會話失敗,因為:{0}。", error)); _log.Error(string.Format("Start the round trip '{0}' error.", roundTrip), ex); } // ...... // 執行完RoundTrip后,開始清理資源 roundTrip.Dispose(); } catch (ThreadAbortException) { Trace("通訊線程被終止。"); throw; } catch (Exception ex) { _log.Error("Unhandled exception in CommProtocol.", ex); }
4.3 單元測試
該通訊協議的單元測試,有三個步驟:(1)在靜態類中啟動OSGi.NET插件框架;(2)在Setup方法中啟動服務器,服務器IP為本機IP——127.0.0.1,然后創建一個TCP連接,模擬連接操作,首先先登錄;(3)執行一個RoundTrip測試,模擬服務器和GPRS連接客戶端的行為。
以下方法用於啟動OSGi.NET插件框架。
[TestFixture] public partial class ProtocolTest { private HttpCommServer _commServer; private TcpClient _tcpClient; private CommProtocol _currentSession; private CommStreamAdapter _clientStremAdapter; public AutoResetEvent AutoResetEvent { get; set; } public const string BuildingId = "b001"; public const string GatewayId = "g001"; static ProtocolTest() { // 加載插件運行時,准備運行環境 if (BundleRuntime.Instance == null) { BundleRuntime bundleRuntime = new BundleRuntime("../../../"); bundleRuntime.Start(); } } }
啟動通訊服務器並模擬用戶登錄,_commServer為服務器,_tcpClient為模擬客戶端連接,_clientStreamAdapter為客戶端連接適配器。
[SetUp] public void Setup() { AutoResetEvent = new AutoResetEvent(false); MessageConstants.GprsMessageConstants.Timeout = new TimeSpan(0, 0, 5); MessageConstants.GprsMessageConstants.RetryTimesOnTimeout = 1; _commServer = new HttpCommServer("127.0.0.1", 39999, MessageConstants.GprsMessageConstants); _commServer.OnTraceMessageAdded += (sender, e) => { Debug.WriteLine(e.Message); }; _commServer.Start(); _tcpClient = new TcpClient(); _tcpClient.Connect("127.0.0.1", 39999); _clientStremAdapter = new CommStreamAdapter(_commServer.MessageConstants, _tcpClient); _clientStremAdapter.ParentTracker = _commServer; var request = ValidateRequestPassiveMessage.New(BuildingId, GatewayId); _clientStremAdapter.SendRawMessage(request.ToContent(), request.ToXmlContent()); var prefix = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessagePrefixBytes); var root = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageRootStartBytes); var common = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageCommonStartBytes); var commonend = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageCommonEndBytes); var rootend = ProtocolUtility.BytesToHexString(MessageConstants.XmlMessageRootEndBytes); MessageHeader header; var messageContent = _clientStremAdapter.ReceiveOneRawMessage(out header); Assert.AreEqual(header.BuildingId, BuildingId); Assert.AreEqual(header.GatewayId, GatewayId); Assert.AreEqual(header.MessageType, StringEnum.GetStringValue(MessageType.Validate_Sequence)); var sequenceMessage = MessageSerialiser.DeserializeRaw<ValidateSequenceActiveMessage>(messageContent); string md5 = CreateSequenceAndHash(sequenceMessage.Body.Sequence); var md5Message = ValidateMd5PassiveMessage.New(BuildingId, GatewayId, md5); _clientStremAdapter.SendRawMessage(md5Message.ToContent(), md5Message.ToXmlContent()); messageContent = _clientStremAdapter.ReceiveOneRawMessage(out header); Assert.AreEqual(header.BuildingId, BuildingId); Assert.AreEqual(header.GatewayId, GatewayId); Assert.AreEqual(header.MessageType, StringEnum.GetStringValue(MessageType.Validate_Result)); var resultMessage = MessageSerialiser.DeserializeRaw<ValidateResultActiveMessage>(messageContent); Assert.AreEqual(resultMessage.Body.Result, "pass"); while (_currentSession == null) { _currentSession = _commServer.Sessions.Find(s => s.BuildingId.Equals(BuildingId) && s.GatewayId.Equals(GatewayId)); Thread.Sleep(1000); } }
接着就可以來定義一個測試。這個測試在OnMessageSend事件中,客戶端將模擬通訊協議,發送一個響應消息。由於通訊過程是基於異步方式,我們需要使用AutoResetEvent來等待對話完成信號。等對話執行完成時,再來檢查結果。
[Test] public void ConfigRoundTrip() { bool completed = false; Exception ex = null; MessageHeader receivedMessageHeader = null; ConfigActiveMessage receivedMessage = null; _currentSession.Config(10, (sender, e) => { var configMessage = _clientStremAdapter.ReceiveOneRawMessage(out receivedMessageHeader); receivedMessage = MessageSerialiser.DeserializeRaw<ConfigActiveMessage>(configMessage); var configAckMessage = ConfigAckPassiveMessage.New(BuildingId, GatewayId); _clientStremAdapter.SendRawMessage(configAckMessage.ToContent(), configAckMessage.ToXmlContent()); }, (sender, e) => { ex = e.Exception; completed = true; AutoResetEvent.Set(); }, (sender, e) => { ex = e.Exception; completed = false; AutoResetEvent.Set(); }); AutoResetEvent.WaitOne(); Assert.IsTrue(completed); Assert.AreEqual(receivedMessageHeader.MessageType, StringEnum.GetStringValue(MessageType.Config_Period)); Assert.AreEqual(receivedMessage.Body.Period, 10); }
以下是單元測試的輸出消息。
------ Test started: Assembly: UIShell.EcmCommServerService.dll ------
[Id:1, 2013-07-07 19:17:16]接收到來自IP地址'127.0.0.1'的連接。
[Id:1, 2013-07-07 19:17:16]開始清空緩沖區。
[Id:1, 2013-07-07 19:17:16]清空緩沖區成功。
[Id:1, 2013-07-07 19:17:16]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common><id_validate operation="request"></id_validate></root>
[Id:1, 2013-07-07 19:17:16]正在讀取消息,目前沒有可用數據,等待數據。
[Id:1, 2013-07-07 19:17:16]登錄組合會話開始。
[Id:1, 2013-07-07 19:17:16]開始嘗試與集中器N/A進行被動式會話。
[Id:4, 2013-07-07 19:17:16]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common>。
[Id:1, 2013-07-07 19:17:16]接收到消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>request</type></common><id_validate operation="request"></id_validate></root>
[Id:1, 2013-07-07 19:17:16]開始發送的響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common><id_validate operation="sequence"><sequence>14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d</sequence></id_validate></root>
[Id:4, 2013-07-07 19:17:16]開始清空緩沖區。
[Id:4, 2013-07-07 19:17:16]清空緩沖區成功。
[Id:4, 2013-07-07 19:17:16]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common><id_validate operation="sequence"><sequence>14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d</sequence></id_validate></root>
[Id:1, 2013-07-07 19:17:16]與集中器g001進行被動式會話成功。
[Id:1, 2013-07-07 19:17:16]收到請求消息並發送序列'14b12261-5182-47a6-bdfa-cf21e4e5cfd7-bcc63da5-5694-427f-b358-9a113125f74d',該序列計算的MD5值為'8224D3FC5FCC21E45E82FF5F9AB364CD'。
[Id:1, 2013-07-07 19:17:16]開始嘗試與集中器g001進行被動式會話。
[Id:6, 2013-07-07 19:17:16]共享緩沖區的消息數量:0。
[Id:6, 2013-07-07 19:17:16]正在讀取消息,目前沒有可用數據,等待數據。
[Id:1, 2013-07-07 19:17:19]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>sequence</type></common>。
[Id:1, 2013-07-07 19:17:19]開始清空緩沖區。
[Id:1, 2013-07-07 19:17:19]清空緩沖區成功。
[Id:1, 2013-07-07 19:17:19]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common><id_validate operation="md5"><md5>8224D3FC5FCC21E45E82FF5F9AB364CD</md5></id_validate></root>
[Id:1, 2013-07-07 19:17:19]正在讀取消息,目前沒有可用數據,等待數據。
[Id:6, 2013-07-07 19:17:19]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common>。
[Id:1, 2013-07-07 19:17:19]接收到消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>md5</type></common><id_validate operation="md5"><md5>8224D3FC5FCC21E45E82FF5F9AB364CD</md5></id_validate></root>
[Id:1, 2013-07-07 19:17:19]開始發送的響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>result</type></common><id_validate operation="result"><result>pass</result></id_validate></root>
[Id:6, 2013-07-07 19:17:19]開始清空緩沖區。
[Id:6, 2013-07-07 19:17:19]清空緩沖區成功。
[Id:6, 2013-07-07 19:17:19]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>result</type></common><id_validate operation="result"><result>pass</result></id_validate></root>
[Id:1, 2013-07-07 19:17:19]與集中器g001進行被動式會話成功。
[Id:1, 2013-07-07 19:17:19]登錄組合會話完成,登錄結果為:成功。
[Id:1, 2013-07-07 19:17:19]為采集器'g001'創建通訊會話,目前會話數目為'1'。
[Id:13, 2013-07-07 19:17:33]開始與集中器g001會話。
[Id:13, 2013-07-07 19:17:33]開始清空緩沖區。
[Id:13, 2013-07-07 19:17:33]清空緩沖區成功。
[Id:13, 2013-07-07 19:17:33]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period</type></common><config operation="period"><period>10</period></config></root>
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>pack_lost</type></common>。
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>pack_lost</type></common>。
[Id:1, 2013-07-07 19:17:36]接收到消息,消息頭為:<?xml version="1.0" encoding="utf-8"?><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period</type></common>。
[Id:1, 2013-07-07 19:17:44]開始清空緩沖區。
[Id:1, 2013-07-07 19:17:44]清空緩沖區成功。
[Id:1, 2013-07-07 19:17:44]開始發送命令:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period_ack</type></common><config operation="period_ack"></config></root>
[Id:13, 2013-07-07 19:17:46]開始第1次消息接收。
[Id:13, 2013-07-07 19:17:46]共享緩沖區的消息數量:0。
[Id:1, 2013-07-07 19:17:46]開始檢查被動通訊。
[Id:1, 2013-07-07 19:17:49]通訊鏈路沒有可用數據。
[Id:13, 2013-07-07 19:17:46]當前會話'未知'接收了1個響應消息,詳細如下:
[Id:13, 2013-07-07 19:17:46]響應消息:<?xml version="1.0" encoding="utf-8"?><root><common><building_id>b001</building_id><gateway_id>g001</gateway_id><type>period_ack</type></common><config operation="period_ack"></config></root>
[Id:13, 2013-07-07 19:17:46]與集中器g001會話成功。
[Id:1, 2013-07-07 19:17:49]檢查被動通訊完成。
[Id:1, 2013-07-07 19:17:50]通訊線程被終止。
對於被動對話,其測試方法需要稍作改變,因為被動消息的發起在在通訊會話檢測到有被動消息時才會創建一個被動對話的,因此,我們首先需要先模擬客戶端發送一條被動消息,並監聽當前會話的OnRoundTripStarted事件,如下所示。
[Test] public void HeartBeatPassiveRoundTrip() { bool completed = false; Exception ex = null; HeartBeatTimeActiveMessage responseMessage = null; _currentSession.OnRoundTripStarted += (sender, e) => { if (e.RoundTrip is HeartBeatPassiveRoundTrip) { var roundTrip = e.RoundTrip as HeartBeatPassiveRoundTrip; responseMessage = roundTrip.ResponsedMessage; ex = e.Exception; completed = true; AutoResetEvent.Set(); } }; var heartbeatMessage = HeartBeatNotifyPassiveMessage.New(BuildingId, GatewayId); _clientStremAdapter.SendRawMessage(heartbeatMessage.ToContent(), heartbeatMessage.ToXmlContent()); AutoResetEvent.WaitOne(); Assert.IsTrue(completed); Assert.NotNull(responseMessage); Assert.AreEqual(responseMessage.Header.MessageType, StringEnum.GetStringValue(MessageType.HeartBeat_Time)); }
4.4 程序的部署與升級
通訊服務器軟件由軟件團隊開發,硬件團隊測試,並且需要部署到多個點。為了避免手工部署和升級麻煩,整個通訊服務器基於開放工廠(http://www.iopenworks.com/)平台開發,程序使用開放工廠提供的OSGi.NET插件框架(http://www.iopenworks.com/Products/SDKDownload)構建,使用開放工廠私有插件倉庫實現應用程序的自動升級。
以下是整個通訊服務器的代碼。核心是一個CommServerService插件,實現了通訊服務。
整個應用程序由12個插件構成,其它插件均為開放工廠提供的插件和Web界面應用程序插件。
在發布通訊服務插件的時候,右鍵,點擊“發布插件”菜單,即可將插件及其升級版本發布到插件倉庫。
接着,在后面的頁面中輸入私有倉庫用戶名/密碼即可發布。
發布完成后,你可以在私有倉庫中,查看到該插件的發布版本。
發布完成后,進入該系統插件管理頁面的私有倉庫,即可下載到最新升級包。如下所示。
下面就可以下載安裝升級包了。
好了,整個GPRS通訊服務器的構建方法就分享到這。
5 總結
(1)為通訊協議設計了一個好的模型,這個模型以消息、對話為基礎;
(2)采用了不錯的架構,基於事件 + 分層,事件非常適用於異步處理和解耦,分層易於代理的理解和組織;
(3)非常OO,整個設計采用比較優雅的面向對象設計,遵守OO的設計原則SRP、OCP等;
(4)使用插件化的方法,進行模塊化開發;
(5)引用單元測試保證通訊協議可測試性,避免與硬件聯調。