EasyNetQ
支持的最簡單的消息模式是發布/訂閱.這個模式是一個極好的方法用來解耦消息提供者和消費者。消息發布者只要簡單的對世界說,“這里有些事發生” 或者 “我現在有一個信息”。它不關心有沒有人監聽,或者接收者是誰,或者接收者在那里。我們能夠添加和移除特定類型的消息的訂閱者,不需發布者做任何的重新配置。我們也能夠有多個發布者發布相同的消息,添加和刪除發布者也不用其他的發布者或者訂閱者做任何重新配置。
用EasyNetQ
發布消息(假定你已經重建了一個IBus
實例)
- 創建你自己的消息實例,可以是任何可序列化的
.NET
類型。 - 調用
IBus
上的Publish
方法,並傳入你的消息實例。
代碼如下:
var message = new MyMessage{ Text = "Hello Rabbit" }; bus.Publish(message);
為確保消息投遞成功,請看Publisher Confirms
.
發布者和訂閱者之間彼此是不知道對方的。發布者簡單的對世界說“這兒有事情發生”,訂閱者告訴世界“我關心這種事兒的發生”。在這個模型中這是很好的,沒有人關心特定的事件。可能有一個訂閱者關心這個消息,也可能有200個,或者沒有人關心它。發布者不應該關心EasyNetQ
對這個消息模式的實現。假如你開始去發布消息,而沒有任何訂閱者曾經定義此消息,那么這個消息就簡單的消失了。這是我們的設計意圖。
一個EasyNetQ
訂閱者訂閱一種消息類型(消息類為.NET
類型)。通過調用Subcribe
方法一旦對一個類型設置了訂閱,一個持久化隊列就會在RabbitMQ broker
上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ
都將會將消息從隊列中發送給訂閱者。
不管消息什么時候送達到,訂閱這個消息的訂閱者需要給RabbitMQ
一個可執行的操作。我們通過傳遞一個訂閱代理:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
- 1
現在每一次MyMessage
實例被發送后,EasyNetQ
將會調用我們的代理,打印這個消息的Text
屬性到控制台。
你傳給訂閱的訂閱Id
是重要的。 EasyNetQ
將會在RabbitMQ Broker
上為特定的消息類型的和訂閱id
的組合創建唯一的隊列。
每一次調用Subscribe
方法會創建一個新的隊列消費者。如果你用相同的消息和訂閱id
調用Subscribe
兩次,你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ
將會依次連續輪詢消息給每一個消費者。這種可伸縮性和工作分擔是非常棒的。比如說,你創建了一個處理特殊消息的服務,但是他已經超負荷工作了。簡單的創建一個新的服務實例(在同一個機器上,或者不同的機器上),不用配置任何東西,你自動就得到了伸縮性。
假如相同的消息類型,用不同的訂閱id
調用了兩次Subscribe
,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每一個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這這樣很好。
寫訂閱回調委托時的注意事項
通過EasyNetQ
訂閱到一個來至隊列的消息,他們被放置在內存隊列中。一個單獨線程循環對壘得到消息,調用他們的委托方法。因為在一個獨立線程上一個委托一次處理一個消息,你應該避免長時間的同步的IO
操作。應該盡快從委托返回控制。
使用異步訂閱 SubscribeAsync
SubscribeAsync
允許你的訂閱者委托到一個能立即返回的Task
,然后異步的執行長時間IO
操作。一但長時間運行的訂閱完成后,就簡單的完成這個任務。下面的例子,我們請求一個web service
使用一個異步IO
操作(DownloadStringTask
)。當這個task
完成事,寫一行信息到控制台。
bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) .ContinueWith(task => Console.WriteLine("Received:'{0}',Downloaded:'{1}'", message.Text, task.Result)))
另一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。
_bus.SubscribeAsync<MessageType>("queue_Identifier", Message => Task.Factory.StartNew(() => { //這里執行一些操作 //如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回, // 然后任務將繼續執行下去。 }).ContinueWith(task => { if ( task.IsCompleted && ! task.IsFaulted) { // 一切都很好 } else { // 不要Catch 異常,否則異常會進一步被嵌套,結果會被發送到默認的錯誤隊列 throw new EasyNetQException("Message processing exception - look in t the default error quenue(broker)"); } }));
取消訂閱
所有的訂閱都會返回一個ISubscriptionResult
接口實例。它包含屬性有訂閱底層被IConsumer
使用的IExchane
和IQueue
,如果你需要使用更高級的API IAdvancedBus
更好的去處理,這會變為可能。
你能夠在任何時間取消一個訂閱者,通過調用ISubscriptionResult
實例上的Dispose
方法,或者在它之上的 ConsumerCancellation
屬性。
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler); ... subscriptionResult.Dispose(); // 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();
這將停止EasyNetQ
對隊列的消費,並且關閉這個消費者的channel
。
注意:IBus
和IAndvancedBus
的dispose
,可能夠取消所有消費者,並關閉對RabbitMQ
的連接。
不要在消息處理器中調用 subscriptionResult.Dispose()
。這將在EasyNetQ ACK
消息時,在消費者的channel
和subscriptionResult.Dispose()
調用關閉Channel
之間,創建一個競爭狀態。由於EasyNetQ
的內部架構這些將會在不同的線程被調用,還有時間上的不確定性。