RabbitMQ消息隊列(三):任務分發機制[轉]


在上篇文章中,我們解決了從發送端(Producer)向接收端(Consumer)發送“Hello World”的問題。在實際的應用場景中,這是遠遠不夠的。從本篇文章開始,我們將結合更加實際的應用場景來講解更多的高級用法。

   當有Consumer需要大量的運算時,RabbitMQ Server需要一定的分發機制來balance每個Consumer的load。接下來我們分布講解。 

   應用場景就是RabbitMQ Server會將queue的Message分發給不同的Consumer以處理計算密集型的任務:

1. Message acknowledgment 消息確認

每個Consumer可能需要一段時間才能處理完收到的數據。如果在這個過程中,Consumer出錯了,異常退出了,而數據還沒有處理完成,那么 非常不幸,這段數據就丟失了。因為我們采用no-ack的方式進行確認,也就是說,每次Consumer接到數據后,而不管是否處理完 成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。

     如果一個Consumer異常退出了,它處理的數據能夠被另外的Consumer處理,這樣數據在這種情況下就不會丟失了(注意是這種情況下)。

      為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。

    在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。

    如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。

    這里並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。

這樣即使你通過Ctr-C中斷了Recieve.cs,那么Message也不會丟失了,它會被分發到下一個Consumer。

      如果忘記了ack,那么后果很嚴重。當Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由於 RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。去調試這種錯誤,可以通過一下命令打印un-acked Messages.

2. Round-robin dispatching 循環分發

        RabbitMQ的分發機制非常適合擴展,而且它是專門為並發程序設計的。如果現在load加重,那么只需要創建更多的Consumer來進行任務處理即 可。當然了,對於負載還要加大怎么辦?我沒有遇到過這種情況,那就可以創建多個virtual Host,細化不同的通信類別了。

     1、首先開啟兩個Consumer,即運行兩個Recieve.cs。

     2、在開啟兩個Producer,即運行兩個Producer.cs。

默認情況下,RabbitMQ 會順序的分發每個Message。當每個收到ack后,會將該Message刪除,然后將下一個Message分發到下一個Consumer。這種分發方式叫做round-robin(優雅分發)。

Producer.cs

 1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
 6             using (IConnection connection = factory.CreateConnection())
 7             {
 8                 using (IModel channel = connection.CreateModel())
 9                 {
10                     channel.QueueDeclare("hello", false, false, false, null);
11                     var message = GetMessage(args);
12                     var body = Encoding.UTF8.GetBytes(message);
13                     
14                     var properties = channel.CreateBasicProperties();
15                     properties.DeliveryMode = 2;//non-persistent (1) or persistent (2) 
16                     //channel.TxSelect();
17                     channel.BasicPublish("", "hello", properties, body);
18                     //channel.TxCommit();
19                 }
20             }
21         }
22 
23         private static string GetMessage(string[] args)
24         {
25             return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
26         }
27     }

Consumer.cs

 1 //#define demo1
 2 #define demo2
 3 using RabbitMQ.Client;
 4 using RabbitMQ.Client.Events;
 5 using System;
 6 using System.Collections.Generic;
 7 using System.Linq;
 8 using System.Text;
 9 using System.Threading;
10 using System.Threading.Tasks;
11 
12 namespace ReceiveDemo2
13 {
14     /// <summary>
15     /// 一個Send和多個Receive的例子,
16     /// 還加上了ack的例子.
17     /// 優雅分發
18     /// </summary>
19     class Program
20     {
21         static void Main(string[] args)
22         {
23             var factory = new ConnectionFactory() { HostName = "localhost" };
24             using (var connection = factory.CreateConnection())
25             {
26                 using (var channel = connection.CreateModel())
27                 {
28                     channel.QueueDeclare("hello", false, false, false, null);
29                     var consumer = new QueueingBasicConsumer(channel);
30 #if demo1
31                     channel.BasicConsume("hello", true, consumer);//自動刪除消息
32 #else
33                     channel.BasicConsume("hello", false, consumer);//需要接受方發送ack回執,刪除消息
34 #endif
35                     Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
36                     while (true)
37                     {
38                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//掛起的操作
39 #if demo2
40                         channel.BasicAck(ea.DeliveryTag, false);//與channel.BasicConsume("hello", false, null, consumer);對應
41 #endif
42                         var body = ea.Body;
43                         var message = Encoding.UTF8.GetString(body);
44                         Console.WriteLine(" [x] Received {0}", message);
45                         int dots = message.Split('.').Length - 1;
46                         Thread.Sleep(dots * 1000);
47                         Console.WriteLine(" [x] Done");
48 #if demo2
49                         //channel.BasicAck(ea.DeliveryTag, false);//與channel.BasicConsume("hello", false, null, consumer);對應,這句話寫道40行和49行運行結果就會不一樣.寫到這里會發生如果輸出[x] Received {0}之后,沒有輸出 [x] Done之前,CTRL+C結束程序,那么message會自動發給另外一個客戶端,當另外一個客戶端收到message后,執行完49行的命令之后,服務器會刪掉這個message
50 #endif
51                     }
52                 }
53             }
54         }
55     }
56 }

3. Message durability消息持久化

     在上一節中我們知道了即使Consumer異常退出,Message也不會丟失。但是如果RabbitMQ Server退出呢?軟件都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟件就有bug,沒有bug的那不叫軟件),它還是有可能退出的:被其它軟件影響,或者系統重啟 了,系統panic了。。。

    為了保證在RabbitMQ退出或者crash了數據仍沒有丟失,需要將queue和Message都要持久化。queue的持久化需要在聲明時指定durable=True,修改Producer和Consumer的channel.QueueDeclare代碼,再次強調,Producer和Consumer都應該去創建這個queue,盡管只有一個地方的創建是真正起作用的:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

上述語句執行不會有什么錯誤,但是確得不到我們想要的結果,原因就是RabbitMQ Server已經維護了一個叫hello的queue,那么上述執行不會有任何的作用,也就是hello的任何屬性都不會被影響。這一點在上篇文章也討論過。

那么workaround也很簡單,聲明一個另外的名字的queue,比如名字定位task_hello,或者通過監控http://localhost:15672/,刪除名為“hello”的Queue。

接下來,還需要持久化Message,即在Producer.cs里面Publish的時候指定一個properties,方式如下:

 1 static void Main(string[] args)
 2         {
 3             var factory = new ConnectionFactory() { HostName = "localhost" };
 4             using (var connection = factory.CreateConnection())
 5             {
 6                 using (var channel = connection.CreateModel())
 7                 {
 8                     bool durable = true;
 9                     channel.QueueDeclare("task_queue", durable, false, false, null);//queue的持久化需要在聲明時指定durable=True
10                     var message = GetMessage(args);
11                     var body = Encoding.UTF8.GetBytes(message);
12                     var properties = channel.CreateBasicProperties();
13                     properties.SetPersistent(true);//需要持久化Message,即在Publish的時候指定一個properties,
14                     channel.BasicPublish("", "task_hello", properties, body);
15                 }
16             }
17         }

關於持久化的進一步討論:

    為了數據不丟失,我們采用了:

  1. 在數據處理結束后發送ack,這樣RabbitMQ Server會認為Message Deliver 成功。
  2. 持久化queue,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。
  3. 持久化Message,理由同上。

    但是這樣能保證數據100%不丟失嗎?

    答案是否定的。問題就在與RabbitMQ需要時間去把這些信息存到磁盤上,這個time window雖然短,但是它的確還是有。在這個時間窗口內如果數據沒有保存,數據還會丟失。還有另一個原因就是RabbitMQ並不是為每個Message都做fsync:它可能僅僅是把它保存到Cache里,還沒來得及保存到物理磁盤上。

    因此這個持久化還是有問題。但是對於大多數應用來說,這已經足夠了。當然為了保持一致性,你可以把每次的publish放到一個transaction中。這個transaction的實現需要user defined codes。

    那么商業系統會做什么呢?一種可能的方案是在系統panic時或者異常重啟時或者斷電時,應該給各個應用留出時間去flash cache,保證每個應用都能exit gracefully。

4. Fair dispatch 公平分發

    你可能也注意到了,分發機制不是那么優雅。默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發。

   那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那么,RabbitMQ是如何處理這種問題呢?

  通過 BasicQos 方法設置prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。 設置方法如下:

channel.BasicQos(0, 1, false);

注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要添加更多的Consumer,或者創建更多的virtualHost來細化你的設計。

Consumer.cs

 1 static void Main(string[] args)
 2         {
 3             var factory = new ConnectionFactory() { HostName = "localhost" };
 4             using (var connection = factory.CreateConnection())
 5             {
 6                 using (var channel = connection.CreateModel())
 7                 {
 8                     bool durable = true;
 9                     channel.QueueDeclare("task_queue", durable, false, false, null);
10                     channel.BasicQos(0, 1, false);//這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。 
11                     var consumer = new QueueingBasicConsumer(channel);
12                     channel.BasicConsume("task_hello", false, null, consumer);//需要接受方發送ack回執,刪除消息
13                     Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
14                     while (true)
15                     {
16                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//掛起的操作
17                         channel.BasicAck(ea.DeliveryTag, false);//與channel.BasicConsume("task_queue", false, null, consumer);對應
18                         var body = ea.Body;
19                         var message = Encoding.UTF8.GetString(body);
20                         Console.WriteLine(" [x] Received {0}", message);
21                         int dots = message.Split('.').Length - 1;
22                         Thread.Sleep(dots * 1000);
23                         Console.WriteLine(" [x] Done");
24                     }
25                 }
26             }
27         }

 轉:

http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html(官網)

http://blog.csdn.net/anzhsoft/article/details/19607841(翻譯)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM