Publisher/Subscriber 訂閱-發布模式
本博后續將陸續整理這些年做的一些預研demo,及一些前沿技術的研究,與大家共研技術,共同進步。
關於發布訂閱有很多種實現方式,下面主要介紹WCF中的發布訂閱,主要參考書籍《Programming WCF Services》,閑話不多說進入正題。使用傳統的雙工回調(例子 http://www.cnblogs.com/artech/archive/2007/03/02/661969.html)實現發布訂閱模式存在許多缺陷,主要問題是,它會引入發布者和訂閱者之間的高度耦合。訂閱者必須先知道發布者在哪里,然后才能訂閱它們,任何訂閱者不知道的服務都無法通知事件的訂閱者,部署好的應用程序中添加新的訂閱者(或者移除已經存在的訂閱者)是十分困難的事情。大致相同的是發布者也只能給它知道的訂閱者發送通知消息,同時發布者還需要管理訂閱者列表,這些與業務服務無關,這些邏輯增加了發布者的復雜度,另外在安全方面也存在訂閱者與發布者也存在耦合,而且在發布者進程宕機時,所有訂閱都會丟失。
要解決上面提及的問題最常見的解決方案就是發布-訂閱模式(Publish-Subscribe 【OBSERVER】),如圖D-1所示。
這里將訂閱者區分為臨時訂閱者與持久訂閱者,持久訂閱者可以保存到磁盤上,當事件觸發時可以通知訂閱者,也可以很方便的通過傳遞回調使用回調機制,對於持久訂閱者,需要記錄訂閱者地址,當觸發事件時,發布服務將會調用持久訂閱者地址 ,然后傳遞事件,因持久訂閱者保存了訂閱者地址至數據庫或磁盤,因此當發布服務宕機時提高了管理性。
以上主要介紹理論,下面進入實踐階段,首先下載ServiceModelEx(Programming WCF Services 里面書籍作者提供的簡化WCF編程的動態庫), https://github.com/CaseyBurns/ServiceModelEx,我們暫時不需要服務總線所以我們引入ServiceModelEx (.NET 4.0 no service bus) ,建好測試服務端(這里為了方便測試使用GUI 應用程序作為宿主),客戶端。
管理臨時訂閱
例子D-1使用ServiceModelEx 提供的ISubscriptionService接口管理臨時訂閱者
[ServiceContract] public interface ISubscriptionService { [OperationContract] void Subscribe(string eventOperation); [OperationContract] void Unsubscribe(string eventOperation); }
作為通用接口它不關心回調契約,然后添加臨時訂閱者契約繼承通用接口,並設置回調契約
[ServiceContract(CallbackContract = typeof(IMyEvents))] public interface IMySubscriptionService : ISubscriptionService { }
回調契約
[ServiceContract] public interface IMyEvents { [OperationContract(IsOneWay = true)] void OnEvent1(); [OperationContract(IsOneWay = true)] void OnEvent2(int number); [OperationContract(IsOneWay = true)] void OnEvent3(int number, string text); }
實現臨時訂閱服務.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] public class MySubscriptionService : SubscriptionManager<IMyEvents>, IMySubscriptionService,IPersistentSubscriptionService { }
這里有幾點需要注意:服務類型必須是會話服務(InstanceContextMode = InstanceContextMode.PerCall),會話服務才能夠使用回調,另外ServiceModelEx 中的類 SubscriptionManager<T> 已經實現了通用接口所定義的添加訂閱者與取消訂閱接口,所以這里不需要我們再寫任何代碼。IPersistentSubscriptionService 作為持久訂閱者接口,SubscriptionManager<T> 也實現了該接口,接下來會講到。
配置文件配置發布訂閱者服務
<system.serviceModel> <serviceHostingEnvironment multipleSiteBindingsEnabled="true" /> <bindings> <netTcpBinding> <binding name="NetTcpBinding_IService1" receiveTimeout="00:25:00" maxBufferSize="2147483647" maxReceivedMessageSize="2147483647" transactionFlow="true"> <reliableSession inactivityTimeout="00:25:00" enabled="true" /> <security mode="None" /> </binding> </netTcpBinding> </bindings> <services> <service behaviorConfiguration="MyBehavior" name="Service.sub.MySubscriptionService"> <host> <baseAddresses> <add baseAddress="net.tcp://localhost:8022/"/> <add baseAddress="http://localhost:8023/"/> </baseAddresses> </host> <endpoint name="Sub" address="Sub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1" contract="Service.sub.IMySubscriptionService" /> <endpoint name="PersistentSub" address="PersistentSub" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1" contract="ServiceModelEx.IPersistentSubscriptionService" /> </service> <service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService"> <host> <baseAddresses> <add baseAddress="net.tcp://localhost:8022/MyPub/"/> <add baseAddress="http://localhost:8023/MyPub/"/> </baseAddresses> </host> <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1" contract="Service.sub.IMyEvents" /> </service> </services> <behaviors> <serviceBehaviors> <behavior name="MyBehavior"> <serviceMetadata httpGetEnabled="true"/> <serviceThrottling maxConcurrentCalls="1000" maxConcurrentSessions="10000" /> <serviceDebug includeExceptionDetailInFaults="true" /> </behavior> </serviceBehaviors> </behaviors> </system.serviceModel>
其中Service.pub.MyPublishService 服務為發布者服務配置 接下來會講到。
這樣臨時訂閱者就實現了,接下來看持久訂閱者.持久訂閱者的通用接口使用ServiceModelEx中定義的IPersistentSubscriptionService
[ServiceContract] public interface IPersistentSubscriptionService { [OperationContract(Name = "SubscribePersistent")] [TransactionFlow(TransactionFlowOption.Allowed)] void Subscribe(string address,string eventsContract,string eventOperation); [OperationContract(Name = "UnSubscribePersistent")] [TransactionFlow(TransactionFlowOption.Allowed)] void Unsubscribe(string address,string eventsContract,string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribers(); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetSubscribersToContract(string eventsContract); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] string[] GetSubscribersToContractEventType(string eventsContract,string eventOperation); [OperationContract] [TransactionFlow(TransactionFlowOption.Allowed)] PersistentSubscription[] GetAllSubscribersFromAddress(string address); }
這里我添加了對[OperationContract(Name = "SubscribePersistent")] 將添加訂閱方法進行重命名,以區別臨時訂閱接口的Subscribe方法.持久訂閱不需要回調函數,接下來實現持久訂閱同樣簡單,上面已經貼過代碼,ServiceModelEx中的SubscriptionManager<T>同樣已經實現了IPersistentSubscriptionService接口,這樣臨時訂閱與持久訂閱完成,接下來看發布服務。
發布服務應該支持與訂閱服務一樣的事件契約,這是訂閱服務與發布服務唯一的連接點,使用IMyEvents 作為例子,另外ServiceModelEx提供了用於簡化發布服務的幫助類PublishService<T>
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] public class MyPublishService : PublishService<IMyEvents>, IMyEvents { public void OnEvent1() { FireEvent(); } public void OnEvent2(int number) { FireEvent(number); } public void OnEvent3(int number, string text) { FireEvent(number, text); } }
其中FireEvent()被用作激發所有訂閱者的事件,無論是臨時還是持久訂閱者,幫助類PublishService<T>已經做了實現,接下來配置發布服務
<service behaviorConfiguration="MyBehavior" name="Service.pub.MyPublishService"> <host> <baseAddresses> <add baseAddress="net.tcp://localhost:8022/MyPub/"/> <add baseAddress="http://localhost:8023/MyPub/"/> </baseAddresses> </host> <endpoint name="PubMyEvents" address="PubMyEvents" binding="netTcpBinding" bindingConfiguration="NetTcpBinding_IService1" contract="Service.sub.IMyEvents" /> </service>
這樣發布服務完成,使用Gui應用程序作為宿主,可以使用ServiceModelEx 中ServiceHost<T> 作為發布的幫助類 。
public partial class Form1 : Form { public Form1() { InitializeComponent(); } ServiceHost<MyPublishService> hostPub = new ServiceHost<MyPublishService>(); ServiceHost<MySubscriptionService> host = new ServiceHost<MySubscriptionService>(); private void Form1_Load(object sender, EventArgs e) { try { host.EnableMetadataExchange(); host.Open(); hostPub.EnableMetadataExchange(); hostPub.Open(); } catch (Exception ex) { throw; } } private void Form1_FormClosed(object sender, FormClosedEventArgs e) { try { host.Close(); } catch (Exception) { try { host.Abort(); } catch (Exception) { } } try { hostPub.Close(); } catch (Exception) { try { hostPub.Abort(); } catch (Exception) { } } } }
其中 host.EnableMetadataExchange(); 能夠幫助發布元數據,不需要再到配置中進行配置,服務配置好后接下來看客戶端使用,
客戶端可以直接添加服務引用生成服務代理,但是一般本人喜歡使用SvcUtil工具生成代理,或者干脆直接使用通道進行服務調用,后者更為我所喜愛,因為這樣代碼閱讀行更強,更簡練。例子中偷了下懶,直接添加服務引用,然后用通道調用服務,這樣剩了點復制配置或者接口的功夫,所以看到例子不要感到奇怪,全因懶造成的,廢話不多說,接下來看臨時訂閱客戶端調用
DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback> channelFactory = null; IMySubscriptionService proxy = null; private void btnSub_Click(object sender, EventArgs e) { MyEventsCallback callBack = new MyEventsCallback(); callBack.OnResultEvent += CallBack_OnResultEvent; InstanceContext<IMySubscriptionServiceCallback> instanceContext = new InstanceContext<IMySubscriptionServiceCallback>(callBack); channelFactory = new DuplexChannelFactory<IMySubscriptionService, IMySubscriptionServiceCallback>(instanceContext, "Sub"); proxy = channelFactory.CreateChannel(); proxy.Subscribe(null); }
這里使用ServiceModelEx 中提供的DuplexChannelFactory<T,C> 類型安全的雙向通道類創建代理,MyEventsCallback 實現回調接口,具體實現如下:
internal class MyEventsCallback : IMySubscriptionServiceCallback { SynchronizationContext sc = SynchronizationContext.Current; public event EventHandler<EventsCallbackArgs> OnResultEvent; public void OnEvent1() { sc.Post(result => { EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent1", System.Environment.NewLine) }; e.Raise(this, ref OnResultEvent); }, null); } public void OnEvent2(int number) { sc.Post(result => { EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent2:", number, System.Environment.NewLine) }; e.Raise(this, ref OnResultEvent); }, null); } public void OnEvent3(int number, string text) { sc.Post(result => { EventsCallbackArgs e = new EventsCallbackArgs() { Msg = string.Concat("OnEvent3:", number, "text:", text + System.Environment.NewLine) }; e.Raise(this, ref OnResultEvent); }, null); } }
public static class EventArgExtensions { public static void Raise<TEventArgs>(this TEventArgs e, Object sender, ref EventHandler<TEventArgs> eventDelegate) where TEventArgs : EventArgs { EventHandler<TEventArgs> temp = Interlocked.CompareExchange(ref eventDelegate, null, null); if (temp != null) temp(sender, e); } }
SynchronizationContext 上下文提供post方法調用gui線程更新ui,e.Raise 使用擴展以線程安全方式調用事件,客戶端調用訂閱者就完成了,別忘了關閉代理,接下來看客戶端調用發布者
客戶端調用發布服務:
public partial class PubMessageForm : Form { public PubMessageForm() { InitializeComponent(); } ChannelFactory<IMyEvents> channelFactory = null; IMyEvents proxy = null; private void btnStartPub_Click(object sender, EventArgs e) { channelFactory = new ChannelFactory<IMyEvents>("PubMyEvents"); proxy = channelFactory.CreateChannel(); } private void PubMessageForm_FormClosed(object sender, FormClosedEventArgs e) { try { using (proxy as IDisposable) { } channelFactory.Close(); } catch { channelFactory.Abort(); } } private void btnPub_Click(object sender, EventArgs e) { proxy.OnEvent1(); } private void btnPub2_Click(object sender, EventArgs e) { proxy.OnEvent2(2); } private void btnPub3_Click(object sender, EventArgs e) { proxy.OnEvent3(3, txtPubMessage.Text); } private void PubMessageForm_Load(object sender, EventArgs e) { } }
使用ChannelFactory<T> 通道調用發布服務
這樣WCF發布訂閱服務就完成了,另外如果發布服務或訂閱服務不需要同步綁定,可以考慮使用msmq ,這樣發布-訂閱模式兼具松耦合和無連接系統的優勢。
需要注意的是隊列化發布-訂閱服務不支持臨時訂閱,需要使用持久訂閱,具體實現在此不多講,另外還可以結合服務發現實現另外一種模式的發布訂閱模式,具體可以參考書籍《Programming WCF Services》。
Demo下載 http://files.cnblogs.com/files/skystar/Demo.7z
書中自有黃金屋,書中自有顏如玉