在廣播與P2P通道(上) -- 問題與方案 一文中,我們已經找到了最優的模型,即將廣播與P2P通道相結合的方案,這樣能使服務器的帶寬消耗降到最低,最大節省服務器的寬帶支出。當然,如果從零開始實現這種方案無疑是非常艱巨的,但基於ESFramework提供的通信功能和P2P功能來做,就不再那么遙不可及了。
1.P2P通道狀態
根據上文模型3的討論,要實現該模型,每個客戶端需要知道自己與哪些用戶創建了P2P通道,服務器也要知道每個客戶端已建立的P2P通道的狀態。
使用ESFramework,在客戶端已經可以通過IRapidPassiveEngine.P2PController接口知道當前客戶端與哪些其它客戶端成功建立了P2P通道,並且可以通過P2PController接口發起與新的客戶端建立新的P2P通道的嘗試。但在服務端,對於每個客戶端建立了哪些P2P通道,服務端是一無所知的。所以,基於ESFramework實現模型3的第一件事情,就是客戶端要實時把自己的P2P狀態變化報告給服務端,而服務端也要管理每個客戶端的P2P通道狀態。(注意。下面的所有實現,需要引用ESFramework.dll、ESPlus.dll、ESBasic.dll)
(1)P2PChannelManager
我們在服務端設計P2PChannelManager類來管理每個在線客戶端已成功創建的所有P2P通道。
public class P2PChannelManager { //key 表示P2P通道的起始點用戶ID,value 表示P2P通道的目的點用戶列表。(單向,因為某些P2P通道就是單向的) private SortedArray<string, SortedArray<string>> channels = new SortedArray<string, SortedArray<string>>(); public void Initialize(IUserManager userManager) { userManager.SomeOneDisconnected += new ESBasic.CbGeneric<UserData, ESFramework.Server.DisconnectedType>(userManager_SomeOneDisconnected); } void userManager_SomeOneDisconnected(UserData user, ESFramework.Server.DisconnectedType obj2) { this.channels.RemoveByKey(user.UserID); } public void Register(string startUserID, string destUserID) { if (!this.channels.ContainsKey(startUserID)) { this.channels.Add(startUserID, new SortedArray<string>()); } this.channels[startUserID].Add(destUserID); } public void Unregister(string startUserID, string destUserID) { if (this.channels.ContainsKey(startUserID)) { this.channels[startUserID].Remove(destUserID); } } public bool IsP2PChannelExist(string startUserID, string destUserID) { if (!this.channels.ContainsKey(startUserID)) { return false; } return this.channels[startUserID].Contains(destUserID); } }
P2PChannelManager提供了注冊P2P通道、注銷P2P通道、以及查詢P2P通道是否存在的方法。其內部使用類似字典的SortedArray來管理每個用戶的已經成功建立的P2P通道(即與哪些其它用戶打通了P2P)。另外,P2PChannelManager預定了IUserManager的SomeOneDisconnected事件,這樣,當某個用戶掉線時,就可以清除其所有的P2P狀態。因為,在ESFramework中,當客戶端與服務器的TCP連接斷開時,客戶端會自動關閉所有的P2P通道。
(2)客戶端實時報告自己的P2P狀態變化給服務端
當客戶端每次成功創建一個P2P通道、或者已有P2P通道中斷時,客戶端要發消息告訴服務端。這樣,我們就需要定義這個消息的類型:
public static class MyInfoTypes { public const int P2PChannelOpen = 1; public const int P2PChannelClose = 2; }
再定義消息協議:
public class P2PChannelReportContract { public P2PChannelReportContract() { } public P2PChannelReportContract(string dest) { this.destUserID = dest; } #region DestUserID private string destUserID; public string DestUserID { get { return destUserID; } set { destUserID = value; } } #endregion }
定好了消息類型和contract類,我們在客戶端預定P2P通道的狀態變化,並報告給服務端:
public void Initialize(IRapidPassiveEngine rapidPassiveEngine) { rapidPassiveEngine.P2PController.P2PChannelOpened += new CbGeneric<P2PChannelState>(P2PController_P2PChannelOpened); rapidPassiveEngine.P2PController.P2PChannelClosed += new CbGeneric<P2PChannelState>(P2PController_P2PChannelClosed); }
void P2PController_P2PChannelClosed(P2PChannelState state) { this.P2PChannelReport(false, state.DestUserID); } void P2PController_P2PChannelOpened(P2PChannelState state) { this.P2PChannelReport(true, state.DestUserID); } private void P2PChannelReport(bool open, string destUserID) { P2PChannelReportContract contract = new P2PChannelReportContract(destUserID); int messageType = open ? MyInfoTypes.P2PChannelOpen : MyInfoTypes.P2PChannelClose; this.rapidPassiveEngine.CustomizeOutter.Send(messageType, CompactPropertySerializer.Default.Serialize(contract)); }
在服務端,我們需要處理這兩種類型的消息(實現ICustomizeHandler接口的HandleInformation方法):
private P2PChannelManager p2PChannelManager = new P2PChannelManager(); public void HandleInformation(string sourceUserID, int informationType, byte[] information) { if (informationType == MyInfoTypes.P2PChannelOpen) { P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0); this.p2PChannelManager.Register(sourceUserID, contract.DestUserID); return ; } if (informationType == MyInfoTypes.P2PChannelClose) { P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0); this.p2PChannelManager.Unregister(sourceUserID, contract.DestUserID); return ; }
}
這樣,服務端就實時地知道每個客戶端的P2P狀態了。
2.與廣播結合
同樣的,我們首先為廣播消息定義一個消息類型:
public static class MyInfoTypes { public const int P2PChannelOpen = 1; public const int P2PChannelClose = 2; public const int Broadcast = 3; //廣播消息 }
再定義對應的協議類:
public class BroadcastContract { #region Ctor public BroadcastContract() { } public BroadcastContract(string _broadcasterID, string _groupID, int infoType ,byte[] info ) { this.broadcasterID = _broadcasterID; this.groupID = _groupID; this.content = info; this.informationType = infoType; this.actionTypeOnChannelIsBusy = action; } #endregion #region BroadcasterID private string broadcasterID = null; /// <summary> /// 發出廣播的用戶ID。 /// </summary> public string BroadcasterID { get { return broadcasterID; } set { broadcasterID = value; } } #endregion #region GroupID private string groupID = ""; /// <summary> /// 接收廣播的組ID /// </summary> public string GroupID { get { return groupID; } set { groupID = value; } } #endregion #region InformationType private int informationType = 0; /// <summary> /// 廣播信息的類型。 /// </summary> public int InformationType { get { return informationType; } set { informationType = value; } } #endregion #region Content private byte[] content; public byte[] Content { get { return content; } set { content = value; } } #endregion }
(1)在客戶端發送廣播消息
在客戶端,我們根據與組內成員的P2P通道的狀態,來判斷發送的方案,就像依據上文提到的,可細分為三種情況:
a.當某個客戶端發現自己和組內的所有其它成員都建立了P2P通道時,那么,它就不用把廣播消息發送給服務器了。
b.如果客戶端與組內的所有其它成員的P2P通道都沒有建立成功,那么,它只需要將廣播消息發送給服務器。
c.如果客戶端與部分組內的成員建立了P2P通道,那么,它不僅需要將廣播消息發送給服務器,還需要將該廣播消息經過每個P2P通道發送一次。
public void Broadcast(string currentUserID, string groupID, int broadcastType, byte[] broadcastContent) { BroadcastContract contract = new BroadcastContract(currentUserID, groupID, broadcastType, broadcastContent); byte[] info = CompactPropertySerializer.Default.Serialize(contract); List<string> members = this.groupManager.GetGroupMembers(groupID); if (members == null) { return; } bool allP2P = true; foreach (string memberID in members) { if (memberID == this.currentUserID) { continue; } if (rapidPassiveEngine.P2PController.IsP2PChannelExist(memberID)) { rapidPassiveEngine.CustomizeOutter.SendByP2PChannel(memberID, MyInfoTypes.Broadcast, info, ActionTypeOnNoP2PChannel.Discard, true, ActionTypeOnChannelIsBusy.Continue); } else { allP2P = false; } } if (!allP2P) //只要有一個組成員沒有成功建立P2P,就要發給服務端。 { this.rapidPassiveEngine.CustomizeOutter.Send(null, this.groupInfoTypes.Broadcast, info, true, action); } }
(2)服務端轉發廣播
當服務器收到一個廣播消息時,首先,查看目標組中的用戶,然后,根據廣播消息的發送者的P2P通道狀態,來綜合決定該廣播消息需要轉發給哪些客戶端。我們只需在上面的HandleInformation方法中增加代碼就可以了:
if (informationType == MyInfoTypes.Broadcast) { BroadcastContract contract = CompactPropertySerializer.Default.Deserialize<BroadcastContract>(information, 0); string groupID = contract.GroupID; List<string> members = this.groupManager.GetGroupMembers(groupID); if (members != null) { foreach (string memberID in members) { bool useP2PChannel = this.p2PChannelManager.IsP2PChannelExist(sourceUserID, memberID); if (memberID != sourceUserID && !useP2PChannel) { this.customizeController.Send(memberID, MyInfoTypes.Broadcast, information, true, ActionTypeOnChannelIsBusy.Continue); } } } return; }
(3)客戶端處理接收到的廣播消息
客戶端也只要實現ICustomizeHandler接口的HandleInformation方法,就可以處理來自P2P通道或者轉發自服務端的廣播消息了(即處理MyInfoTypes.Broadcast類型的消息),這里就不贅述了。
實際上,本文的實現還可以進一步優化,特別是在高頻的廣播消息時(如前文舉的視頻會議的例子),這種優化效果是很明顯的。那就是,比如,我們在客戶端可以將組內的成員分成兩類管理起來,一類是P2P已經打通的,一類是沒有通的,並根據實際的P2P狀態變化而調整。這樣,客戶端每次發送廣播消息時,就不用遍歷自己與每個組員的P2P通道的狀態,這可以節省不少的cpu時間。同理,服務端也可以如此處理。