代碼下載
https://download.csdn.net/download/u010312811/11252093
官方Demo
https://github.com/EasyNetQ/EasyNetQ/issues/793
1.新建項目
創建一個控制台程序,並添加對 EasyNetQ的引用
2.創建消息模型
2.1創建Answer
創建Answer數據模型
1 public class Answer 2 { 3 public string Text { get; } 4
5 public Answer(string text) 6 { 7 Text = text; 8 } 9 }
2.2創建Question
1 public class Question 2 { 3 public string Text { get; } 4
5 public Question(string text) 6 { 7 Text = text; 8 } 9 }
3.測試程序
3.1初始化代碼
1 private static IBus bus; 2 private const string ErrorQueue = "EasyNetQ_Default_Error_Queue"; 3
4 static void Main(string[] args) 5 { 6 bus = RabbitHutch.CreateBus("host=localhost"); 7 /*訂閱消息*/
8 Subscribe(); 9
10 /*處理錯誤隊列中的錯誤數據*/
11 HandleErrors(); 12
13 /*發布消息*/
14 Console.WriteLine("輸入文字,按回車發送消息!"); 15 while (true) 16 { 17 var msg = Console.ReadLine(); 18 bus.Publish(new Question(msg)); 19 } 20 }
創建一個總線,用於消息的收發;
依次注冊消息的訂閱方法,錯誤處理方法,消息發布方法。
3.2消息訂閱
1 private static void Subscribe() 2 { 3 /*聲明兩個消費者*/
4 bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(1)); 5 bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(2)); 6 } 7
8 private static Func<int,Task> HandleMessageAsync(Question question) 9 { 10 return async (id) =>
11 { 12 if (new Random().Next(0, 2) == 0) 13 { 14 Console.WriteLine("Exception Happened!!!!"); 15 throw new Exception("Error Hanppened!"); 16 } 17 else
18 { 19 Console.WriteLine(string.Format("worker:{0},content:{1}", id, question.Text)); 20 } 21 }; 22 }
訂閱方法中聲明了兩個消息的訂閱者(因為 subscriptionId相同,所以消息會采取輪詢的方法,依次發送到每個消息的消費者)。
消息處理中產生隨機數,進而有33%的機會產生異常
3.3消息發布
1 var msg = Console.ReadLine(); 2 bus.Publish(new Question(msg));
發布程序很簡單,讀取輸入的內容,直接使用EasyNetQ提供的發布方法即可。
3.4異常處理
1 private static void HandleErrors() 2 { 3 Action<IMessage<Error>, MessageReceivedInfo> handleErrorMessage = HandleErrorMessage; 4
5 IQueue queue = new Queue(ErrorQueue, false); 6 bus.Advanced.Consume(queue, handleErrorMessage); 7 } 8
9 private static void HandleErrorMessage(IMessage<Error> msg, MessageReceivedInfo info) 10 { 11 Console.WriteLine("catch: " + msg.Body.Message); 12 }
異常處理程序訂閱了隊列“EasyNetQ_Default_Error_Queue”,當異常發生時,EasyNetQ默認的處理是將隊列數據寫入當前的錯誤隊列中。
4.功能測試
當程序異常時,打印異常
5.異常重試
實際項目中,當程序發生異常后,我們期望的處理可能是將消息返回到原有隊列,進行再次的數據處理。
修改bus創建的聲明:
bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IConsumerErrorStrategy>(_ => new AlwaysRequeueErrorStrategy()));
AlwaysRequeueErrorStrategy是我們默認的錯誤處理方法
1 public sealed class AlwaysRequeueErrorStrategy : IConsumerErrorStrategy 2 { 3 public void Dispose() 4 { 5 } 6
7 public AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 8 { 9 return AckStrategies.NackWithRequeue; 10 } 11
12 public AckStrategy HandleConsumerCancelled(ConsumerExecutionContext context) 13 { 14 return AckStrategies.NackWithRequeue; 15 } 16 }