一個EasyNetQ訂閱者訂閱一種消息類型(消息類的.NET 類型)。一旦通過調用Subscribe方法對一個類型建立了訂閱,一個持久化的隊列就會在RabbitMQ broker代理服務器上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都會把消息從隊列中發送給訂閱者。
訂閱時需要指定收到消息該怎么處理,我們一般會傳遞一個Action<T>泛型委托:
bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));
現在每次MyMessage實例被發布后(經RabbitMQ再發送給訂閱者),EasyNetQ會調用我們的委托方法,打印MyMessage的Text屬性到控制台。
你傳給Subscribe方法的訂閱Id是非常重要的。
EasyNetQ將會在RabbitMQ Broker代理服務器上為特定的消息類型和訂閱id的組合創建唯一的隊列。
(消息隊列名稱為: 消息命名空間名.消息類名:消息類庫文件名_訂閱時指定的id)如 Model.MyMessage:Models_my_subscription_id
每一次調用Subscribe方法會創建一個新的隊列消費者。如果你用相同的消息類型和訂閱id調用Subscribe兩次(即上面黃底組合內容相同),你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ將依次向每個消費者輪轉方式發送后續消息。這對於擴展和工作分配非常有用。比如說,你創建了一個處理特殊消息的服務(做消費者),但是他已經超負荷工作了(假如消息/工作處理很耗時)。簡單的創建幾個新的服務實例(即增加消費者),無論在同一個機器上,還是不同的機器上,不用配置任何東西,你自動就得到了伸縮性。同理,如果消費者大部分時間很空閑,也可以關掉幾個消費者。。這也就是分布式服務。
假如相同的消息類型,用不同的訂閱id調用了兩次Subscribe,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這樣很棒。
寫訂閱回調委托時的注意事項
消費者通過EasyNetQ訂閱后,每當它從RabbitMQ消息隊列接收到消息,消息就被放置在消費者的內存隊列中。
EasyNetQ會創建單個線程循環從內存隊列讀取消息,調用你之前注冊的委托方法(處理消息)。因為是在單個線程上,委托一次只能處理一個消息,所以你要避免長時間地同步IO操作。你應該盡快從委托返回控制。
使用異步訂閱 SubscribeAsync
SubscribeAsync方法允許你的訂閱者委托到一個異步方法,它能立即返回Task,然后異步地執行長時間IO操作。一旦長時間運行的訂閱完成后,就簡單的完成這個任務。
在下面的例子中,我們使用一個異步IO操作(即DownloadStringTask)請求一個web service。當這個task完成時,寫一行信息到控制台。
1 bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 2 new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) 3 .ContinueWith(task => 4 Console.WriteLine("Received:'{0}',Downloaded:'{1}'", 5 message.Text, 6 task.Result)))
下一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。
1 _bus.SubscribeAsync<MessageType>("Queue_Identifier", 2 message => Task.Factory.StartNew(() => 3 { 4 //這里執行一些操作 5 //如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回, 6 // 然后任務將繼續執行下去。 7 }).ContinueWith(task => 8 { 9 if ( task.IsCompleted && ! task.IsFaulted) 10 { 11 // 一切工作正常時 12 } 13 else 14 { 15 // 這里不要Catch 異常,否則異常會進一步被嵌套,results結果會被發送到RabbitMQ上默認的錯誤隊列 16 throw new EasyNetQException("Message processing exception - look in t the default error quenue(broker)"); 17 } 18 }));
撤銷訂閱
所有的Subscribe訂閱方法都會返回一個ISubscriptionResult接口實例。它包含返回IExchange和IQueue的屬性(實際在底層是通過IConsumer實現的),如果需要還可以使用高級API IAdvancededBus進一步操作這些屬性。
你可以在任何時間撤銷一個訂閱者,通過調用ISubscriptionResult實例上的Dispose方法,或者在它之上的 ConsumerCancellation屬性。
var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler); ... subscriptionResult.Dispose(); // 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();
這將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel。
注意:上面代碼不同於IBus和IAndvancedBus的dispose方法,后兩者會撤銷所有消費者,並關閉與RabbitMQ的連接。
不要在消息處理(委托方法)中調用 subscriptionResult.Dispose()。這將在EasyNetQ ack確認消息和subscriptionResult.Dispose()調用關閉Channel之間產生一個競爭。(即到底會先ack確認?還是先關閉信道?)因為在EasyNetQ的內部架構中,這兩件事是在不同的線程上被執行,所以在時間上存在不確定性。