P2P直連?經服務器中轉?


      當同一個系統的兩個客戶端A、B相互發送消息給對方時,如果它們之間存在P2P通道,那么消息傳送的路徑就有兩種:直接經P2P通道傳送、或者經服務器中轉。如下圖所示:

     

      通常就一般應用而言,如果P2P通道能夠成功創建(即所謂的打洞成功),A和B之間的所有消息將直接走P2P通道,這樣可以有效節省服務器的帶寬和降低服務器的負載。這種模型即是所謂的“P2P通道優先”模型,也是最簡單的通道選擇模型。

一.通道質量優先模型

      然而,有些系統可能不能就如此簡單的處理,最簡單的例子,如果A和B之間傳遞的某些類型的消息必需讓服務器監控到,那么,這樣的消息就必需經過服務器中轉。接下來,我們討論一種較為復雜的情況。比如,在網絡語音對話系統中,通道的質量直接決定着用戶體驗的好壞。我們希望,在這種系統中,語音數據需要始終經由兩條通道中的那個質量較高的通道進行傳送。這種策略就是所謂的“通道質量優先”模型。

      “通道質量優先”模型理解起來很簡單,但是在實際中實現時,卻還是很有難度的。通常有兩種實現方式:

(1)定時檢測、比較通道延時,並自動切換通道。

(2)由上層應用決定何時切換通道。一般而言,是當應用發現當前使用的通道不滿足要求時,就主動要求切換到另外一條通道。

二.模型實現

      下面,我們就基於ESFramework提供的通信功能,來實現這兩種方式。我們使用AgileP2PCustomizeOutter類來封裝它,並可通過屬性控制來啟用哪種方式。 

public class AgileP2PCustomizeOutter:IEngineActor
    {
        //字典。userID - 當前選擇的通道(如果為true,表示P2P通道;否則是經服務器中轉)?
        private ObjectManager<string, bool> channelStateManager = new ObjectManager<string, bool>(); 
       private ICustomizeOutter customizeOutter; //消息發送器
        private IP2PController p2PController;//P2P控制器
        private IBasicOutter basicOutter; //心跳發送器
        private AgileCycleEngine agileCycleEngine; //定時檢測引擎

        #region PingTestSpanInSecs
        private int pingTestSpanInSecs = 60;
        /// <summary>
        /// 定時進行ping測試以及自動切換通道的時間間隔,單位:秒。默認值為60。
        /// 如果設置為小於等於0,則表示不進行定時ping測試,也不會自動切換通道。
        /// </summary>
        public int PingTestSpanInSecs
        {
            get { return pingTestSpanInSecs; }
            set { pingTestSpanInSecs = value; }
        }    
        #endregion   

        #region Initialize
        public void Initialize(ICustomizeOutter _customizeOutter, IP2PController _p2PController, IBasicOutter _basicOutter)
        {
            this.customizeOutter = _customizeOutter;
            this.p2PController = _p2PController;
            this.basicOutter = _basicOutter;

            this.p2PController.P2PChannelOpened += new ESBasic.CbGeneric<P2PChannelState>(p2PController_P2PChannelOpened);
            this.p2PController.P2PChannelClosed += new ESBasic.CbGeneric<P2PChannelState>(p2PController_P2PChannelClosed);
            this.p2PController.AllP2PChannelClosed += new ESBasic.CbGeneric(p2PController_AllP2PChannelClosed);
            Dictionary<string, P2PChannelState> dic = this.p2PController.GetP2PChannelState();
            foreach (P2PChannelState state in dic.Values)
            {
                bool p2pFaster = this.TestSpeed(state.DestUserID);
                this.channelStateManager.Add(state.DestUserID, p2pFaster);
            }

            if (this.pingTestSpanInSecs > 0)
            {
                this.agileCycleEngine = new AgileCycleEngine(this);
                this.agileCycleEngine.DetectSpanInSecs = this.pingTestSpanInSecs;
                this.agileCycleEngine.Start();
            }
        }

        //定時執行,當前客戶端到其它客戶端之間的通道選擇
        public bool EngineAction()
        {
            foreach (string userID in this.channelStateManager.GetKeyList())
            {
                bool p2pFaster = this.TestSpeed(userID);
                this.channelStateManager.Add(userID, p2pFaster);
            }

            return true;
        }

        void p2PController_AllP2PChannelClosed()
        {
            this.channelStateManager.Clear();
        }

        void p2PController_P2PChannelClosed(P2PChannelState state)
        {
            this.channelStateManager.Remove(state.DestUserID);
        }

        void p2PController_P2PChannelOpened(P2PChannelState state)
        {
            bool p2pFaster = this.TestSpeed(state.DestUserID);
            this.channelStateManager.Add(state.DestUserID, p2pFaster);
        }
        #endregion

        #region TestSpeed
        /// <summary>
        /// 定時測試
        /// </summary>     
        private bool TestSpeed(string userID)
        {
            try
            {
                int transfer = this.basicOutter.PingByServer(userID);
                int p2p = this.basicOutter.PingByP2PChannel(userID);
                return p2p <= transfer;
            }
            catch (Exception ee)
            {
                return false;
            }
        }
        #endregion

        /// <summary>
        /// 手動切換通道。
        /// </summary>        
        public void SwitchChannel(string destUserID)
        {
            if (this.channelStateManager.Contains(destUserID))
            {
                bool p2p = this.channelStateManager.Get(destUserID);
                this.channelStateManager.Add(destUserID, !p2p);
            }
        }

        /// <summary>
        /// 到目標用戶是否使用的是P2P通道。
        /// </summary>        
        public bool IsUsingP2PChannel(string destUserID)
        {
            return this.channelStateManager.Get(destUserID);
        }

        public bool IsExistP2PChannel(string destUserID)
        {
            return this.channelStateManager.Contains(destUserID);
        }

        /// <summary>
        /// 向在線用戶發送信息。
        /// </summary>
        /// <param name="targetUserID">接收消息的目標用戶ID。</param>
        /// <param name="informationType">自定義信息類型</param>
        /// <param name="post">是否采用Post模式發送消息</param>  
        /// <param name="action">當通道繁忙時所采取的動作</param>  
        public void Send(string targetUserID, int informationType, byte[] info, bool post, ActionTypeOnChannelIsBusy action)
        {
            bool p2pFaster = this.channelStateManager.Get(targetUserID);
            ChannelMode mode = p2pFaster ? ChannelMode.ByP2PChannel : ChannelMode.TransferByServer;
            this.customizeOutter.Send(targetUserID, informationType, info, post, action, mode);
        }

        /// <summary>
        /// 向在線用戶或服務器發送大的數據塊信息。直到數據發送完畢,該方法才會返回。如果擔心長時間阻塞調用線程,可考慮異步調用本方法。
        /// </summary>
        /// <param name="targetUserID">接收消息的目標用戶ID。如果為null,表示接收者為服務器。</param>
        /// <param name="informationType">自定義信息類型</param>
        /// <param name="blobInfo">大的數據塊信息</param>  
        /// <param name="fragmentSize">分片傳遞時,片段的大小</param>  
        public void SendBlob(string targetUserID, int informationType, byte[] blobInfo, int fragmentSize)
        {
            bool p2pFaster = this.channelStateManager.Get(targetUserID);
            ChannelMode mode = p2pFaster ? ChannelMode.ByP2PChannel : ChannelMode.TransferByServer;
            this.customizeOutter.SendBlob(targetUserID, informationType, blobInfo, fragmentSize, mode);
        }       
    }

       現在,我們對上面的實現簡單解釋一下。

(1)由於當前客戶端可能會與多個其它的客戶端進行通信,而與每一個其它的客戶端之間的通信都有通道選擇的問題,所以需要一個字典ObjectManager將它們管理起來。

(2)當某個P2P通道創建成功時,將進行首次ping比較,並將結果記錄到字典中。

(3)定時引擎每隔60秒,分別針對每個其它客戶端進行通道檢測比較,自動選擇ping值小的那個通道。

(4)當我們將PingTestSpanInSecs設為0時,就可以使用SwitchChannel方法來手動切換通道,即實現了上述的方式2。

(5)我們最終的目的是實現Send方法和SendBlob方法,之后,就可以使用AgileP2PCustomizeOutter類來替換ICustomizeOutter發送消息。

三.方式選擇

      上面講到“通道質量優先”模型的兩種實現方式,那么在實際的應用中,如何進行選擇了?

1.ping檢測比較,自動切換     

      就這種方式而言,其缺陷在於,在客戶端之間需要進行高頻通信的系統中,ping檢測可能是非常不准確的,甚至是錯誤的。

      比如,在實時視頻對話系統中,其對帶寬的要求是比較高的,假設,現在所有的視頻數據走的都是P2P通道,那么P2P通道就非常忙碌,而經服務器中轉的通道幾乎就是空閑的。所以,當下一次定時ping檢測到來時,P2P通道的ping值就會比實際的大。從而導致判斷失誤,而發生錯誤的自動切換。

2.手動切換

      對於剛才視頻對話的例子,使用手動切換可能是更好的選擇,由應用根據上層的實際效果來決定是否需要切換通道。比如,還以視頻對話系統為例,應用可以根據信息接收方的定時反饋(在一段時間內,缺少音/視頻包的個數,音/視頻包的總延時等統計信息)來決定是否要切換到另外一個通道。這種方式更簡潔描述可以表達為:如果當前通道質量已達到應用需求,即使另一個通道更快更穩定,也不進行切換;如果當前通道質量達不到應用需求,則切換到另一個通道(有可能另一個通道的質量更糟糕)。 

      本文只是簡單地引出通道選擇模型的問題,實際上,這個問題是相當復雜的,特別是在一些通信要求很高的項目中,而且,如果將廣播消息的通道模型考慮進來就更麻煩了,有興趣的朋友可以留言進行更深入的討論。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM