當同一個系統的兩個客戶端A、B相互發送消息給對方時,如果它們之間存在P2P通道,那么消息傳送的路徑就有兩種:直接經P2P通道傳送、或者經服務器中轉。如下圖所示:
通常就一般應用而言,如果P2P通道能夠成功創建(即所謂的打洞成功),A和B之間的所有消息將直接走P2P通道,這樣可以有效節省服務器的帶寬和降低服務器的負載。這種模型即是所謂的“P2P通道優先”模型,也是最簡單的通道選擇模型。
一.通道質量優先模型
然而,有些系統可能不能就如此簡單的處理,最簡單的例子,如果A和B之間傳遞的某些類型的消息必需讓服務器監控到,那么,這樣的消息就必需經過服務器中轉。接下來,我們討論一種較為復雜的情況。比如,在網絡語音對話系統中,通道的質量直接決定着用戶體驗的好壞。我們希望,在這種系統中,語音數據需要始終經由兩條通道中的那個質量較高的通道進行傳送。這種策略就是所謂的“通道質量優先”模型。
“通道質量優先”模型理解起來很簡單,但是在實際中實現時,卻還是很有難度的。通常有兩種實現方式:
(1)定時檢測、比較通道延時,並自動切換通道。
(2)由上層應用決定何時切換通道。一般而言,是當應用發現當前使用的通道不滿足要求時,就主動要求切換到另外一條通道。
二.模型實現
下面,我們就基於ESFramework提供的通信功能,來實現這兩種方式。我們使用AgileP2PCustomizeOutter類來封裝它,並可通過屬性控制來啟用哪種方式。
public class AgileP2PCustomizeOutter:IEngineActor { //字典。userID - 當前選擇的通道(如果為true,表示P2P通道;否則是經服務器中轉)? private ObjectManager<string, bool> channelStateManager = new ObjectManager<string, bool>(); private ICustomizeOutter customizeOutter; //消息發送器 private IP2PController p2PController;//P2P控制器 private IBasicOutter basicOutter; //心跳發送器 private AgileCycleEngine agileCycleEngine; //定時檢測引擎 #region PingTestSpanInSecs private int pingTestSpanInSecs = 60; /// <summary> /// 定時進行ping測試以及自動切換通道的時間間隔,單位:秒。默認值為60。 /// 如果設置為小於等於0,則表示不進行定時ping測試,也不會自動切換通道。 /// </summary> public int PingTestSpanInSecs { get { return pingTestSpanInSecs; } set { pingTestSpanInSecs = value; } } #endregion #region Initialize public void Initialize(ICustomizeOutter _customizeOutter, IP2PController _p2PController, IBasicOutter _basicOutter) { this.customizeOutter = _customizeOutter; this.p2PController = _p2PController; this.basicOutter = _basicOutter; this.p2PController.P2PChannelOpened += new ESBasic.CbGeneric<P2PChannelState>(p2PController_P2PChannelOpened); this.p2PController.P2PChannelClosed += new ESBasic.CbGeneric<P2PChannelState>(p2PController_P2PChannelClosed); this.p2PController.AllP2PChannelClosed += new ESBasic.CbGeneric(p2PController_AllP2PChannelClosed); Dictionary<string, P2PChannelState> dic = this.p2PController.GetP2PChannelState(); foreach (P2PChannelState state in dic.Values) { bool p2pFaster = this.TestSpeed(state.DestUserID); this.channelStateManager.Add(state.DestUserID, p2pFaster); } if (this.pingTestSpanInSecs > 0) { this.agileCycleEngine = new AgileCycleEngine(this); this.agileCycleEngine.DetectSpanInSecs = this.pingTestSpanInSecs; this.agileCycleEngine.Start(); } } //定時執行,當前客戶端到其它客戶端之間的通道選擇 public bool EngineAction() { foreach (string userID in this.channelStateManager.GetKeyList()) { bool p2pFaster = this.TestSpeed(userID); this.channelStateManager.Add(userID, p2pFaster); } return true; } void p2PController_AllP2PChannelClosed() { this.channelStateManager.Clear(); } void p2PController_P2PChannelClosed(P2PChannelState state) { this.channelStateManager.Remove(state.DestUserID); } void p2PController_P2PChannelOpened(P2PChannelState state) { bool p2pFaster = this.TestSpeed(state.DestUserID); this.channelStateManager.Add(state.DestUserID, p2pFaster); } #endregion #region TestSpeed /// <summary> /// 定時測試 /// </summary> private bool TestSpeed(string userID) { try { int transfer = this.basicOutter.PingByServer(userID); int p2p = this.basicOutter.PingByP2PChannel(userID); return p2p <= transfer; } catch (Exception ee) { return false; } } #endregion /// <summary> /// 手動切換通道。 /// </summary> public void SwitchChannel(string destUserID) { if (this.channelStateManager.Contains(destUserID)) { bool p2p = this.channelStateManager.Get(destUserID); this.channelStateManager.Add(destUserID, !p2p); } } /// <summary> /// 到目標用戶是否使用的是P2P通道。 /// </summary> public bool IsUsingP2PChannel(string destUserID) { return this.channelStateManager.Get(destUserID); } public bool IsExistP2PChannel(string destUserID) { return this.channelStateManager.Contains(destUserID); } /// <summary> /// 向在線用戶發送信息。 /// </summary> /// <param name="targetUserID">接收消息的目標用戶ID。</param> /// <param name="informationType">自定義信息類型</param> /// <param name="post">是否采用Post模式發送消息</param> /// <param name="action">當通道繁忙時所采取的動作</param> public void Send(string targetUserID, int informationType, byte[] info, bool post, ActionTypeOnChannelIsBusy action) { bool p2pFaster = this.channelStateManager.Get(targetUserID); ChannelMode mode = p2pFaster ? ChannelMode.ByP2PChannel : ChannelMode.TransferByServer; this.customizeOutter.Send(targetUserID, informationType, info, post, action, mode); } /// <summary> /// 向在線用戶或服務器發送大的數據塊信息。直到數據發送完畢,該方法才會返回。如果擔心長時間阻塞調用線程,可考慮異步調用本方法。 /// </summary> /// <param name="targetUserID">接收消息的目標用戶ID。如果為null,表示接收者為服務器。</param> /// <param name="informationType">自定義信息類型</param> /// <param name="blobInfo">大的數據塊信息</param> /// <param name="fragmentSize">分片傳遞時,片段的大小</param> public void SendBlob(string targetUserID, int informationType, byte[] blobInfo, int fragmentSize) { bool p2pFaster = this.channelStateManager.Get(targetUserID); ChannelMode mode = p2pFaster ? ChannelMode.ByP2PChannel : ChannelMode.TransferByServer; this.customizeOutter.SendBlob(targetUserID, informationType, blobInfo, fragmentSize, mode); } }
現在,我們對上面的實現簡單解釋一下。
(1)由於當前客戶端可能會與多個其它的客戶端進行通信,而與每一個其它的客戶端之間的通信都有通道選擇的問題,所以需要一個字典ObjectManager將它們管理起來。
(2)當某個P2P通道創建成功時,將進行首次ping比較,並將結果記錄到字典中。
(3)定時引擎每隔60秒,分別針對每個其它客戶端進行通道檢測比較,自動選擇ping值小的那個通道。
(4)當我們將PingTestSpanInSecs設為0時,就可以使用SwitchChannel方法來手動切換通道,即實現了上述的方式2。
(5)我們最終的目的是實現Send方法和SendBlob方法,之后,就可以使用AgileP2PCustomizeOutter類來替換ICustomizeOutter發送消息。
三.方式選擇
上面講到“通道質量優先”模型的兩種實現方式,那么在實際的應用中,如何進行選擇了?
1.ping檢測比較,自動切換
就這種方式而言,其缺陷在於,在客戶端之間需要進行高頻通信的系統中,ping檢測可能是非常不准確的,甚至是錯誤的。
比如,在實時視頻對話系統中,其對帶寬的要求是比較高的,假設,現在所有的視頻數據走的都是P2P通道,那么P2P通道就非常忙碌,而經服務器中轉的通道幾乎就是空閑的。所以,當下一次定時ping檢測到來時,P2P通道的ping值就會比實際的大。從而導致判斷失誤,而發生錯誤的自動切換。
2.手動切換
對於剛才視頻對話的例子,使用手動切換可能是更好的選擇,由應用根據上層的實際效果來決定是否需要切換通道。比如,還以視頻對話系統為例,應用可以根據信息接收方的定時反饋(在一段時間內,缺少音/視頻包的個數,音/視頻包的總延時等統計信息)來決定是否要切換到另外一個通道。這種方式更簡潔描述可以表達為:如果當前通道質量已達到應用需求,即使另一個通道更快更穩定,也不進行切換;如果當前通道質量達不到應用需求,則切換到另一個通道(有可能另一個通道的質量更糟糕)。
本文只是簡單地引出通道選擇模型的問題,實際上,這個問題是相當復雜的,特別是在一些通信要求很高的項目中,而且,如果將廣播消息的通道模型考慮進來就更麻煩了,有興趣的朋友可以留言進行更深入的討論。