C#實現異步消息隊列


拿到新書《.net框架設計》,到手之后迅速讀了好多,雖然這本書不像很多教程一樣從頭到尾系統的講明一些知識,但是從項目實戰角度告訴我們如何使用我們的知識,從這本書中提煉了一篇,正好符合我前幾篇的“數據驅動框架”設計的問題;

消息隊列

消息隊列英語Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步通信協議,每一個貯列中的紀錄包含詳細說明的資料,包含發生的時間,輸入裝置的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。

簡單的說隊列就是貯存了我們需要處理的Command但是並不是及時的拿到其處理結果;

實現

實際上,消息隊列常常保存在鏈表結構中。擁有權限的進程可以向消息隊列中寫入或讀取消息。

目前,有很多消息隊列有很多開源的實現,包括JBoss MessagingJORAMApache ActiveMQSun Open Message QueueApache Qpid和HTTPSQS。

優點,缺點

消息隊列本身是異步的,它允許接收者在消息發送很長時間后再取回消息,這和大多數通信協議是不同的。例如WWW中使用的HTTP協議是同步的,因為客戶端在發出請求后必須等待服務器回應。然而,很多情況下我們需要異步的通信協議。比如,一個進程通知另一個進程發生了一個事件,但不需要等待回應。但消息隊列的異步特點,也造成了一個缺點,就是接收者必須輪詢消息隊列,才能收到最近的消息。

信號相比,消息隊列能夠傳遞更多的信息。與管道相比,消息隊列提供了有格式的數據,這可以減少開發人員的工作量。但消息隊列仍然有大小限制。

讀取隊列消息

主要有兩種(1)服務端的推;(2)客戶端的拉;

拉:主要是客戶端定時輪詢拿走消息處理;

推:通過事件訂閱方式主動通知訂閱者進行處理;

消息的貯存

簡單的是通過內存鏈表實現貯存;也可以借助DB,比如Redis;還可以持久到本地文件中;

如何保證異步處理的一致性

盡管隊列主要目的是實現消息貯存,同時將調用與實現異步化。但是如果想達到處理消息一致性,好的方式是區別業務處理順序,比如操作主從DB,主負責寫,從負責讀,我們沒有機會在寫之后立馬從讀數據庫拿到你想要的結果;同時我們需要借助中間狀態,當多個中間狀態同時符合調用結果才到到業務時間被處理,否則將“異常消息”持久化,待下次操作;

上代碼

建立消息對立核心隊列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
     public  delegate  void  MessageQueueEventNotifyHandler(Message.BaseMessage message);
 
     public  class  MessageQueue:Queue<BaseMessage>
     {
         public  static  MessageQueue GlobalQueue =  new  MessageQueue();
 
         private  Timer timer =  new  Timer();
         public  MessageQueue() {
             this .timer.Interval = 5000;
             this .timer.Elapsed += Notify;
             this .timer.Enabled =  true ;
         }
         private  void  Notify( object  sender, ElapsedEventArgs e) {
             lock  ( this ) {
                 if  ( this .Count > 0) {
                     //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                     var  message =  this .Dequeue();
                     this .messageNotifyEvent(message);
                 }
             }
         }
 
         private  MessageQueueEventNotifyHandler messageNotifyEvent;
         public  event  MessageQueueEventNotifyHandler MessageNotifyEvent {
             add {
                 this .messageNotifyEvent += value;
             }
 
             remove {
                 if  ( this .messageNotifyEvent !=  null ) {
                     this .messageNotifyEvent -= value;
                 }
             }
         }
     }
}

事件處理

1
2
3
4
5
6
7
8
9
10
11
12
13
public  const  string  OrderCodePrefix =  "P" ;
         public  void  Submit(Message.BaseMessage message)
         {
             Order order = message.Body  as  Order;
 
             if  (order.OrderCode.StartsWith(OrderCodePrefix))
             {
                 System.Console.WriteLine( "這個是個正確的以({0})開頭的訂單:{1}" , OrderCodePrefix,order.OrderCode);
             }
             else  {
                 System.Console.WriteLine( "這個是個錯誤的訂單,沒有以({0})開頭:{1}" ,OrderCodePrefix,order.OrderCode);
             }
         }

可依據具體業務進行個性化處理;

通過Proxy向隊列追加消息

1
2
3
4
5
6
7
public  class  OrderServiceProxy:IOrderService
     {
         public  void  Submit(Message.BaseMessage message)
         {
             MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
         }
     }

客戶端調用

1
2
3
4
5
6
7
8
9
10
11
12
13
OrderService orderService =  new  OrderService();
             MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;
 
             var  orders =  new  List<Order>() {
                 new  Order(){OrderCode= "P001" },
                 new  Order(){OrderCode= "P002" },
                 new  Order(){OrderCode= "B003" }
             };
 
             OrderServiceProxy proxy =  new  OrderServiceProxy();
             orders.ForEach(order => proxy.Submit( new  Message.BaseMessage() { Body=order}));
 
             Console.ReadLine();

這樣就滿足了事件的綁定與觸發個性化處理,同時達到了消息異步化的目的,希望更細致的拓展用到后期的項目中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM