.Net開發筆記(十四) 基於“泵”的UDP通信(接上篇)


上一篇中說到了“泵”在編程中的作用以及一些具體用處,但沒有實際demo,可能不好理解,這篇文章我分享一個UDP通信的demo,大概實現了類似“飛鴿傳書”在局域網中文本消息和文件傳輸的功能。功能不全也不是很完善,但足以說明“泵”在代碼中的具體應用。

先來回憶一下上篇中“泵”的含義,首先它是可持續運作的,其次它可以將“數據”從一個地方傳遞到另外一個地方,供其他人使用。搬一張上篇的圖:

圖1 程序中“泵”結構圖

如上圖所示,每個泵有一個“待處理”的數據容器(緩沖區),泵循環里面還可以有一個“預處理”的地方,可以在數據被傳遞之前,先做處理,最后,泵會輸出數據,供別人使用。完成“數據源”->“泵”->“使用者”這樣一個過程,明顯“泵”起到動力的作用。我們可以看見,“使用者”使用數據的過程包含在“泵循環”之內,也就是說,如果使用數據時,耗時太長,一次“泵循環”不能及時返回,那么,緩沖區中的數據就會累積,得不到及時處理,影響“泵”的工作效率。能解決該問題有兩種途徑:

  • 1)在使用數據時,不做長時間耗時操作;
  • 2)先不直接使用數據,先將“泵”傳遞出來的數據存入另外一個容器(緩沖區),再另外開辟線程去處理緩沖區中的數據。

其中1)明顯治標不治本,不能解決實際問題,2)倒是可以一試,因為它將“使用數據”和“泵循環”分離開來,使“使用數據”不會直接影響到“泵”的工作。其實上一篇中說到“UDP通信結構”時就已經按照2)所說的實現的,我那時候說不能將數據處理、數據分析都放在“數據接收泵”中,因為有一個主要原因就是,那樣做會影響數據接收的效率。后來我畫了一張UDP通信結構圖,里面用到了三個“泵”(數據接收泵、數據分析泵、數據處理泵),如下圖:

圖2 UDP通信中“泵”的使用

如上圖,“數據分析泵”使用到了“數據接收泵”傳遞出來的數據,“數據處理泵”使用到了“數據分析泵”傳遞出來的數據,它們都沒有直接使用,都是先將數據存入對應緩沖區,然后開辟新的“泵”(也是另開辟線程吧)去處理。這樣一來,每個環節互不影響其他環節的工作效率。

在實現這個UDP通信demo時,我定義了三個“泵”類,一個數據發送類,和一個幫助類,分別如下:

  • UDPSocket:主要實現了數據接受泵,以及socket綁定等;
  • DataAnalyse:數據分析泵;
  • DataDeal:數據處理泵;
  • UDPSender:數據發送;
  • Help.cs中一些類型:輔助功能。

其中,DataAnalyse是UDPSocket中“泵”傳遞出來數據的使用者,DataDeal是DataAnalyse中“泵”傳遞出來數據的使用者,然后我們(開發者)是DataDeal中“泵”傳遞出來數據的使用者。

需要說明的是,默認請款下,DataDeal中的“泵”是不會傳遞出來任何數據,DataAnalyse中的“泵”也不會傳遞出來任何數據,我們需要根據具體需求,實現自己的DataDeal和DataAnalyse類,重寫Deal和Analyse虛方法。我定義了兩個類,分別為MyDataDeal和MyDataAnalyse,具體代碼如下:

    /// <summary>
    /// 自定義數據處理類 重寫Deal方法
    /// 負責處理某一些數據,其余交給基類
    /// 可以繼續從MyDataDeal派生新的類,繼續重寫Deal方法
    /// </summary>
    public class MyDataDeal : DataDeal
    {
        public event LoginEventHandler Login;
        public event LoginBackEventHandler LoginBack;
        public event TxtMessageReceivedEventHandler TxtMessageReceived;
        public event TxtMessageBackEventHandler TxtMessageBack;
        public event LogoutEventHandler Logout;
        public event AskSendFileEventHandler AskSendFile;
        public event AskSendFileBackEventHandler AskSendFileBack;
        public event SendFileDataEventHandler SendFileData;
        public event ReceiveFileOKEventHandler ReceiveFileOK;
        public event SendFileStopEventHandler SendFileStop;
        public event ReceiveFileStopEventHandler ReceiveFileStop;
        public event SendFileDataBackEventHandler SendFileDataBack; 

        public MyDataDeal(DataAnalyse dataAnalyse)
            : base(dataAnalyse)
        {

        }
        protected override void Deal(Data data)
        {
            switch (data.Msg)
            {
                case Msg.ZMsg1:
                    {
                        if (Login != null)
                        {
                            Login(data.RemoteIP, data.RemotePort, data.Content.ToString());
                        }
                        break;
                    }
                case Msg.ZMsg2:
                    {
                        if (LoginBack != null)
                        {
                            LoginBack(data.RemoteIP, data.RemotePort, data.Content.ToString());
                        }
                        break;
                    }
                case Msg.ZMsg3:
                    {
                        if (TxtMessageReceived != null)
                        {
                            TxtMessageReceived(data.RemoteIP, data.RemotePort, data.Content.ToString());
                        }
                        break;
                    }
                case Msg.ZMsg4:
                    {
                        if(TxtMessageBack != null)
                        {
                            TxtMessageBack(data.RemoteIP, data.RemotePort, data.Content.ToString());
                        }
                        break;
                    }
                case Msg.ZMsg5:
                    {
                        if (Logout != null)
                        {
                            Logout(data.RemoteIP, data.RemotePort, data.Content.ToString());
                        }
                        break;
                    }
                case Msg.ZMsg6:
                    {
                        if (AskSendFile != null)
                        {
                            object[] objs = data.Content as object[];
                            int uid = (int)objs[0];
                            long length = (long)objs[1];
                            string filename = objs[2].ToString();

                            AskSendFile(data.RemoteIP, data.RemotePort, filename, length, uid); 
                        }
                        break;
                    }
                case Msg.ZMsg7:
                    {
                        if (AskSendFileBack != null)
                        {
                            object[] objs = data.Content as object[];
                            int uid = (int)objs[0];
                            int result = (int)objs[1];
                            AskSendFileBack(data.RemoteIP, data.RemotePort, result, uid); 
                        }
                        break;
                    }
                case Msg.ZMsg8:
                    {
                        if (SendFileData != null)
                        {
                            object[] objs = data.Content as object[];
                            int uid = (int)objs[0];
                            int part = (int)objs[1];
                            byte[] dat = objs[2] as byte[];
                            SendFileData(data.RemoteIP, data.RemotePort, dat, uid, part);
                        }
                        break;
                    }
                case Msg.ZMsg9:
                    {
                        if (ReceiveFileOK != null)
                        {
                            int uid = (int)data.Content;
                            ReceiveFileOK(data.RemoteIP, data.RemotePort, uid);
                        }
                        break;
                    }
                case Msg.ZMsg10:
                    {
                        if (SendFileStop != null)
                        {
                            int uid = (int)data.Content;
                            SendFileStop(data.RemoteIP, data.RemotePort, uid);
                        }
                        break;
                    }
                case Msg.ZMsg11:
                    {
                        if (ReceiveFileStop != null)
                        {
                            int uid = (int)data.Content;
                            ReceiveFileStop(data.RemoteIP, data.RemotePort, uid);
                        }
                        break;
                    }
                case Msg.ZMsg12:
                    {
                        if (SendFileDataBack != null)
                        {
                            object[] objs = data.Content as object[];
                            int uid = (int)objs[0];
                            int part = (int)objs[1];
                            SendFileDataBack(data.RemoteIP, data.RemotePort, uid, part);
                        }
                        break;
                    }
                default:
                    {
                        base.Deal(data); //其他由基類處理
                        break;
                    }
            }
        }
    }
    public delegate void LoginEventHandler(string remoteIP,int remotePort,string remoteName);
    public delegate void LoginBackEventHandler(string remoteIP,int remotePort,string remoteName);
    public delegate void TxtMessageReceivedEventHandler(string remoteIP,int remotePort,string txtMsg);
    public delegate void TxtMessageBackEventHandler(string remoteIP,int remotePort,string txtMsgBack);
    public delegate void LogoutEventHandler(string remoteIP,int remotePort,string remoteName);
    public delegate void AskSendFileEventHandler(string remoteIP,int remotePort,string filename,long length,int uid);
    public delegate void AskSendFileBackEventHandler(string remoteIP,int remotePort,int result,int uid);
    public delegate void SendFileDataEventHandler(string remoteIP,int remotePort,byte[] data,int uid,int part);
    public delegate void ReceiveFileOKEventHandler(string remoteIP,int remotePort,int uid);
    public delegate void SendFileStopEventHandler(string remoteIP,int remotePort,int uid);
    public delegate void ReceiveFileStopEventHandler(string remoteIP,int remotePort,int uid);
    public delegate void SendFileDataBackEventHandler(string remoteIP,int remotePort,int uid,int part);
MyDataDeal
  1     /// <summary>
  2     /// 自定義數據分析類 重寫Analyse方法
  3     /// 負責分析某一些數據,其余交給基類
  4     /// 可以繼續從MyDataAnalyse派生新的類,繼續重寫Analyse方法
  5     /// </summary>
  6     public class MyDataAnalyse : DataAnalyse
  7     {
  8         public MyDataAnalyse(UDPSocket udpSocket)
  9             : base(udpSocket)
 10         {
 11 
 12         }
 13         protected override void Analyse(RawData rawData)
 14         {
 15             string ip = (rawData.RemoteEP as IPEndPoint).Address.ToString(); //遠端IP
 16             int port = (rawData.RemoteEP as IPEndPoint).Port; //遠端端口
 17             if (rawData.Buffer.Length > 0)
 18             {
 19                 Msg msg = (Msg)(rawData.Buffer[0]); //
 20                 switch (msg)
 21                 {
 22                     case Msg.ZMsg1: //登錄
 23                         {
 24                             string data = Encoding.Unicode.GetString(rawData.Buffer, 1, rawData.Length - 1);  //從1開始  因為前面有一個 頭  下同,注意是 rawData.Length(實際接收數據)  不是rawData.Buffer.Length
 25                             OnAnalysed(msg, data, ip, port);
 26                             break;
 27                         }
 28                     case Msg.ZMsg2: //登錄反饋
 29                         {
 30                             string data = Encoding.Unicode.GetString(rawData.Buffer, 1, rawData.Length - 1);
 31                             OnAnalysed(msg, data, ip, port);
 32                             break;
 33                         }
 34                     case Msg.ZMsg3: //文本信息
 35                         {
 36                             string data = Encoding.Unicode.GetString(rawData.Buffer, 1, rawData.Length - 1);
 37                             OnAnalysed(msg, data, ip, port);
 38                             break;
 39                         }
 40                     case Msg.ZMsg4: //文本信息反饋
 41                         {
 42                             string data = Encoding.Unicode.GetString(rawData.Buffer, 1, rawData.Length - 1);
 43                             OnAnalysed(msg, data, ip, port);
 44                             break;
 45                         }
 46                     case Msg.ZMsg5: //退出
 47                         {
 48                             string data = Encoding.Unicode.GetString(rawData.Buffer, 1, rawData.Length - 1);
 49                             OnAnalysed(msg, data, ip, port);
 50                             break;
 51                         }
 52                     case Msg.ZMsg6: //請求發送文件
 53                         {
 54                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 55                             long length = BitConverter.ToInt64(rawData.Buffer, 5); //文件大小
 56                             string filename = Encoding.Unicode.GetString(rawData.Buffer, 13, rawData.Length - 13); //文件名稱
 57                             OnAnalysed(msg, new object[] { uid, length, filename }, ip, port); //
 58                             break;
 59                         }
 60                     case Msg.ZMsg7: //請求發送文件反饋
 61                         {
 62                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 63                             int result = BitConverter.ToInt32(rawData.Buffer, 5); //反饋結果
 64                             OnAnalysed(msg, new object[] { uid, result }, ip, port); 
 65                             break;
 66                         }
 67                     case Msg.ZMsg8: //發送文件數據
 68                         {
 69                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 70                             int part = BitConverter.ToInt32(rawData.Buffer, 5); //段ID
 71                             byte[] data = new byte[rawData.Length - 9]; //實際文件數據
 72                             Buffer.BlockCopy(rawData.Buffer, 9, data, 0, data.Length);
 73                             OnAnalysed(msg, new object[] { uid, part, data }, ip, port);
 74                             break;
 75                         }
 76                     case Msg.ZMsg9: //接收文件完畢
 77                         {
 78                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 79                             OnAnalysed(msg, uid, ip, port);
 80                             break;
 81                         }
 82                     case Msg.ZMsg10: //發送方中止發送文件
 83                         {
 84                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 85                             OnAnalysed(msg, uid, ip, port);
 86                             break;
 87                         }
 88                     case Msg.ZMsg11: //接收方中止接收文件
 89                         {
 90                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 91                             OnAnalysed(msg, uid, ip, port);
 92                             break;
 93                         }
 94                     case Msg.ZMsg12: //發送文件數據反饋
 95                         {
 96                             int uid = BitConverter.ToInt32(rawData.Buffer, 1); //文件唯一標示
 97                             int part = BitConverter.ToInt32(rawData.Buffer, 5); //段ID
 98                             OnAnalysed(msg, new object[] { uid, part }, ip, port);
 99                             break;
100                         }
101                     default:
102                         {
103                             base.Analyse(rawData);  //其他由基類分析
104                             break;
105                         }
106                 }
107             }
108         }
109     }
MyDataAnalyse

如代碼所示,各位下載源碼后,可以將UDPSocket、UDPSender、DataDeal、DataAnalyse以及Help.cs文件中的一些類型封裝起來,生成一個通用程序集,實際使用中,只需要引用該程序集,並且實現自己的DataDeal和DataAnalyse就行。

另外,demo中我還定義了一套自己的“通信協議”,如下:

 1 ZMsg1:用戶新上線                  頭+用戶信息
 2 ZMsg2:用戶新上線反饋              頭+反饋用戶信息
 3 ZMsg3:文本消息                    頭+密碼+消息正文
 4 ZMsg4:文本消息反饋                頭+原始消息
 5 ZMsg5:離線                        頭+任意文本
 6 ZMsg6:請求發送文件                頭+發送文件的唯一標示+大小+文件名
 7 ZMsg7:請求發送文件反饋            頭+文件的唯一標示+1或0           1代表接收 0代表拒絕
 8 ZMsg8:發送文件數據                頭+文件唯一標示+段ID+文件數據(byte[])
 9 ZMsg9:接收文件完畢                頭+文件唯一標示
10 ZMsg10:發送方中止發送文件         頭+文件唯一標示
11 ZMsg11:接收方中止接收文件         頭+文件唯一標示
12 ZMsg12:發送文件數據反饋           頭+文件唯一標示+段ID
消息協議

通信協議根據具體需求而不同。

總結一下,在其他項目中具體使用步驟如下:

  1. 根據具體需求,定義一套“消息協議”;
  2. 從DataAnalyse派生出一個新的類,根據“消息協議”重寫Analyse方法,進行數據分析;
  3. 從DataDeal派生出一個新的類,根據具體業務邏輯重寫Deal方法,進行數據處理;
  4. 數據發送方必須嚴格按照“消息協議”發送數據;
  5. 最后,你要做的就是注冊DataDeal派生類的一些事件,使用“泵”傳遞出來的程序可識別數據。

源碼下載地址:http://files.cnblogs.com/xiaozhi_5638/UDPMessager.rar

希望對各位有幫助。附幾張效果圖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM