AMQP發布消息默認情況下是非事務性的,也不能確保你的消息真正送達代理服務器(RabbitMQ)。盡管AMQP也可以指定事務性發布,但是在RabbitMQ上這個傳輸速度非常慢(所以RabbitMQ不推薦使用事務性發布,而是提出'Publish Confirms')。所以我們也沒有讓EasyNetQ API去支持事務性發布。為了高效且確保投遞成功,RabbitMQ推薦使用'Publish Confirms'方式。簡單來講這是RabbitMQ對AMQP的一個擴展,當你的消息被RabbitMQ成功接收以后,提供了一個回調支持。
到底怎么才算成功接收,成功接收意味着什么呢?
- 當一個瞬間消息進入消息隊列時,說明RabbitMQ確定已收到該發布消息。
- 當一個持久化消息被保存到磁盤,或者在每個隊列上被消費時,說明RabbitMQ確定已收到該發布的持久消息。
- 當一個瞬間消息被發現無法路由時,RabbitMQ直接確定已收到該發布消息。
更多關於消息確認方面的內容,請看the announcement on the RabbitMQ blog.
啟用消息確認,需要通過在連接字符串中設置 publisherConfirms=true.
bus = RabbitHutch.CreateBus("host=localhost;publisherConfirms=true;timeout=10");
同步的發布方法bus.Publish(..)在返回之前會等待消息確認。在超時未收到確認(超時時間同樣配置在連接字符串中)的,將產生異常拋出。在同步發布方法上啟用發布者確認將明顯變慢。如果有性能上的擔心,你應該考慮使用異步發布方法PublishAsync()。
1 bus.PublishAsync(new MyMessage 2 { 3 Text = "Hello World" 4 }).ContinueWith(task => 5 { 6 // 這里僅僅檢測task是否完成 7 // 如果task完成,即使task處於出錯狀態,IsCompleted屬性也是true 8 // 如果要檢測是否正確完成應該用 if (task.IsCompleted && !task.IsFaulted) 9 if (task.IsCompleted) 10 {//如果任務完成 11 //Console.Out.WriteLine("{0} Completed", count); 12 }
13 if (task.IsFaulted) 14 {//如果任務執行出錯 15 Console.Out.WriteLine("\n\n"); 16 Console.Out.WriteLine(task.Exception); 17 Console.Out.WriteLine("\n\n"); 18 } 19 });
發布者調用異步發布方法bus.PublishAsync(),在收到確認消息之前,方法就已經返回,接着執行發布之后的程序。如果沒有確認消息,或者收到一個NACK確認,這個task將會完成,並且task.State屬性是錯誤狀態。
英文地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Publisher-Confirms
