EasyNetQ使用(三)【Publish與Subcribe】


EasyNetQ支持的最簡單的消息模式是發布/訂閱.這個模式是一個極好的方法用來解耦消息提供者和消費者。消息發布者只要簡單的對世界說,“這里有些事發生” 或者 “我現在有一個信息”。它不關心有沒有人監聽,或者接收者是誰,或者接收者在那里。我們能夠添加和移除特定類型的消息的訂閱者,不需發布者做任何的重新配置。我們也能夠有多個發布者發布相同的消息,添加和刪除發布者也不用其他的發布者或者訂閱者做任何重新配置。

EasyNetQ發布消息(假定你已經重建了一個IBus實例)

  1. 創建你自己的消息實例,可以是任何可序列化的 .NET 類型。
  2. 調用IBus上的Publish方法,並傳入你的消息實例。

代碼如下:

var message = new MyMessage{ Text = "Hello Rabbit" }; bus.Publish(message);

 

為確保消息投遞成功,請看Publisher Confirms.

發布者和訂閱者之間彼此是不知道對方的。發布者簡單的對世界說“這兒有事情發生”,訂閱者告訴世界“我關心這種事兒的發生”。在這個模型中這是很好的,沒有人關心特定的事件。可能有一個訂閱者關心這個消息,也可能有200個,或者沒有人關心它。發布者不應該關心EasyNetQ對這個消息模式的實現。假如你開始去發布消息,而沒有任何訂閱者曾經定義此消息,那么這個消息就簡單的消失了。這是我們的設計意圖。


一個EasyNetQ訂閱者訂閱一種消息類型(消息類為.NET 類型)。通過調用Subcribe方法一旦對一個類型設置了訂閱,一個持久化隊列就會在RabbitMQ broker上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都將會將消息從隊列中發送給訂閱者。

不管消息什么時候送達到,訂閱這個消息的訂閱者需要給RabbitMQ一個可執行的操作。我們通過傳遞一個訂閱代理:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
  • 1

現在每一次MyMessage實例被發送后,EasyNetQ將會調用我們的代理,打印這個消息的Text屬性到控制台。

你傳給訂閱的訂閱Id是重要的。 EasyNetQ將會在RabbitMQ Broker上為特定的消息類型的和訂閱id的組合創建唯一的隊列。

每一次調用Subscribe方法會創建一個新的隊列消費者。如果你用相同的消息和訂閱id調用Subscribe兩次,你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ將會依次連續輪詢消息給每一個消費者。這種可伸縮性和工作分擔是非常棒的。比如說,你創建了一個處理特殊消息的服務,但是他已經超負荷工作了。簡單的創建一個新的服務實例(在同一個機器上,或者不同的機器上),不用配置任何東西,你自動就得到了伸縮性。

假如相同的消息類型,用不同的訂閱id調用了兩次Subscribe,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每一個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這這樣很好。
寫訂閱回調委托時的注意事項

通過EasyNetQ訂閱到一個來至隊列的消息,他們被放置在內存隊列中。一個單獨線程循環對壘得到消息,調用他們的委托方法。因為在一個獨立線程上一個委托一次處理一個消息,你應該避免長時間的同步的IO操作。應該盡快從委托返回控制。

使用異步訂閱 SubscribeAsync

SubscribeAsync 允許你的訂閱者委托到一個能立即返回的Task,然后異步的執行長時間IO操作。一但長時間運行的訂閱完成后,就簡單的完成這個任務。下面的例子,我們請求一個web service使用一個異步IO操作(DownloadStringTask)。當這個task完成事,寫一行信息到控制台。

bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) .ContinueWith(task => Console.WriteLine("Received:'{0}',Downloaded:'{1}'", message.Text, task.Result)))

 

另一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。

_bus.SubscribeAsync<MessageType>("queue_Identifier", Message => Task.Factory.StartNew(() => { //這里執行一些操作 //如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回, // 然后任務將繼續執行下去。 }).ContinueWith(task => { if ( task.IsCompleted && ! task.IsFaulted) { // 一切都很好 } else { // 不要Catch 異常,否則異常會進一步被嵌套,結果會被發送到默認的錯誤隊列 throw new EasyNetQException("Message processing exception - look in t the default error quenue(broker)"); } }));

 

取消訂閱

所有的訂閱都會返回一個ISubscriptionResult接口實例。它包含屬性有訂閱底層被IConsumer使用的IExchaneIQueue,如果你需要使用更高級的API IAdvancedBus更好的去處理,這會變為可能。

你能夠在任何時間取消一個訂閱者,通過調用ISubscriptionResult實例上的Dispose方法,或者在它之上的 ConsumerCancellation屬性。

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler); ... subscriptionResult.Dispose(); // 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();

 

這將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel

注意:IBusIAndvancedBusdispose,可能夠取消所有消費者,並關閉對RabbitMQ的連接。

不要在消息處理器中調用 subscriptionResult.Dispose()。這將在EasyNetQ ACK 消息時,在消費者的channelsubscriptionResult.Dispose()調用關閉Channel之間,創建一個競爭狀態。由於EasyNetQ的內部架構這些將會在不同的線程被調用,還有時間上的不確定性。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM