一個EasyNetQ訂閱者訂閱一種消息類型(消息類為.NET 類型)。通過調用Subcribe方法一旦對一個類型設置了訂閱,一個持久化隊列就會在RabbitMQ broker上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都將會將消息從隊列中發送給訂閱者。
不管消息什么時候送達到,訂閱這個消息的訂閱者需要給RabbitMQ一個可執行的操作。我們通過傳遞一個訂閱代理:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
現在每一次MyMessage實例被發送后,EasyNetQ將會調用我們的代理,打印這個消息的Text屬性到控制台。
你傳給訂閱的訂閱Id是重要的。 EasyNetQ將會在RabbitMQ Broker上為特定的消息類型的和訂閱id的組合創建唯一的隊列。
每一次調用Subscribe方法會創建一個新的隊列消費者。如果你用相同的消息和訂閱id調用Subscribe兩次,你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ將會依次連續輪詢消息給每一個消費者。這種可伸縮性和工作分擔是非常棒的。比如說,你創建了一個處理特殊消息的服務,但是他已經超負荷工作了。簡單的創建一個新的服務實例(在同一個機器上,或者不同的機器上),不用配置任何東西,你自動就得到了伸縮性。
假如相同的消息類型,用不同的訂閱id調用了兩次Subscribe,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每一個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這這樣很好。
寫訂閱回調委托時的注意事項
通過EasyNetQ訂閱到一個來至隊列的消息,他們被放置在內存隊列中。一個單獨線程循環對壘得到消息,調用他們的委托方法。因為在一個獨立線程上一個委托一次處理一個消息,你應該避免長時間的同步的IO操作。應該盡快從委托返回控制。
使用異步訂閱 SubscribeAsync
SubscribeAsync 允許你的訂閱者委托到一個能立即返回的Task,然后異步的執行長時間IO操作。一但長時間運行的訂閱完成后,就簡單的完成這個任務。下面的例子,我們請求一個web service使用一個異步IO操作(DownloadStringTask)。當這個task完成事,寫一行信息到控制台。
bus.SubscribeAsync<MyMessage>("subscribe_async_test",message =>
new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
.ContinueWith(task =>
Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
message.Text,
task.Result)))
另一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。
_bus.SubscribeAsync<MessageType>("queue_Identifier",
Message => Task.Factory.StartNew(() =>
{
//這里執行一些操作
//如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回,
// 然后任務將繼續執行下去。
}).ContinueWith(task =>
{
if ( task.IsCompleted && ! task.IsFaulted)
{
// 一切都很好
}
else
{
// 不要Catch 異常,否則異常會進一步被嵌套,結果會被發送到默認的錯誤隊列
throw new EasyNetQException("Message processing exception - look in t the default error quenue(broker)");
}
}));
取消訂閱##
所有的訂閱都會返回一個ISubscriptionResult接口實例。它包含屬性有訂閱底層被IConsumer使用的IExchane和IQueue,如果你需要使用更高級的API IAdvancedBus更好的去處理,這會變為可能。
你能夠在任何時間取消一個訂閱者,通過調用ISubscriptionResult實例上的Dispose方法,或者在它之上的 ConsumerCancellation屬性。
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);
...
subscriptionResult.Dispose();
// 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();
這將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel。
注意:IBus和IAndvancedBus的dispose,可能夠取消所有消費者,並關閉對RabbitMQ的連接。
不要在消息處理器中調用 subscriptionResult.Dispose()。這將在EasyNetQ ACK 消息時,在消費者的channel和subscriptionResult.Dispose()調用關閉Channel之間,創建一個競爭狀態。由於EasyNetQ的內部架構這些將會在不同的線程被調用,還有時間上的不確定性。
英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Subscribe
本文地址:http://www.cnblogs.com/HuangLiang/p/EasyNetQ_Subscribe.html