分享一個與硬件通訊的分布式監控與遠程控制程序的設計(下:通訊協議設計與實現)


4 基於RoundTrip(往返)的通訊協議設計

通訊服務器插件的核心為3部分:(1)與通訊方式、業務邏輯無關的通訊協議實現;(2)和通訊方式、業務邏輯有關的通訊業務邏輯的實現;(3)遠程通訊消息隊列。在這里我將重點描述通訊協議的實現。這個通訊協議的實現比較靈巧。

4.1 通訊協議基本單元——消息

通訊協議的通訊單元是消息,以下是來自硬件開發工程師編寫的協議,消息包由前導符、起始符、消息頭、校驗碼、消息體、結束符等部分組成。不同的通訊指令,發出的消息和接收到消息均不相同。

clip_image002

通訊協議必須能夠發出正確的消息和解析響應的消息包,此外,硬件能接受的消息是字節格式,而通訊服務器軟件能夠正確識別的則是各個有意義的字段。為此,我們為消息設計了如下的基類。消息較小的單元是一個MessagePart,它提供了ToContent和ToMessage方法分別用於轉換成字節碼和字符串,此外,它還定義了TryParse方法用於將字節碼解析成有意義的MessagePart對象。這里定義了ParseMessageException異常,當消息解析失敗時,拋出該異常。下面是消息頭、消息體以及消息基類的定義。消息基類由前綴、起始、頭、體和后綴部分組成。

image

接着我們根據硬件開發工程師提供的SCATA 3.0協議,定義與通訊協議相關的消息基類Scata30Message。這個消息提供了一個默認的消息頭的實現,但是消息體則需要根據指令進一步實現。下圖是通訊協議涉及的大部分消息體的實現,消息體基本都是一對的,即消息體和響應消息體。

clip_image005

消息體一般是指服務器發出給硬件的指令,這樣的消息體需要構造所有的字段,並要實現ToContent方法,將消息轉換成字節碼,發送給硬件;而響應消息一般是由硬件發送給服務器的消息,它至少需要實現TryParse方法,將硬件字節碼解析成有意義的字段,供業務邏輯層訪問。

下面是一個消息的定義。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.CommServerService.Utility;
using System.ComponentModel;

namespace UIShell.CommServerService.Protocol.Scata30.Message
{
    [Description("讀取單一表")]
    public class Scata30ReadMeterMessageBody : Scata30MessageBody
    {
        public byte MeterProtocolCategory;
        public byte Channel;
        public byte[] MeterAddressBCD;
        public long MeterAddress;

        internal Scata30ReadMeterMessageBody()
        { 
        }

        public Scata30ReadMeterMessageBody(byte meterProtocol, byte channel, 
long meterAddress)
        {
            MeterProtocolCategory = meterProtocol;
            Channel = channel;
            MeterAddress = meterAddress;
            MeterAddressBCD = ProtocolUtility.MeterAddressFromLong(meterAddress, true);
        }

        protected override bool TryParseWithoutCheckCode(byte[] bodyContent)
        {
            throw new NotImplementedException();
        }

        protected override byte[] ToContentWithoutCheckCode()
        {
            return new byte[] { 
MeterProtocolCategory, Channel }.Concat(MeterAddressBCD).ToArray();
        }

        public override string ToString()
        {
            return string.Format("協議類型={0},通道號={1},表地址={2}", 
MeterProtocolCategory, Channel, MeterAddress);
        }
    }
}

  

下面則是響應消息的實現。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using UIShell.CommServerService.Utility;
using System.ComponentModel;

namespace UIShell.CommServerService.Protocol.Scata30.Message
{
    [Description("讀取單一表響應")]
    public class Scata30ReadMeterResponseMessageBody : Scata30MessageBody
    {
        public Scata30ResponseStatus ResponseStatus;
        /// <summary>
        /// 表數據,同讀取多表的數據類似。
        /// </summary>
        public byte[] MeterBodyContent;

        public Scata30ReadMeterResponseMessageBody()
        {
            
        }

        protected override bool TryParseWithoutCheckCode(byte[] bodyContent)
        {
            if (bodyContent == null || bodyContent.Length == 0)
            {
                _log.Error(string.Format(UIShell.CommServerService.Properties.Resources.
ParseMessageBodyFailed, ProtocolUtility.BytesToHexString(bodyContent)));
                return false;
            }

            if (bodyContent.Length == 1)
            {
                if (bodyContent[0] != (byte)Scata30ResponseStatus.Failed)
                {
_log.Error(string.Format(UIShell.CommServerService.Properties.Resources.
ParseMessageBodyFailed, ProtocolUtility.BytesToHexString(bodyContent)));
                    return false;
                }
                else
                {
                    ResponseStatus = (Scata30ResponseStatus)bodyContent[0];
                }
            }
            else
            {
                ResponseStatus = Scata30ResponseStatus.Success;
                MeterBodyContent = bodyContent;
            }
            return true;
        }

        protected override byte[] ToContentWithoutCheckCode()
        {
            if (ResponseStatus == Scata30ResponseStatus.Failed)
            {
                return new byte[] { (byte)ResponseStatus };
            }
            return MeterBodyContent;
        }

        public override string ToString()
        {
            return string.Format("狀態={0},表數據={1}", 
EnumDescriptionHelper.GetDescription(ResponseStatus), ProtocolUtility.BytesToHexString(MeterBodyContent));
        }
    }
}

  

4.2 通訊協議的組成——RoundTrip(往返)

通訊服務器與硬件的通訊過程是由一組的對話來實現的,每一組對話都是問答式的方式來完成。我們把一次問答式的對話用RoundTripBase這個類型來表示。問答式的對話又分成主動式(ActiveRoundTrip)和被動式(PassiveRoundTrip),即服務器發起然后硬件響應,或者硬件發起服務器響應。有時,一次問答式的對話可能需要由若干組的子對話來實現,我們稱其為組合對話(CompositeRoundTripBase)。有關通訊協議對話過程涉及的基類設計如下。

image

對話RoundTripBase的詳細設計如下所示,它由優先級、時間戳屬性組成,提供了Start方法表示會話開始,以及OnCompleted和OnError事件。RoundTripQueue則是對話隊列,它嚴格限制通訊協議每次只能執行一個RoundTrip,不能交叉運行,這個RoundTripQueue是一個線程安全的,因為通訊協議會被遠程通訊線程、協議線程、UI線程等線程來訪問。

clip_image008

4.3 協議的RoundTrip實現

在本系統中,我們使用SCATA 3.0通訊協議,這里我們實現了2個基類:Scata30ActiveRoundTrip和Scata30PassiveRoundTrip。

image

在Scata30ActiveRoundTrip中,它在Start方法中,將利用StreamAdapter來從通訊信道中獲取一條消息,一旦消息解析成功后,將發送響應消息包。這個對話,一旦中間發生錯誤或者超時,將重試若干次。同理,Scata30PassiveRoundTrip也是如此實現。

接下來,我們根據通訊協議,定義了如下的對話。

clip_image011

下面我們來看一個對話的實現。

using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using UIShell.CommServerService.Protocol.Scata30.Message;
 using UIShell.CommServerService.Utility;
 using System.ComponentModel;
 
 namespace UIShell.CommServerService.Protocol.Scata30.RoundTrip
 {
     [Description("讀取指定時間點表數據")]
     public class Scata30ReadHistoricalMeterRoundTrip : Scata30HasNextActiveRoundTrip<Scata30ReadHistoricalMeterMessageBody, Scata30ReadHistoricalMeterResponseMessageBody>
     {
         public DateTime HistoricalDateTime;
 
         public Scata30ReadHistoricalMeterRoundTrip(
             ushort destinationAddress,
             ushort destinationZigbeeAddress,
             DateTime timeStamp,
             Scata30Protocol protocol)
             : base(destinationAddress, destinationZigbeeAddress, new Scata30Message<Scata30ReadHistoricalMeterMessageBody>(Scata30MessageType.ReadMeterByDate, protocol.MasterStationAddress, destinationAddress, 0, DateTime.Now, new Scata30ReadHistoricalMeterMessageBody(timeStamp)), Scata30MessageType.ReadMeterByDateResponse, protocol)
         {
             HistoricalDateTime = timeStamp;
         }
 
         public override void ReceiveResponseMessages()
         {
             base.ReceiveResponseMessages();
             foreach (var message in ReceivedResponseMessages)
             {
                 if (!message.Body.HistoricalDateTime.Equals(HistoricalDateTime))
                 {
                     _log.Error(string.Format("Read the historical meter content error since the date time mismatched. The require date time is '{0}', return by concentrator is '{1}'", HistoricalDateTime.ToString("yyyy-MM-dd HH:mm:ss"), message.Body.HistoricalDateTime.ToString("yyyy-MM-dd HH:mm:ss")));
                     // throw new Exception("Parse message error since historical date time mismatched.");
                 }
             }
         }
     }
 }

  

 

4.4 通訊協議的實現

通訊協議的實現類圖如下所示,由於通訊協議與通訊方式、業務邏輯無關,因此,在這里我們引入StreamAdapter和StreamProvider來屏蔽這些上下文。StreamAdapter的功能是獲取一條消息和發送一條消息,StreamProvider則是為不同通訊方式提供通訊流。

protocol

下面我來描述協議類的關鍵實現。協議類內部有一個線程來實現與硬件的通訊。這個線程會一直運行,然后從對話隊列中不停獲取RoundTrip,一旦獲取的RoundTrip不會空,則運行這個RoundTrip,否則線程進入休眠狀態。

public bool Start()
{
    if (_started)
    {
        return true;
    }

    FireOnStarting();

    try
    {
        CommStreamProvider.Start();
    }
    catch (Exception ex)
    {
        _log.Error("Start the communication provider failed.", ex);
        return false;
    }
            
    _thread = new Thread(() => {
        RoundTripBase roundTrip;
        while (!_exited)
        {
            Monitor.Enter(_queue.SyncRoot);

            roundTrip = Dequeue();
            if (roundTrip != null)
            {
                try
                {
                    Monitor.Exit(_queue.SyncRoot);
                    OnRoundTripStartingHandler(this, 
new RoundTripEventArgs() { RoundTrip = roundTrip });
                    roundTrip.Start();
                }
                catch (ThreadAbortException)
                {
                    Trace("通訊線程被終止。");
                    throw;
                }
                catch (Scata30StreamException ex) // 無法獲取Stream的時候,直接退出
                {
                    _exited = true;
                    roundTrip.Trace("會話失敗,因為:連接已經關閉。");
                }
                catch (Exception ex)
                {
                    string error = GetErrorMessage(ex);
                    roundTrip.Trace(string.Format("會話失敗,因為:{0}。", error));
                }
                if (!_exited)
                {
                    roundTrip.Trace(Environment.NewLine);
                    OnRoundTripStartedHandler(this, 
new RoundTripEventArgs() { RoundTrip = roundTrip });
                }
                else
                {
                    // 1 將當前失敗的RoundTrip保存入隊
                    FailedRoundTrips.Enqueue(roundTrip);
                    // 2 保存其它沒有處理的RoundTrip
                    do
                    {
                        roundTrip = _queue.Dequeue();
                        if (roundTrip != null)
                        {
                            FailedRoundTrips.Enqueue(roundTrip);
                        }
                    } while (roundTrip != null);
                    // 3 停止當前協議
                    Stop();
                }
                // 執行完RoundTrip后,開始清理資源
                roundTrip.Dispose();
            }
            else
            {
                Monitor.Exit(_queue.SyncRoot);
                OnIdleHandler(this, new RoundTripEventArgs());
                _autoResetEvent.WaitOne();
            }
        }
    });

    _thread.Start();
    _started = true;

    FireOnStarted();
    return true;
}

  

執行對話,是以異步的方式來進行,通過事件進行通知。如下所示。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using UIShell.CommServerService.Protocol.Scata30.RoundTrip; 
using UIShell.CommServerService.Protocol.Scata30.Message;

namespace UIShell.CommServerService.Protocol.Scata30 
{ 
    public partial class Scata30Protocol 
    { 
        public Scata30SetConcentratorTimeRoundTrip SetConcentratorTime( 
            ushort concentratorAddress, 
            ushort concentratorZigbeeAddress, 
            DateTime timeStamp, 
            EventHandler<RoundTripEventArgs> onMessageSend, 
            EventHandler<RoundTripEventArgs> onCompleted, 
            EventHandler<RoundTripEventArgs> onError) 
        { 
            var roundTrip = new Scata30SetConcentratorTimeRoundTrip( 
                concentratorAddress, 
                concentratorZigbeeAddress, 
                timeStamp, 
                this);

            if (onMessageSend != null) 
            { 
                roundTrip.OnMessageSend += onMessageSend; 
            } 
            if (onCompleted != null) 
            { 
                roundTrip.OnCompleted += onCompleted; 
            } 
            if (onError != null) 
            { 
                roundTrip.OnError += onError; 
            } 
            Enqueue(roundTrip); 
            return roundTrip; 
        } 
    } 
}

 

這個通訊協議的實現非常優雅,在維護的過程中,通訊指令的變更和通訊方式的轉變,都不需要再修改協議和RoundTrip本身,只需要對消息體進行變更並增加新的StreamProvider,並在上層的業務邏輯進行實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM