廣播與P2P通道(下) -- 方案實現


廣播與P2P通道(上) -- 問題與方案 一文中,我們已經找到了最優的模型,即將廣播與P2P通道相結合的方案,這樣能使服務器的帶寬消耗降到最低,最大節省服務器的寬帶支出。當然,如果從零開始實現這種方案無疑是非常艱巨的,但基於ESFramework提供的通信功能和P2P功能來做,就不再那么遙不可及了。

1.P2P通道狀態

根據上文模型3的討論,要實現該模型,每個客戶端需要知道自己與哪些用戶創建了P2P通道,服務器也要知道每個客戶端已建立的P2P通道的狀態。

使用ESFramework,在客戶端已經可以通過IRapidPassiveEngine.P2PController接口知道當前客戶端與哪些其它客戶端成功建立了P2P通道,並且可以通過P2PController接口發起與新的客戶端建立新的P2P通道的嘗試。但在服務端,對於每個客戶端建立了哪些P2P通道,服務端是一無所知的。所以,基於ESFramework實現模型3的第一件事情,就是客戶端要實時把自己的P2P狀態變化報告給服務端,而服務端也要管理每個客戶端的P2P通道狀態。(注意。下面的所有實現,需要引用ESFramework.dll、ESPlus.dll、ESBasic.dll)

(1)P2PChannelManager

我們在服務端設計P2PChannelManager類來管理每個在線客戶端已成功創建的所有P2P通道。

    public class P2PChannelManager
    {
        //key 表示P2P通道的起始點用戶ID,value 表示P2P通道的目的點用戶列表。(單向,因為某些P2P通道就是單向的)
        private SortedArray<string, SortedArray<string>> channels = new SortedArray<string, SortedArray<string>>();

        public void Initialize(IUserManager userManager)
        {
            userManager.SomeOneDisconnected += new ESBasic.CbGeneric<UserData, ESFramework.Server.DisconnectedType>(userManager_SomeOneDisconnected);
        }

        void userManager_SomeOneDisconnected(UserData user, ESFramework.Server.DisconnectedType obj2)
        {
            this.channels.RemoveByKey(user.UserID);
        }

        public void Register(string startUserID, string destUserID)
        {
            if (!this.channels.ContainsKey(startUserID))
            {
                this.channels.Add(startUserID, new SortedArray<string>());
            }

            this.channels[startUserID].Add(destUserID);
        }

        public void Unregister(string startUserID, string destUserID)
        {
            if (this.channels.ContainsKey(startUserID))
            {
                this.channels[startUserID].Remove(destUserID);
            }
        }

        public bool IsP2PChannelExist(string startUserID, string destUserID)
        {
            if (!this.channels.ContainsKey(startUserID))
            {
                return false;
            }

            return this.channels[startUserID].Contains(destUserID);
        }

     }

P2PChannelManager提供了注冊P2P通道、注銷P2P通道、以及查詢P2P通道是否存在的方法。其內部使用類似字典的SortedArray來管理每個用戶的已經成功建立的P2P通道(即與哪些其它用戶打通了P2P)。另外,P2PChannelManager預定了IUserManager的SomeOneDisconnected事件,這樣,當某個用戶掉線時,就可以清除其所有的P2P狀態。因為,在ESFramework中,當客戶端與服務器的TCP連接斷開時,客戶端會自動關閉所有的P2P通道。

(2)客戶端實時報告自己的P2P狀態變化給服務端

當客戶端每次成功創建一個P2P通道、或者已有P2P通道中斷時,客戶端要發消息告訴服務端。這樣,我們就需要定義這個消息的類型:

    public static class MyInfoTypes
    {
        public const int P2PChannelOpen = 1;
        public const int P2PChannelClose = 2;
    }

再定義消息協議:

    public class P2PChannelReportContract
    {
        public P2PChannelReportContract() { }
        public P2PChannelReportContract(string dest)
        {
            this.destUserID = dest;
        }

        #region DestUserID
        private string destUserID;
        public string DestUserID
        {
            get { return destUserID; }
            set { destUserID = value; }
        } 
        #endregion
    }

定好了消息類型和contract類,我們在客戶端預定P2P通道的狀態變化,並報告給服務端:

   public void Initialize(IRapidPassiveEngine rapidPassiveEngine)
    {           
        rapidPassiveEngine.P2PController.P2PChannelOpened += new CbGeneric<P2PChannelState>(P2PController_P2PChannelOpened);
        rapidPassiveEngine.P2PController.P2PChannelClosed += new CbGeneric<P2PChannelState>(P2PController_P2PChannelClosed);
    }

void P2PController_P2PChannelClosed(P2PChannelState state) { this.P2PChannelReport(false, state.DestUserID); } void P2PController_P2PChannelOpened(P2PChannelState state) { this.P2PChannelReport(true, state.DestUserID); } private void P2PChannelReport(bool open, string destUserID) { P2PChannelReportContract contract = new P2PChannelReportContract(destUserID); int messageType = open ? MyInfoTypes.P2PChannelOpen : MyInfoTypes.P2PChannelClose; this.rapidPassiveEngine.CustomizeOutter.Send(messageType, CompactPropertySerializer.Default.Serialize(contract)); }

在服務端,我們需要處理這兩種類型的消息(實現ICustomizeHandler接口的HandleInformation方法):

    private P2PChannelManager p2PChannelManager = new P2PChannelManager();
   public void HandleInformation(string sourceUserID, int informationType, byte[] information)
    {
        if (informationType == MyInfoTypes.P2PChannelOpen)
        {
            P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0);
            this.p2PChannelManager.Register(sourceUserID, contract.DestUserID);
            return ;
        }

        if (informationType == MyInfoTypes.P2PChannelClose)
        {
            P2PChannelReportContract contract = CompactPropertySerializer.Default.Deserialize<P2PChannelReportContract>(information, 0);
            this.p2PChannelManager.Unregister(sourceUserID, contract.DestUserID);
            return ;
        }

這樣,服務端就實時地知道每個客戶端的P2P狀態了。

2.與廣播結合

同樣的,我們首先為廣播消息定義一個消息類型:

    public static class MyInfoTypes
    {
        public const int P2PChannelOpen = 1;
        public const int P2PChannelClose = 2;
        public const int Broadcast = 3; //廣播消息
    }

再定義對應的協議類:

    public class BroadcastContract
    {
        #region Ctor
        public BroadcastContract() { }
        public BroadcastContract(string _broadcasterID, string _groupID, int infoType ,byte[] info )
        {
            this.broadcasterID = _broadcasterID;
            this.groupID = _groupID;
            this.content = info;
            this.informationType = infoType;
            this.actionTypeOnChannelIsBusy = action;
        }        
        #endregion

        #region BroadcasterID
        private string broadcasterID = null;
        /// <summary>
        /// 發出廣播的用戶ID。
        /// </summary>
        public string BroadcasterID
        {
            get { return broadcasterID; }
            set { broadcasterID = value; }
        } 
        #endregion

        #region GroupID
        private string groupID = "";
        /// <summary>
        /// 接收廣播的組ID
        /// </summary>
        public string GroupID
        {
            get { return groupID; }
            set { groupID = value; }
        } 
        #endregion

        #region InformationType
        private int informationType = 0;
        /// <summary>
        /// 廣播信息的類型。
        /// </summary>
        public int InformationType
        {
            get { return informationType; }
            set { informationType = value; }
        } 
        #endregion

        #region Content
        private byte[] content;
        public byte[] Content
        {
            get { return content; }
            set { content = value; }
        }
        #endregion

  }

(1)在客戶端發送廣播消息

在客戶端,我們根據與組內成員的P2P通道的狀態,來判斷發送的方案,就像依據上文提到的,可細分為三種情況:

a.當某個客戶端發現自己和組內的所有其它成員都建立了P2P通道時,那么,它就不用把廣播消息發送給服務器了。

b.如果客戶端與組內的所有其它成員的P2P通道都沒有建立成功,那么,它只需要將廣播消息發送給服務器。

c.如果客戶端與部分組內的成員建立了P2P通道,那么,它不僅需要將廣播消息發送給服務器,還需要將該廣播消息經過每個P2P通道發送一次。 

    public void Broadcast(string currentUserID, string groupID, int broadcastType, byte[] broadcastContent)
    {           
        BroadcastContract contract = new BroadcastContract(currentUserID, groupID, broadcastType, broadcastContent);
        byte[] info = CompactPropertySerializer.Default.Serialize(contract);
        List<string> members = this.groupManager.GetGroupMembers(groupID);
        if (members == null)
        {
            return;
        }
        bool allP2P = true;
        foreach (string memberID in members)
        {
            if (memberID == this.currentUserID)
            {
                continue;
            }

            if (rapidPassiveEngine.P2PController.IsP2PChannelExist(memberID))
            {
                rapidPassiveEngine.CustomizeOutter.SendByP2PChannel(memberID, MyInfoTypes.Broadcast, info, ActionTypeOnNoP2PChannel.Discard, true, ActionTypeOnChannelIsBusy.Continue);
            }
            else
            {
                allP2P = false;
            }
        }

        if (!allP2P) //只要有一個組成員沒有成功建立P2P,就要發給服務端。
        {
            this.rapidPassiveEngine.CustomizeOutter.Send(null, this.groupInfoTypes.Broadcast, info, true, action);
        }
    }

 (2)服務端轉發廣播

當服務器收到一個廣播消息時,首先,查看目標組中的用戶,然后,根據廣播消息的發送者的P2P通道狀態,來綜合決定該廣播消息需要轉發給哪些客戶端。我們只需在上面的HandleInformation方法中增加代碼就可以了:

   if (informationType == MyInfoTypes.Broadcast)
    {                BroadcastContract contract = CompactPropertySerializer.Default.Deserialize<BroadcastContract>(information, 0);
        string groupID = contract.GroupID;
               
        List<string> members = this.groupManager.GetGroupMembers(groupID);
        if (members != null)
        {
            foreach (string memberID in members)
            {
                bool useP2PChannel = this.p2PChannelManager.IsP2PChannelExist(sourceUserID, memberID);
                if (memberID != sourceUserID && !useP2PChannel)
                {                            
                    this.customizeController.Send(memberID, MyInfoTypes.Broadcast, information, true, ActionTypeOnChannelIsBusy.Continue);  
                }
            }
        }
        return;
    }

(3)客戶端處理接收到的廣播消息

客戶端也只要實現ICustomizeHandler接口的HandleInformation方法,就可以處理來自P2P通道或者轉發自服務端的廣播消息了(即處理MyInfoTypes.Broadcast類型的消息),這里就不贅述了。 

實際上,本文的實現還可以進一步優化,特別是在高頻的廣播消息時(如前文舉的視頻會議的例子),這種優化效果是很明顯的。那就是,比如,我們在客戶端可以將組內的成員分成兩類管理起來,一類是P2P已經打通的,一類是沒有通的,並根據實際的P2P狀態變化而調整。這樣,客戶端每次發送廣播消息時,就不用遍歷自己與每個組員的P2P通道的狀態,這可以節省不少的cpu時間。同理,服務端也可以如此處理。

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM