RabbitMQ學習總結 第三篇:工作隊列Work Queue


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

在上篇中我們實現了程序來從一個已經命名的隊列里發送和接收消息。本篇博文中我們將要創建工作隊列用來把一些比較耗時的任務分配給多個worker。

工作隊列的主要思想就是避開立刻處理某個資源消耗交大的任務並且需要等待它執行完成。取而代之的是我們可以將它加入計划列表,並在后邊執行這些任務。我們將任務分裝成一個消息,並發送到隊列中。后台的工作程序在接收到消息后將會立刻執行任務。當運行多個執行器時,任務將會在他們之間共享。

這個概念在web應用程序中是比較實用的,對於一些在一個短的http請求里無法完成的復雜任務。

1、准備

上篇博文中是發送一個包含”Hello World“的消息。現在我們來發送一條代表復雜任務的字符串。我們這里沒有一個真實存在的任務,例如修改圖片大小和渲染pdf文件這類的任務,這里我們模擬一個任務繁忙的場景(使用Thread.sleep()函數)。這里我們使用字符串類的點號個數來代表任務的復雜性,每一個點號都占用一秒鍾的處理時間。例如,一個用”Hello…”來描述的偽造的任務將會占用三秒時間。

我們稍微修改一下上篇博文中的Send.java代碼,可以從客戶端發送任意消息。這個程序將會指定任務到我們的工作列表中,命名為NewTask.java:

發送消息部分如下:

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

從運行參數中拿到消息類容:

    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }

舊的接收端也要做稍微的修改:消息體里的一個逗號代表一個一秒鍾的任務,接收端會接收到消息,然后執行任務。這里重新命名為Work.java:

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());

    System.out.println(" [x] Received '" + message + "'");        
    doWork(message);
    System.out.println(" [x] Done");
}

然后模擬執行任務消耗時間:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

2、輪詢調度

任務隊列的一個較大的優勢就是能夠很方便的安排工作。如果后台隊列里正在積壓一些工作一直沒有被執行的話,通過添加更多的worker就可以解決了。

首先,讓我們來同時運行兩個worker實例(C1和C2),他們將會同時從隊列里拿到消息,具體的詳情見下:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C

然后發布任務(運行發送端):

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

然后查看我們的worker執行了什么任務:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar 
Worker
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

默認情況下,RabbitMQ會把每個消息以此輪詢發到各個消費者那,把消息平均的發到各個消費者那。這種分配管理的方式叫輪詢,還可以測試多個worker的情形。

3、消息應答機制

完成一個任務需要花費幾秒鍾。你一定很好奇,如果某個消費者開始執行某個任務花費了很長的時間並且在執行到某個部分時崩潰了那會怎么樣。在我們現在的代碼中,在向消費者推送某條消息后,RabbitMQ會立刻刪除掉這條消息。這樣的話,如果我們kill掉某個worker的話,那么我們將會流失掉該worker正在處理任務的消息(改任務未處理完成),我們也會丟失所有被發送到這個消費者且未處理完成的消息。

但是,我們不想丟失這部分消息,我們希望這類消息可以再次被發送到其它worker那。

為了保證永遠不會丟失消息,RabbitMQ支持消息應答機制。當消費者接收到消息並完成任務后會往RabbitMQ服務器發送一條確認的命令,然后RabbitMQ才會將消息刪除。

如果某個消費者在還有發送確認信息就掛了,RabbitMQ將會視為服務沒有執行完成,然后把執行消息的服務再發給另外一個消費者。這種方式下,即時某個worker掛了,也不會使得消息丟失。

這里不是用超時來判斷的,只有在某個消費者連接斷開時,RabbitMQ才會把重新發送該消費者沒有返回確認的消息到其它消費者那。即時處理某條任務花費了很長的時間,在這里也是沒有問題的。

消息應答機制默認是打開的,在上邊例子中我們明確的關閉了它(autoAck=true),那么現在應該如下修改程序:

QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  //...      
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

這樣就可以保證即時你kill掉了worker也不會出現信息丟失的現象,worker被kill掉之后,所有的未確認消息將會被重新發送。

易錯點:

很多人都會忘記調用basicAck方法,雖然這是一個很簡單的錯誤,但往往卻是致命。消費者退出后消息將會被重發,但是由於一些未能被確認消息不能被釋放,RabbitMQ將會消耗掉越來越多的內存。

為了能夠調試這種錯誤,你可以使用rabbitmqctl來打印出messages_unacknowledged字段。

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

4、消息的持久化

我們已經學習了在發生消費者掛掉或是任務被kill掉時的容錯機制,下邊將來看看當RabbitMQ服務被停止后,怎么保證消息不丟失。

當RabbitMQ退出或是宕機時會丟失隊列和消息,當然有兩個地方需要注意才能解決這類問題的發生:將隊列和消息都持久化存儲

首先,我們要確保RabbitMQ永遠不會丟失消息隊列,那就需要聲明它為持久化存儲:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

雖然這里的操作是正確的,但在這里依然不會生效,因為命名為“hello”的隊列在之前已經被創建(非持久化),現在已經存在了。RabbitMQ不允許你重新定義一個已經存在的消息隊列,如果你嘗試着去修改它的某些屬性的話,那么你的程序將會報錯。所以,這里你需要更換一個消息隊列名稱:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

生產者和消費者都需要使用queueDeclare方法來指定持久化屬性。

現在我們可以確保即使RabbitMQ重啟了,任務隊列也不會丟失。下邊我就來實現消息持久化(通過設置屬性MessageProperties. PERSISTENT_TEXT_PLAIN,其中MessageProperties實現了BasicProperties接口)。

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue", 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

標記消息持久化並不能百分百的保證消息一定不會被丟失,雖然RabbitMQ會把消息寫到磁盤上,但是從RabbitMQ接收到消息到寫到磁盤上,這個短時間的過程中發生的RabbitMQ重啟依然會使得為寫入到磁盤的消息被丟失。事實上是這樣的,RabbitMQ接收到消息后,首先會把該消息寫到內存緩沖區中,並不是直接把單條消息實時寫到磁盤上的。消息的持久化不是健壯的,但是對於簡單的任務隊列是夠用了。如果你需要一套很健壯的持久化方案,那么你可以使用publisher confirms(稍后會更新詳細的使用方法)。

5、公平的任務分發策略

你可能會注意到有的時候RabbitMQ不能像你預想中的那樣分發消息。例如有兩個worker,第奇數個消息對應的任務都很耗時,第偶數個消息對應的任務都很快就能執行完。這樣的話其中有個worker就會一直都很繁忙,另外一個worker幾乎不做任務。RabbitMQ不會去對這種現象做任何處理,依然均勻的去推送消息。

這是因為RabbitMQ在消息被生產者推送過來后就被推送到消費者端,它不會去查看未接收到消費者確認的消息數量。它只會把N個消息均與的分發到N個消費者那。

為了能解決這個問題,我們可以使用basicQos放來來設置消費者最多會同時接收多少個消息。這里設置為1,表示RabbitMQ同一時間發給消費者的消息不超過一條。這樣就能保證消費者在處理完某個任務,並發送確認信息后,RabbitMQ才會向它推送新的消息,在此之間若是有新的消息話,將會被推送到其它消費者,若所有的消費者都在處理任務,那么就會等待。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意消息隊列的大小:

如果所有的worker都處於較忙的狀態下,你的消息隊列有可能會太長(出現內存或磁盤瓶頸)。需要盡量多的關注這些信息,出現的時候可以適當的添加worker。

6、代碼的最后實現

發送端:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) 
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    //指定隊列持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    //指定消息持久化
    channel.basicPublish( "", TASK_QUEUE_NAME, 
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

接收端:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    //指定隊列持久化
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    //指定該消費者同時只接收一條消息
    channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

//打開消息應答機制
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );
    
      //返回接收到消息的確認信息
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

使用消息應答機制和prefetchCount可以實現一個工作隊列了。持久化的選項可以使任務即使隊列和消息即使在RabbitMQ重啟后,依然不會丟失。

關於Channel和MessageProperties的更多應用可以參考Java官方API文檔:

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/

 

最后總結:

1、消費者端在信道上打開消息應答機制,並確保能返回接收消息的確認信息,這樣可以保證消費者發生故障也不會丟失消息。

2、服務器端和客戶端都要指定隊列的持久化和消息的持久化,這樣可以保證RabbitMQ重啟,隊列和消息也不會。

3、指定消費者接收的消息個數,避免出現消息均勻推送出現的資源不合理利用的問題。

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-two-java.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM