6-EasyNetQ之訂閱


一個EasyNetQ訂閱者訂閱一種消息類型(消息類為.NET 類型)。通過調用Subcribe方法一旦對一個類型設置了訂閱,一個持久化隊列就會在RabbitMQ broker上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都將會將消息從隊列中發送給訂閱者。

不管消息什么時候送達到,訂閱這個消息的訂閱者需要給RabbitMQ一個可執行的操作。我們通過傳遞一個訂閱代理:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

現在每一次MyMessage實例被發送后,EasyNetQ將會調用我們的代理,打印這個消息的Text屬性到控制台。

你傳給訂閱的訂閱Id是重要的。 EasyNetQ將會在RabbitMQ Broker上為特定的消息類型的和訂閱id的組合創建唯一的隊列。

每一次調用Subscribe方法會創建一個新的隊列消費者。如果你用相同的消息和訂閱id調用Subscribe兩次,你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ將會依次連續輪詢消息給每一個消費者。這種可伸縮性和工作分擔是非常棒的。比如說,你創建了一個處理特殊消息的服務,但是他已經超負荷工作了。簡單的創建一個新的服務實例(在同一個機器上,或者不同的機器上),不用配置任何東西,你自動就得到了伸縮性。

假如相同的消息類型,用不同的訂閱id調用了兩次Subscribe,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每一個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這這樣很好。

寫訂閱回調委托時的注意事項

通過EasyNetQ訂閱到一個來至隊列的消息,他們被放置在內存隊列中。一個單獨線程循環對壘得到消息,調用他們的委托方法。因為在一個獨立線程上一個委托一次處理一個消息,你應該避免長時間的同步的IO操作。應該盡快從委托返回控制。

使用異步訂閱 SubscribeAsync

SubscribeAsync 允許你的訂閱者委托到一個能立即返回的Task,然后異步的執行長時間IO操作。一但長時間運行的訂閱完成后,就簡單的完成這個任務。下面的例子,我們請求一個web service使用一個異步IO操作(DownloadStringTask)。當這個task完成事,寫一行信息到控制台。

bus.SubscribeAsync<MyMessage>("subscribe_async_test",message => 
       new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
           .ContinueWith(task => 
                Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
                    message.Text,
                    task.Result)))

另一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。

_bus.SubscribeAsync<MessageType>("queue_Identifier", 
             Message => Task.Factory.StartNew(() => 
             {
                 //這里執行一些操作
                 //如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回,
                 // 然后任務將繼續執行下去。
             }).ContinueWith(task => 
             {
                 if ( task.IsCompleted && ! task.IsFaulted)
                 {
                     // 一切都很好
                 }
                 else
                 {
                     // 不要Catch 異常,否則異常會進一步被嵌套,結果會被發送到默認的錯誤隊列
                     throw new EasyNetQException("Message processing exception - look in t  the default error quenue(broker)");
                 }
             }));

取消訂閱##

所有的訂閱都會返回一個ISubscriptionResult接口實例。它包含屬性有訂閱底層被IConsumer使用的IExchane和IQueue,如果你需要使用更高級的API IAdvancedBus更好的去處理,這會變為可能。

你能夠在任何時間取消一個訂閱者,通過調用ISubscriptionResult實例上的Dispose方法,或者在它之上的 ConsumerCancellation屬性。

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

subscriptionResult.Dispose();
// 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();

這將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel。

注意:IBus和IAndvancedBus的dispose,可能夠取消所有消費者,並關閉對RabbitMQ的連接。

不要在消息處理器中調用 subscriptionResult.Dispose()。這將在EasyNetQ ACK 消息時,在消費者的channel和subscriptionResult.Dispose()調用關閉Channel之間,創建一個競爭狀態。由於EasyNetQ的內部架構這些將會在不同的線程被調用,還有時間上的不確定性。

英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Subscribe
本文地址:http://www.cnblogs.com/HuangLiang/p/EasyNetQ_Subscribe.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM