分享一個分布式消息總線,基於.NET Socket Tcp的發布-訂閱框架,附代碼下載


一、分布式消息總線

     在很多MIS項目之中都有這樣的需求,需要一個及時、高效的的通知機制,即比如當使用者A完成了任務X,就需要立即告知使用者B任務X已經完成,在通常的情況下,開發人中都是在使用者B所使用的程序之中寫數據庫輪循代碼,這樣就會產品一個很嚴重的兩個問題,第一個問題是延遲,輪循機制要定時執行,必須會引起延遲,第二個問題是數據庫壓力過大,當進行高頻度的輪循會生產大量的數據庫查詢,並且如果有大量的使用者進行輪循,那數據庫的壓力就更大了。

     那么在這個時間,就需要一套能支持發布-訂閱模式的分布式消息總線,那這個問題就可以很好的解決了,比如采用一些成熟的消息總線進行實現,比如MSMQ或者采用比如開源的NServiceBus的發布訂閱機制就可以實現處理這種需求,其系統結構就會變成如下所示:

2010-11-29-11-30-48

    本分布式消息總線,目前廣泛的被應用於分布式緩存的更新通知,當在N百台客戶短在使用緩存的過程之中,某個操作修改了緩存的數據,必須會導致其他終端緩存的失效,那么使用基於Socket的分布式消息總線之后,我們可以做了修改了即可實時通知,做到緩存數據保持最新,再比如醫療應用之中的危急值管理,當發現檢驗、檢查危急值之后,需要及時通知病區啟動聲光報警系統等,提醒醫護工作人員及相關領導做出相應的措施,再比如應用於異構系統整合,當檢驗系統做出檢驗報告,通過消息總線進行發布,HIS系統則即時會收到檢驗報告數據而實現系統的整合。

二、基於Socket的實現

     目前能夠實現發布訂閱模式的開源產品非常之多,為什么還要制造輪子呢,其主要原因有以下幾點

     1)像NServiceBus這種東西基於MSMQ,在大量的發布者-訂閱者的情況下性能不佳。

     2)此類東西太過於龐大、不易使用和配置。

     3)學習成本過高。

     那為什么要使用Socket技術進行實現呢,其主要原因是有以下幾點:

     1)使用高效的Socket通信技術,高效、支持更多的客戶端。

     2)使用簡單,不需要定義太多額外的東西,只需要定義主題和消息即可使用。

     目前本發布訂閱框架是基於AgileEAS.NET SOA中間件平台Socket框架實現的,有關於些Socket框架的技術細節請參考AgileEAS.NET SOA 中間件平台.Net Socket通信框架-介紹AgileEAS.NET SOA 中間件平台.Net Socket通信框架-簡單例子-實現簡單的服務端客戶端消息應答AgileEAS.NET SOA 中間件平台.Net Socket通信框架-完整應用例子-在線聊天室系統-下載配置AgileEAS.NET SOA 中間件平台.Net Socket通信框架-完整應用例子-在線聊天室系統-代碼解析文章進行了解和學習。

     目前本發布訂閱框架並直接集成於AgileEAS.NET SOA Socket通信框架之中並且隨其一並發布,下面簡單介紹一下其API:

在本框架之中定義了一個消息總線接口IMessageBus:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5: using System.Collections;
   6:  
   7: namespace EAS.Messages
   8: {
   9:     /// <summary>
  10:     /// 消息總線接口定義。
  11:     /// </summary>
  12:     public interface IMessageBus : IDisposable
  13:     {
  14:         /// <summary>
  15:         /// 注冊發布者。
  16:         /// </summary>
  17:         /// <param name="publisher">發布者。</param>
  18:         void AddPublisher(string publisher);
  19:  
  20:         /// <summary>
  21:         /// 注冊發布者。
  22:         /// </summary>
  23:         /// <param name="publisher">發布者。</param>
  24:         /// <param name="topic">主題。</param>
  25:         void AddPublisher(string publisher, string topic);
  26:  
  27:         /// <summary>
  28:         /// 發布一條消息到總線。
  29:         /// </summary>
  30:         /// <param name="topic">主題。</param>
  31:         /// <param name="message">發布的消息。</param>
  32:         void Publish(string topic, object message);
  33:  
  34:         /// <summary>
  35:         /// 訂閱消息。
  36:         /// </summary>
  37:         /// <param name="subscriber">訂閱者。</param>
  38:         /// <param name="topic">主題。</param>
  39:         /// <param name="notifyHandler">訂閱通知。</param>
  40:         void Subscribe(object subscriber, string topic, MessageNotifyHandler notifyHandler);
  41:  
  42:         /// <summary>
  43:         /// 訂閱消息。
  44:         /// </summary>
  45:         /// <param name="subscriber">訂閱者。</param>
  46:         /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
  47:         /// <param name="topic">主題。</param>
  48:         /// <param name="notifyHandler">訂閱通知。</param>
  49:         void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);
  50:  
  51:         /// <summary>
  52:         /// 訂閱消息。
  53:         /// </summary>
  54:         /// <param name="subscriber">訂閱者。</param>
  55:         /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
  56:         /// <param name="topic">主題。</param>
  57:         /// <param name="notifyHandler">訂閱通知。</param>
  58:         /// <param name="changedHandler">發布者狀態變化委托。</param>
  59:         void Subscribe(object subscriber, string friendName, string topic, MessageNotifyHandler notifyHandler,PublisherSstatusChangedHandler changedHandler);
  60:  
  61:         /// <summary>
  62:         /// 退訂消息。
  63:         /// </summary>
  64:         /// <param name="subscriber">訂閱者。</param>
  65:         void Unsubscribe(object subscriber);
  66:  
  67:         /// <summary>
  68:         /// 退訂消息。
  69:         /// </summary>
  70:         /// <param name="subscriber">訂閱者。</param>
  71:         /// <param name="topic">主題。</param>
  72:         void Unsubscribe(object subscriber, string topic);
  73:  
  74:         /// <summary>
  75:         /// 退訂消息。
  76:         /// </summary>
  77:         /// <param name="subscriber">訂閱者。</param>
  78:         /// <param name="friendName">訂閱者名稱,用於處理離線訂閱。</param>
  79:         /// <param name="topic">主題。</param>
  80:         void Unsubscribe(object subscriber, string friendName, string topic);
  81:     }
  82: }

    IMessageBus就基於Socket發布訂閱消息總線的靈魂接口,也是基唯一的發布者調用者功能入口,也就是說不管你是發布者還是訂閱者都需要調用這個接口,如果你是發布者請調用IMessageBus接口的Publish方法向消息總線發布消息,如果是你訂閱者請通過IMessageBus的訂閱方法進行訂閱,當你訂閱了某個主題之后,有發布者發布該主題的消息,你即可以收到消息並調用訂閱回調函數進行處理。

三、實現一個簡單的例子

     現在我們開始一個簡單的應用消息總線的例子,本例子代碼解決方案由下圖4個項目組成:

(`I6]J`YK1((6M3M8Q2}(5X

     其中:Demo.Messages項目定義發布者、訂閱者所使用的消息對象和消息主題。

                Demo.Publisher項目為發布者代碼。

                Demo.Subscriber項目為訂閱者代碼。

                Demo.Server項目為服務端代碼。

     在Demo.Messages項目之中,我們定義了一個消息Message:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5: using System.Xml.Serialization;
   6:  
   7: namespace Demo.Messages
   8: {
   9:     [Serializable]
  10:     public class Message
  11:     {
  12:         [XmlElement]
  13:         public Guid ID
  14:         {
  15:             get;
  16:             set;
  17:         }
  18:     }
  19: }

     消息Message很簡單,只有一個屬性ID,同時 還需要定義一個消息主題:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace Demo.Messages
   7: {
   8:     public class Topics
   9:     {
  10:         public static readonly string DEMO_TOPIC = "演示消息";
  11:     }
  12: }

     我們定義了一個消息主題為“演示消息”。

     在Demo.Publisher項目之中,沒有太多額外的代碼,只有在Program.cs寫了以下簡單的調用代碼:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Publisher
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Publisher");
  16:  
  17:             while (System.Console.ReadLine()!="exit")
  18:             {
  19:                 var m = new Messages.Message { ID = Guid.NewGuid() };
  20:                 bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);
  21:                 System.Console.WriteLine(string.Format("Publish:{0}", m.ID));
  22:             }
  23:         }
  24:     }
  25: }

     從IOC容器獲取一個消息總線IMessageBus對象,並調用Publish函數發布消息”bus.Publish(Demo.Messages.Topics.DEMO_TOPIC, m);“。

     當然了,使用了IOC容器,就離來開配置文件了,其App.config文件內容如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <eas>
   7:     <objects>
   8:       <!--消息總線-->
   9:       <object name="MessageBus" assembly="EAS.MicroKernel" type="EAS.Sockets.Bus.SocketBus" LifestyleType="Singleton">
  10:         <property name="Url" type="string" value="socket.tcp://127.0.0.1:6606/"/>
  11:       </object>
  12:     </objects>
  13:   </eas>
  14: </configuration>

     在Demo.Subscriber項目之中,使用與Demo.Publisher一模一樣的配置文件,其Program.cs代碼如下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

     其中代碼bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);:完成把消息訂閱到MessageNotify通知函數之中。

     在Demo.Server項目之中,啟動服務器並且開始接收訂閱和發布:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5: using EAS.Sockets;
   6:  
   7: namespace Demo.Server
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             SocketServer socketServer = new SocketServer(128);
  14:             socketServer.Port = 6606;
  15:             socketServer.Logger = new EAS.Loggers.ConsoleLogger(); 
  16:             socketServer.Initialize();
  17:             System.Console.WriteLine("Server Starting...");
  18:             socketServer.StartServer();
  19:             System.Console.WriteLine("Server Startup!");
  20:             System.Console.ReadLine();
  21:         }
  22:     }
  23: }

     到此為止,所有代碼均已完成,是不是很簡單,接下來,我們跑起來驗證一下效果。

四、驗證效果

     我們在編譯輸入目錄Publish下先啟動Demo.Server.exe,再啟動兩個Demo.Subscriber.exe,再啟動一個Demo.Publisher.exe,在Demo.Publisher.exe控制台按回車鍵:

image

OK,搞定。

五、源代碼下載

     本程序的源代碼已上傳到服務器,請通過http://112.74.65.50/downloads/eas/Demo.Pub_Sub.rar進行下載,如果在開發過程之中想要了解更多有關Socket通信框架以及更多AgileEAS.NET SOA中間件平台的技術資源,請通過AgileEAS.NET SOA 網站:http://www.smarteas.net最新下載欄目進行下載。    

六、問題反饋

     麻煩大家在通過視頻進行學習的時候能及時把問題反饋給樓主,或者有什么需要改進的一些建議都請向樓主直接反饋,以下是聯系方式:

團隊網站:http://www.agilelab.cn

AgileEAS.NET網站:http://www.agileeas.net

官方博客:http://eastjade.cnblogs.com

github:https://github.com/agilelab/eas

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

郵件:james@agilelab.cn,mail.james@qq.com,

電話:18629261335。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM