EasyNetQ異常處理


代碼下載

https://download.csdn.net/download/u010312811/11252093

官方Demo

https://github.com/EasyNetQ/EasyNetQ/issues/793

 

1.新建項目

創建一個控制台程序,並添加對 EasyNetQ的引用

 

2.創建消息模型

2.1創建Answer

創建Answer數據模型

1     public class Answer 2  { 3         public string Text { get; } 4 
5         public Answer(string text) 6  { 7             Text = text; 8  } 9     }

2.2創建Question

1     public class Question 2  { 3         public string Text { get; } 4 
5         public Question(string text) 6  { 7             Text = text; 8  } 9     }

 

3.測試程序

3.1初始化代碼

 1    private static IBus bus;  2    private const string ErrorQueue = "EasyNetQ_Default_Error_Queue";  3 
 4    static void Main(string[] args)  5  {  6         bus = RabbitHutch.CreateBus("host=localhost");  7         /*訂閱消息*/
 8  Subscribe();  9 
10         /*處理錯誤隊列中的錯誤數據*/
11  HandleErrors(); 12 
13         /*發布消息*/
14         Console.WriteLine("輸入文字,按回車發送消息!"); 15         while (true) 16  { 17             var msg = Console.ReadLine(); 18             bus.Publish(new Question(msg)); 19  } 20     }

創建一個總線,用於消息的收發;

依次注冊消息的訂閱方法,錯誤處理方法,消息發布方法。

 

3.2消息訂閱

 1     private static void Subscribe()  2  {  3         /*聲明兩個消費者*/
 4         bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(1));  5         bus.SubscribeAsync<Question>("subscriptionId", x => HandleMessageAsync(x).Invoke(2));  6  }  7 
 8     private static Func<int,Task> HandleMessageAsync(Question question)  9  { 10         return async (id) =>
11  { 12             if (new Random().Next(0, 2) == 0) 13  { 14                 Console.WriteLine("Exception Happened!!!!"); 15                 throw new Exception("Error Hanppened!"); 16  } 17             else
18  { 19                 Console.WriteLine(string.Format("worker:{0},content:{1}", id, question.Text)); 20  } 21  }; 22     }

訂閱方法中聲明了兩個消息的訂閱者(因為 subscriptionId相同,所以消息會采取輪詢的方法,依次發送到每個消息的消費者)。

消息處理中產生隨機數,進而有33%的機會產生異常

3.3消息發布

1     var msg = Console.ReadLine();
2     bus.Publish(new Question(msg));

發布程序很簡單,讀取輸入的內容,直接使用EasyNetQ提供的發布方法即可。

3.4異常處理

 

 1     private static void HandleErrors()  2  {  3         Action<IMessage<Error>, MessageReceivedInfo> handleErrorMessage = HandleErrorMessage;  4 
 5         IQueue queue = new Queue(ErrorQueue, false);  6  bus.Advanced.Consume(queue, handleErrorMessage);  7  }  8 
 9     private static void HandleErrorMessage(IMessage<Error> msg, MessageReceivedInfo info) 10  { 11         Console.WriteLine("catch: " + msg.Body.Message); 12     }

 

異常處理程序訂閱了隊列“EasyNetQ_Default_Error_Queue”,當異常發生時,EasyNetQ默認的處理是將隊列數據寫入當前的錯誤隊列中。

 

 

4.功能測試

 當程序異常時,打印異常

 

5.異常重試

實際項目中,當程序發生異常后,我們期望的處理可能是將消息返回到原有隊列,進行再次的數據處理。

修改bus創建的聲明:

bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IConsumerErrorStrategy>(_ => new AlwaysRequeueErrorStrategy()));
AlwaysRequeueErrorStrategy是我們默認的錯誤處理方法
 1     public sealed class AlwaysRequeueErrorStrategy : IConsumerErrorStrategy  2  {  3         public void Dispose()  4  {  5  }  6 
 7         public AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)  8  {  9             return AckStrategies.NackWithRequeue; 10  } 11 
12         public AckStrategy HandleConsumerCancelled(ConsumerExecutionContext context) 13  { 14             return AckStrategies.NackWithRequeue; 15  } 16     }
 
        

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM