在第一個教程里面,我們寫了一個程序從一個有名字的隊列中發送和接收消息,在這里我們將要創建一個分發耗時任務給多個worker的任務隊列。
任務隊列核心思想就是避免執行一個資源密集型的任務,而程序要等待其執行完畢才能進行下一步的任務。相反地我們讓任務延遲執行,我們封裝一個task作為消息,並把它發送至隊列,在后台運行的工作進程將彈出的任務,並最終執行作業。當運行多個worker的時候,task將在他們之間共享。
准備
在前一節中我們發送一個包含“HelloWorld!”的消息,現在我們發送字符串代表一個復雜的任務,我們沒有一個真實的任務,比如格式化圖片大小等等,所以我們使用Thread.sleep()代表一個執行時間較長的任務,這里我們使用幾個點來代表任務的復雜度,每一個點代表任務執行一秒的時間,比如hello...就代表執行了3秒。
我們稍微改變一個上一節中的Send.java,允許從命令行發送任意的消息,程序將從我們的工作隊列中執行任務,所以命名為NewTask.java:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
以下是幫助從命令行參數獲取消息體的代碼:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
上一節中的Rece.java也需要少許改變:需要偽造一個根據點來執行多少秒的任務。它將處理傳送過來的消息,並且執行任務,命名為Worker.java:
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, true, consumer);
模擬執行時間的任務:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
編譯:
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
循環調度
使用任務隊列的優點之一就是很容並行化一個work,如果我們產生了工作積壓,我們可以很簡單的增加worker的數量,來解決問題。
首先,讓我們嘗試在同一時間運行兩個工人實例。他們都將在隊列中得到消息,但究竟如何?讓我們來看看。
您需要三個控制台打開。兩個將運行輔助程序。這些控制台將是我們的兩名消費者 - C1和C2。
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
在第三個,我們將發布新的任務。一旦你開始運行消費者就可以發布幾條消息:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
我們來看看它是怎樣將任務非配給我們的worker的:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
默認情況下,RabbitMQ將發送每一個在序列中的消息到下一個消費者,平均而言,每一個消費者將獲得相同數量的消息,發布這種消息的方式叫循環調度。
消息確認
做一個任務需要幾秒鍾,那么當一個消費者執行任務到一半的時候掛了怎么辦?在我們的當前代碼里面,一旦消息傳送給我們的消費者,消息就從存儲中刪除了。在這種情況下,如果Kill了一個worker,我們不僅僅失去了它正在執行的消息任務,而且我們將失去所有分配給它,但是還沒執行的消任務。
但是我們不想丟失任何消息,如果一個worker掛掉,我們將分配這些任務給其他的消費者。
為了確保消息不會丟失,RabbitMQ支持消息確認。一個ACK(nowledgement)從消費者發送給RabbitMQ一個消息確認當前消息已被接收和處理,RabbitMQ可自由將其刪除。
如果消費者死亡(其信道被關閉,關閉連接,或TCP連接丟失),而不發送ACK,RabbitMQ知道消息並沒有被接收和執行完全,將重新將它放入隊列。如果同一時間存在其他在線的消費者,它將迅速重新傳遞消息給另一個消費者。這樣,你可以肯定沒有消息丟失,即使偶爾的消費者死亡。
目前沒有任何消息超時,當消費者掛掉的時候,RabbitMQ將重新傳遞消息,即使處理一個消息需要很長很長的時間也沒關系。
消息確認默認情況下開啟。在前面的例子中,我們明確地通過AUTOACK = true標志將它們關閉。現在是時候刪除此標志,一旦我們與任務完成,將從worker發送適當的確認。
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
使用此代碼,我們可以肯定,即使你使用CTRL + C,殺死一個worker,什么都不會丟失。worker死亡后不久,所有未確認的消息會被重新傳遞。
被遺忘的確認
忘記baseACK是一個常見的錯誤,這是個簡單的錯誤,但是后果是很嚴重的。當你的客戶端退出的時候(可能看起來就像是隨機交還)消息將被重新傳遞,但RabbitMQ會消耗的越來越多的內存,它將無法釋放任何unacked的消息。
為了調試這種錯誤,你可以使用rabbitmqctl打印messages_unacknowledged字段。
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
消息持久化
我們已經學會了如何確保即使消費者死亡,任務也不會丟失。但是如果RabbitMQ的服務器停止,我們的任務仍然會丟失。
當RabbitMQ的退出或崩潰時,除非你告訴它不要忘記的隊列和消息。兩件事情都需要確保,消息才不會丟失:我們需要將隊列和消息持久化。
首先,我們需要確保的RabbitMQ永遠不會失去我們的隊列。為了做到這一點,我們需要把它聲明為持久:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
雖然以上的代碼本身是對的,但是它不會對我們的目前設置起作用。這是因為我們已經定義了一個名為hello的不持久的隊列,RabbitMQ不允許你使用不同的參數重新定義現有隊列,並會返回一個錯誤。但是有一個快速的解決
辦法 - 讓我們與聲明不同名稱的隊列,例如task_queue:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare變化需要被施加到生產者和消費者代碼兩者。
在這一點上我們確保即使RabbitMQ的重啟task_queue隊列也不會被丟失。現在,我們需要我們的消息標記為持久性 - 通過設置MessageProperties(實現BasicProperties)的值PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意消息持久性
將消息標記為持久性並不能完全保證信息不會丟失。雖然RabbitMQ消息將保存到磁盤,但是有很短的時間內RabbitMQ已經接受了消息,但是還沒來得及保存它。此外,RabbitMQ沒有為每條消息做FSYNC(2) - 它可能只是保存到緩存,並沒有真正寫入磁盤。持久性的保證不強,但是對於我們簡單的任務隊列還是綽綽有余的。如果你需要一個更強有力的保證,那么你可以使用publisher confirms。
公平調度
你可能已經注意到,調度仍然沒有完全按照我們真正想要的工作。舉個例子,比如有兩個消費者的情況,當奇數的消息非常重,但是偶數的消息非常輕的時候,一個消費者將被累死,而另一個卻閑着。RabbitMQ卻不知道,仍然在均勻的給每個消費者發送消息。
這種情況發生是因為RabbitMQ只負責分發進入到隊列的消息,它不看為消費者未確認的消息的數量。它只是盲目分派每第n個消息給第n消費者。
為了杜絕那種情況,我們可以使用basicQos方法與prefetchCount = 1設置。它告訴RabbitMQ不要把多個消息在同一時間給一個消費者。或者,換句話說,只有消費者處理並且確認前一個消息之后才會給它分配下一個消息,相反,消息將被非配給下一個不處於忙碌的消費者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意隊列大小
如果所有的worker都在忙,你的隊列也填滿了。您將要留意的是,也許添加更多的worker,或者有一些其他的策略。
代碼整合
NewTask.java
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
Worker.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
原文地址:RabbitMQ之Work Queues
代碼地址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發布訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)