9、Subscribe訂閱


  一個EasyNetQ訂閱者訂閱一種消息類型(消息類的.NET 類型)。一旦通過調用Subscribe方法對一個類型建立了訂閱,一個持久化的隊列就會在RabbitMQ broker代理服務器上被創建,這個類型的任何消息都會被發送到這個隊列上。訂閱者無論什么時候連接上,RabbitMQ都會把消息從隊列中發送給訂閱者。

訂閱時需要指定收到消息該怎么處理,我們一般會傳遞一個Action<T>泛型委托:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

 

現在每次MyMessage實例被發布后(經RabbitMQ再發送給訂閱者),EasyNetQ會調用我們的委托方法,打印MyMessage的Text屬性到控制台。

你傳給Subscribe方法的訂閱Id是非常重要的

EasyNetQ將會在RabbitMQ Broker代理服務器上為特定的消息類型和訂閱id的組合創建唯一的隊列。

(消息隊列名稱為:    消息命名空間名.消息類名:消息類庫文件名_訂閱時指定的id)如     Model.MyMessage:Models_my_subscription_id

每一次調用Subscribe方法會創建一個新的隊列消費者。如果你用相同的消息類型和訂閱id調用Subscribe兩次(即上面黃底組合內容相同),你將會創建兩個消費者去消費同一個隊列。然后RabbitMQ將依次向每個消費者輪轉方式發送后續消息。這對於擴展和工作分配非常有用。比如說,你創建了一個處理特殊消息的服務(做消費者),但是他已經超負荷工作了(假如消息/工作處理很耗時)。簡單的創建幾個新的服務實例(即增加消費者),無論在同一個機器上,還是不同的機器上,不用配置任何東西,你自動就得到了伸縮性。同理,如果消費者大部分時間很空閑,也可以關掉幾個消費者。。這也就是分布式服務。

假如相同的消息類型,用不同的訂閱id調用了兩次Subscribe,你將創建兩個隊列,每一個隊列有自己的消費者。每一個消息的副本將會路由到每個隊列,因此不同的消費者都將得到所有消息(這個類型的)。假如你有幾個不同的服務都關心相同類型的消息,這樣很棒。

寫訂閱回調委托時的注意事項

消費者通過EasyNetQ訂閱后,每當它從RabbitMQ消息隊列接收到消息,消息就被放置在消費者的內存隊列中。

EasyNetQ會創建單個線程循環從內存隊列讀取消息,調用你之前注冊的委托方法(處理消息)。因為是在單個線程上,委托一次只能處理一個消息,所以你要避免長時間地同步IO操作。你應該盡快從委托返回控制。

使用異步訂閱 SubscribeAsync

SubscribeAsync方法允許你的訂閱者委托到一個異步方法,它能立即返回Task,然后異步地執行長時間IO操作。一旦長時間運行的訂閱完成后,就簡單的完成這個任務。

在下面的例子中,我們使用一個異步IO操作(即DownloadStringTask)請求一個web service。當這個task完成時,寫一行信息到控制台。

1 bus.SubscribeAsync<MyMessage>("subscribe_async_test", message => 
2        new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500"))
3            .ContinueWith(task => 
4                 Console.WriteLine("Received:'{0}',Downloaded:'{1}'",
5                     message.Text,
6                     task.Result)))
 
        

下一個列子是如果有錯誤發生,返回結果會有異常拋出,那么消息將會被放到一個默認的錯誤隊列中。

 1 _bus.SubscribeAsync<MessageType>("Queue_Identifier", 
 2          message => Task.Factory.StartNew(() => 
 3          {
 4              //這里執行一些操作
 5              //如果這里有一個異常,那么在這個Task執行完畢后,這個異常會作為結果返回,
 6              // 然后任務將繼續執行下去。
 7          }).ContinueWith(task => 
 8          {
 9              if ( task.IsCompleted && ! task.IsFaulted)
10              {
11                  // 一切工作正常時
12              }
13              else
14              {
15                  // 這里不要Catch 異常,否則異常會進一步被嵌套,results結果會被發送到RabbitMQ上默認的錯誤隊列
16                  throw new EasyNetQException("Message processing exception - look in t  the default error quenue(broker)");
17              }
18          }));

 

 
        

撤銷訂閱

所有的Subscribe訂閱方法都會返回一個ISubscriptionResult接口實例。它包含返回IExchange和IQueue的屬性(實際在底層是通過IConsumer實現的),如果需要還可以使用高級API IAdvancededBus進一步操作這些屬性。

你可以在任何時間撤銷一個訂閱者,通過調用ISubscriptionResult實例上的Dispose方法,或者在它之上的 ConsumerCancellation屬性。

var subscriptionResult = bus.Subscribe<MyMessage>("sub_id", MyHandler);

...

subscriptionResult.Dispose(); // 這個等價與 subscriptionResult.ConsumerCancellation.Dispose();

這將停止EasyNetQ對隊列的消費,並且關閉這個消費者的channel。

注意:上面代碼不同於IBus和IAndvancedBus的dispose方法,后兩者會撤銷所有消費者,並關閉與RabbitMQ的連接。

不要在消息處理(委托方法)中調用 subscriptionResult.Dispose()。這將在EasyNetQ ack確認消息和subscriptionResult.Dispose()調用關閉Channel之間產生一個競爭。(即到底會先ack確認?還是先關閉信道?)因為在EasyNetQ的內部架構中,這兩件事是在不同的線程上被執行,所以在時間上存在不確定性。

英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Subscribe


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM