Publisher/Subscriber 訂閱-發布模式


Publisher/Subscriber 訂閱-發布模式

本博后續將陸續整理這些年做的一些預研demo,及一些前沿技術的研究,與大家共研技術,共同進步。

關於發布訂閱有很多種實現方式,下面主要介紹WCF中的發布訂閱,主要參考書籍《Programming WCF Services》,閑話不多說進入正題。使用傳統的雙工回調(例子 http://www.cnblogs.com/artech/archive/2007/03/02/661969.html)實現發布訂閱模式存在許多缺陷,主要問題是,它會引入發布者和訂閱者之間的高度耦合。訂閱者必須先知道發布者在哪里,然后才能訂閱它們,任何訂閱者不知道的服務都無法通知事件的訂閱者,部署好的應用程序中添加新的訂閱者(或者移除已經存在的訂閱者)是十分困難的事情。大致相同的是發布者也只能給它知道的訂閱者發送通知消息,同時發布者還需要管理訂閱者列表,這些與業務服務無關,這些邏輯增加了發布者的復雜度,另外在安全方面也存在訂閱者與發布者也存在耦合,而且在發布者進程宕機時,所有訂閱都會丟失。

要解決上面提及的問題最常見的解決方案就是發布-訂閱模式(Publish-Subscribe 【OBSERVER】),如圖D-1所示。

 

這里將訂閱者區分為臨時訂閱者與持久訂閱者,持久訂閱者可以保存到磁盤上,當事件觸發時可以通知訂閱者,也可以很方便的通過傳遞回調使用回調機制,對於持久訂閱者,需要記錄訂閱者地址,當觸發事件時,發布服務將會調用持久訂閱者地址 ,然后傳遞事件,因持久訂閱者保存了訂閱者地址至數據庫或磁盤,因此當發布服務宕機時提高了管理性。

以上主要介紹理論,下面進入實踐階段,首先下載ServiceModelEx(Programming WCF Services 里面書籍作者提供的簡化WCF編程的動態庫),  https://github.com/CaseyBurns/ServiceModelEx,我們暫時不需要服務總線所以我們引入ServiceModelEx (.NET 4.0 no service bus) ,建好測試服務端(這里為了方便測試使用GUI 應用程序作為宿主),客戶端。

管理臨時訂閱

例子D-1使用ServiceModelEx 提供的ISubscriptionService接口管理臨時訂閱者

復制代碼
   [ServiceContract]
   public interface ISubscriptionService
   {
      [OperationContract]
      void Subscribe(string eventOperation);

      [OperationContract]
      void Unsubscribe(string eventOperation);
   }
復制代碼

作為通用接口它不關心回調契約,然后添加臨時訂閱者契約繼承通用接口,並設置回調契約

    [ServiceContract(CallbackContract = typeof(IMyEvents))]
    public interface IMySubscriptionService : ISubscriptionService
    {
    }

回調契約

復制代碼
    [ServiceContract]
    public interface IMyEvents
    {
        [OperationContract(IsOneWay = true)]
        void OnEvent1();
        [OperationContract(IsOneWay = true)]
        void OnEvent2(int number);
        [OperationContract(IsOneWay = true)]
        void OnEvent3(int number, string text);
    }
復制代碼

實現臨時訂閱服務.

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService,IPersistentSubscriptionService
    {
       
    }

這里有幾點需要注意:服務類型必須是會話服務(InstanceContextMode = InstanceContextMode.PerCall),會話服務才能夠使用回調,另外ServiceModelEx 中的類 SubscriptionManager<T> 已經實現了通用接口所定義的添加訂閱者與取消訂閱接口,所以這里不需要我們再寫任何代碼。IPersistentSubscriptionService 作為持久訂閱者接口,SubscriptionManager<T> 也實現了該接口,接下來會講到。

配置文件配置發布訂閱者服務

復制代碼
  <system.serviceModel>
    <serviceHostingEnvironment multipleSiteBindingsEnabled="true" />
    <bindings>
      <netTcpBinding>
        <binding name="NetTcpBinding_IService1" receiveTimeout="00:25:00"
          maxBufferSize="2147483647" maxReceivedMessageSize="2147483647" transactionFlow="true">
          <reliableSession inactivityTimeout="00:25:00" enabled="true" />
          <security mode="None" />
        </binding>
      </netTcpBinding>
    </bindings>
    <services>
      <service behaviorConfiguration="MyBehavior" name="Service.sub.MySubscriptionService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/"/>
            <add baseAddress="http://localhost:8023/"/>
          </baseAddresses>
        </host>
        <endpoint name="Sub" address="Sub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMySubscriptionService" />
        <endpoint name="PersistentSub" address="PersistentSub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
            contract="ServiceModelEx.IPersistentSubscriptionService" />
      </service>
      <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
            <add baseAddress="http://localhost:8023/MyPub/"/>
          </baseAddresses>
        </host>
        <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMyEvents" />
      </service>
    </services>
    <behaviors>
      <serviceBehaviors>
        <behavior name="MyBehavior">
          <serviceMetadata httpGetEnabled="true"/>
          <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="10000" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
  </system.serviceModel>
復制代碼

其中Service.pub.MyPublishService 服務為發布者服務配置 接下來會講到。

這樣臨時訂閱者就實現了,接下來看持久訂閱者.持久訂閱者的通用接口使用ServiceModelEx中定義的IPersistentSubscriptionService

復制代碼
   [ServiceContract]
   public interface IPersistentSubscriptionService
   {
      [OperationContract(Name = "SubscribePersistent")]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      void Subscribe(string address,string eventsContract,string eventOperation);

      [OperationContract(Name = "UnSubscribePersistent")]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      void Unsubscribe(string address,string eventsContract,string eventOperation);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetAllSubscribers();

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetSubscribersToContract(string eventsContract);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      string[] GetSubscribersToContractEventType(string eventsContract,string eventOperation);

      [OperationContract]
      [TransactionFlow(TransactionFlowOption.Allowed)]
      PersistentSubscription[] GetAllSubscribersFromAddress(string address);
   }
復制代碼

這里我添加了對[OperationContract(Name = "SubscribePersistent")] 將添加訂閱方法進行重命名,以區別臨時訂閱接口的Subscribe方法.持久訂閱不需要回調函數,接下來實現持久訂閱同樣簡單,上面已經貼過代碼,ServiceModelEx中的SubscriptionManager<T>同樣已經實現了IPersistentSubscriptionService接口,這樣臨時訂閱與持久訂閱完成,接下來看發布服務。

發布服務應該支持與訂閱服務一樣的事件契約,這是訂閱服務與發布服務唯一的連接點,使用IMyEvents 作為例子,另外ServiceModelEx提供了用於簡化發布服務的幫助類PublishService<T>

復制代碼
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    public class MyPublishService : PublishService<IMyEvents>, IMyEvents
    {
        public void OnEvent1()
        {
            FireEvent();
        }

        public void OnEvent2(int number)
        {
            FireEvent(number);
        }

        public void OnEvent3(int number, string text)
        {
            FireEvent(number, text);
        }
    }
復制代碼

其中FireEvent()被用作激發所有訂閱者的事件,無論是臨時還是持久訂閱者,幫助類PublishService<T>已經做了實現,接下來配置發布服務

復制代碼
      <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService">
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://localhost:8022/MyPub/"/>
            <add baseAddress="http://localhost:8023/MyPub/"/>
          </baseAddresses>
        </host>
        <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1"
          contract="Service.sub.IMyEvents" />
      </service>
復制代碼

這樣發布服務完成,使用Gui應用程序作為宿主,可以使用ServiceModelEx 中ServiceHost<T> 作為發布的幫助類 。

復制代碼
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }
        ServiceHost<MyPublishService> hostPub = new ServiceHost<MyPublishService>();
        ServiceHost<MySubscriptionService> host = new ServiceHost<MySubscriptionService>(); 
        private void Form1_Load(object sender, EventArgs e)
        {
            try
            {
                host.EnableMetadataExchange();
                host.Open();

                hostPub.EnableMetadataExchange();
                hostPub.Open();
            }
            catch (Exception ex)
            {
                throw;
            }

        }

      
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                host.Close();
            }
            catch (Exception)
            {
                try
                {
                    host.Abort();
                }
                catch (Exception)
                {

                }
            }
            try
            {
                hostPub.Close();
            }
            catch (Exception)
            {
                try
                {
                    hostPub.Abort();
                }
                catch (Exception)
                {

                }
            }
        }
    }
復制代碼

其中 host.EnableMetadataExchange(); 能夠幫助發布元數據,不需要再到配置中進行配置,服務配置好后接下來看客戶端使用,

客戶端可以直接添加服務引用生成服務代理,但是一般本人喜歡使用SvcUtil工具生成代理,或者干脆直接使用通道進行服務調用,后者更為我所喜愛,因為這樣代碼閱讀行更強,更簡練。例子中偷了下懶,直接添加服務引用,然后用通道調用服務,這樣剩了點復制配置或者接口的功夫,所以看到例子不要感到奇怪,全因懶造成的,廢話不多說,接下來看臨時訂閱客戶端調用

復制代碼
        DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback> channelFactory = null;
        IMySubscriptionService proxy = null;
        private void btnSub_Click(object sender, EventArgs e)
        {
            MyEventsCallback callBack = new MyEventsCallback();
            callBack.OnResultEvent += CallBack_OnResultEvent;
            InstanceContext<IMySubscriptionServiceCallback> instanceContext = new InstanceContext<IMySubscriptionServiceCallback>(callBack);
            channelFactory = new DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback>(instanceContext, "Sub");
            proxy = channelFactory.CreateChannel();
            proxy.Subscribe(null);
        }
復制代碼

這里使用ServiceModelEx 中提供的DuplexChannelFactory<T,C>  類型安全的雙向通道類創建代理,MyEventsCallback 實現回調接口,具體實現如下:

復制代碼
    internal class MyEventsCallback : IMySubscriptionServiceCallback
    {
        SynchronizationContext sc = SynchronizationContext.Current;
        public event EventHandler<EventsCallbackArgs> OnResultEvent;
        public void OnEvent1()
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent1", System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }

        public void OnEvent2(int number)
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent2:", number, System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }

        public void OnEvent3(int number, string text)
        {
            sc.Post(result =>
            {
                EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent3:", number, "text:", text + System.Environment.NewLine) };
                e.Raise(this, ref OnResultEvent);
            }, null);
        }
    }
復制代碼
復制代碼
    public static class EventArgExtensions
    {
        public static void Raise<TEventArgs>(this TEventArgs e, Object sender, ref EventHandler<TEventArgs> eventDelegate) where TEventArgs : EventArgs
        {
            EventHandler<TEventArgs> temp = Interlocked.CompareExchange(ref eventDelegate, null, null);
            if (temp != null) temp(sender, e);
        }
    }
復制代碼

 

SynchronizationContext 上下文提供post方法調用gui線程更新ui,e.Raise 使用擴展以線程安全方式調用事件,客戶端調用訂閱者就完成了,別忘了關閉代理,接下來看客戶端調用發布者

客戶端調用發布服務:

復制代碼
  public partial class PubMessageForm : Form
    {
        public PubMessageForm()
        {
            InitializeComponent();
        }
        ChannelFactory<IMyEvents> channelFactory = null;
        IMyEvents proxy = null;
        private void btnStartPub_Click(object sender, EventArgs e)
        {
            channelFactory = new ChannelFactory<IMyEvents>("PubMyEvents");
            proxy = channelFactory.CreateChannel();
        }

        private void PubMessageForm_FormClosed(object sender, FormClosedEventArgs e)
        {
            try
            {
                using (proxy as IDisposable)
                {

                }
                channelFactory.Close();
            }
            catch
            {
                channelFactory.Abort();
            }
        }

        private void btnPub_Click(object sender, EventArgs e)
        {
            proxy.OnEvent1();
        }

        private void btnPub2_Click(object sender, EventArgs e)
        {
            proxy.OnEvent2(2);
        }

        private void btnPub3_Click(object sender, EventArgs e)
        {
            proxy.OnEvent3(3, txtPubMessage.Text);
        }

        private void PubMessageForm_Load(object sender, EventArgs e)
        {

        }
    }
復制代碼

使用ChannelFactory<T> 通道調用發布服務

這樣WCF發布訂閱服務就完成了,另外如果發布服務或訂閱服務不需要同步綁定,可以考慮使用msmq ,這樣發布-訂閱模式兼具松耦合和無連接系統的優勢。

需要注意的是隊列化發布-訂閱服務不支持臨時訂閱,需要使用持久訂閱,具體實現在此不多講,另外還可以結合服務發現實現另外一種模式的發布訂閱模式,具體可以參考書籍《Programming WCF Services》。

 Demo下載 http://files.cnblogs.com/files/skystar/Demo.7z

書中自有黃金屋,書中自有顏如玉


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM