扛不住的Hello World模式
上篇《RabbitMQ入門-從HelloWorld開始》介紹了RabbitMQ中最基本的Hello World模型。正如其名,Hello World模型組成簡單,也很好理解,我們也看到了一條消息時如何從一個生產者最終流向隊列並最終被消費者消費的過程。
但是,過於簡單、單調的模型設計也存在一些缺陷。假使現在隊列Queue中擠壓了很多的消息沒有被消費,Hello World模型中只有一個消費者,在消費消息時會顯得力不從心。如果遇上網絡狀況異常等情況,則消費速率就更加不同樂觀,從而影響了消息的處理效率,影響網站應用的性能。
很直觀的思路,我們能想到的是,一個人不行,那就多來幾個人,這時候就有了我們的Work模型。
多管齊下的Work模式
該模型具有以下特征
-
一個消息生產者P,一個消息存儲隊列Q,多個消息消費者C
-
Work模型能夠較好的解決資源密集型場景的問題,不需要像Hello World那樣孤注一擲的等唯一的消費者消費完
-
多個消費者,多管齊下,更加高效的並行處理消息
實例
如何構造一個資源密集型的場景
相較於Hello World,Work模式主要是在資源密集型的場景更能發揮威力,那么沒有工作環境或者很難遇到這樣的情況,我們怎么辦?
其實,這個場景的本質是為了體現一個消費者處理要很長時間的時候,這個模式是如何發揮作用的。那么,我們可以讓每個消費者處理的時間長點不就行了,要讓Consumer處理的時間長很簡單,只要調用Thread.sleep()即可。
發送端
對於發送端相對Hello World類型來說,沒有什么不同。這是我們隊這里發送的消息采用指定的格式比如“hello......”,在后面的發送端接收消息后,當遇到"."則停頓1秒或者2秒,所以程序如下
package com.ximalaya.openapi.rabbitmq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* Created by jackie on 17/8/4.
*/
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings) {
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
注意:這里getMessage方法,如果在運行的配置參數中添加了輸入參數,則使用輸入參數,如果沒有填寫,則使用默認值"Hello World"。
填寫輸入參數的方法是在如下圖位置寫上輸入參數
我們執行發送端代碼,向隊列"task_queue"中塞入4條消息
從這個動態圖片可以發現,通過發送端一次性發送了4條消息。
接收端
package com.ximalaya.openapi.rabbitmq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* Created by jackie on 17/8/4.
*/
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(2000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
注意:這里的doWork方法,該方法當遇到"."是就會睡眠2秒鍾,所以像"hello..."這樣的消息就會睡眠6秒。
下面分兩種情況來看接收端的處理信息的情況
一個消費者
如果此時只運行一個接收端的代碼,說明只啟動了一個Consumer,我們看看消息的消費過程
-
圖中Ready的消息依次從4->3->2->1->0,表示消息依次被派出消費
-
Uncknowledged表示沒有確認的,這里始終是1,因為消息時一個個發送的,等一個個發完了,最終變為0
-
Total表示總共剩余的消息個數,最終消費完變為0
兩個消費者
如果這時候啟動兩個客戶端,我們看下消息是如何被消費的
-
圖中的Ready從4->2->0,這是因為有兩個消費者,消息分別分發到兩個消費者上,一次派發兩個,分兩次派發完
-
Unacknowledged從0->2->0,過程為在一次發送兩條消息時,說明有兩條消息等待確認是否被消費掉
-
Total則與Ready變化趨勢一致
對比“一個消費者”和“兩個消費者”的消費情況,我們確實發現Work的消費處理效率要比Hello World高。
細心的你可能發現了,為什么在“兩個消費者”的情況下能夠做到如此公平的每個消費者分配兩個,有關這塊,限於篇幅,將在下篇詳細介紹。
如果您覺得閱讀本文對您有幫助,請點一下“推薦”按鈕,您的“推薦”將是我最大的寫作動力!如果您想持續關注我的文章,請掃描二維碼,關注JackieZheng的微信公眾號,我會將我的文章推送給您,並和您一起分享我日常閱讀過的優質文章。