消息隊列
RabbitMQ是一個消息隊列,它能夠接收和轉發消息。這個過程就像寄快遞一樣,把物件打包給快遞小哥,快遞小哥會負責把物件派送到正確的地址。
生產者和消費者
生產者就是用來生產消息(發送消息)的:
消費者就是用來消費消息(接收消息)的:
在生產者和消費者之間的就是消息隊列:
它相當於消息緩沖區,最多能存儲多少數據只受限於機器的內存和磁盤。多個生產者可以發送消息給同一個隊列,多個消費者也可以從同一個隊列接收消息。
Windows安裝RabbitMQ
參考mall商城學習教程的RabbitMQ部分內容:
http://www.macrozheng.com/#/architect/mall_arch_09?id=rabbitmq
原文中rabbitmq-server-3.7.14.exe
下載地址失效了,改從這里下載:
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14
安裝完成后,確認服務已開啟:
進入RabbitMQ安裝目錄下的sbin目錄:
在地址欄輸入cmd並回車啟動命令行,然后輸入以下命令啟動管理功能:
rabbitmq-plugins enable rabbitmq_management
RabbitMQ運行在本地機器上:
默認用戶名密碼為guest / guest:
Java客戶端
amqp-client-5.7.1.jar
是RabbitMQ官方提供的Java客戶端:
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
既可以直接下載jar包,也可以在Maven中添加依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
生產消息
導包:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
創建類Send,定義隊列名為hello:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
...
}
}
建立連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
}
代碼中創建了一個Connection實例和一個Channel實例,它們都用try語句包裹了起來,這是因為Connection和Channel類都實現了java.io.Closeable
,try語句會自動關閉連接。
聲明消息隊列,並發送Hello World!
消息到隊列中:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
聲明消息隊列是個冪等操作,重復聲明不會重復創建隊列。
消息體是字節數組(byte array)。
Send.java
完整代碼:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消費消息
消費消息的代碼跟生產消息的代碼類似,也需要導包,建立連接:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
消費者也聲明了一個消息隊列,因為有可能消費者比生產者先啟動。這樣能確保消費消息時,有隊列存在。
消費者沒有用try語句,因為消費者一直在異步監聽消息,如果把連接關閉了,它就沒法消費了。
導包中有個DeliverCallback
,通過它就能消費消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
因為發送消息和接收消息都是異步的,所以它叫做,callback,回調。
Recv.java
完整代碼:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
運行效果
運行Send.java
生產消息后,能看到RabbitMQ后台已經有1條消息:
和1個消息隊列:
並且發送完成后就斷開了連接。
運行Recv.java
消費消息后,能看到隊列中已經沒有消息了:
而消費者仍然保持着連接,持續監控新消息。如果把消費者停掉,連接就會斷開。
從消息隊列中能看到整個過程如下圖所示:
任務分發
任務分發是把多個任務扔進隊列,然后分發給多個worker來執行。之所以要用隊列來實現,是因為任務處理需要一定時長,如果一直等待會導致阻塞,而異步把任務排到隊列里,就能加快分發,在取出任務時,也能根據各個worker負載情況,均衡分配。尤其是在Web應用中,HTTP連接短暫就會斷開,異步處理就特別適用。排隊比蜂擁而至辦事效率更高。
實現任務分發,新建NewTask.java
發送消息:
String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
用message來模擬任務。
新建Worker.java
接收消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
doWrok()
模擬任務處理,用.
來表示時長,hello...
就代表要處理3秒。
然后在2個shell啟動2個worker:
# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
發送5條任務:
# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'
worker的處理情況如下:
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'
任務是循環調度的,worker1總是處理奇數序列的任務,worker2總是處理偶數序列的任務。
消息確認
RabbitMQ支持消息確認,consumer在接收到消息並處理后,會回傳一個ack給producer,告訴RabbitMQ這條消息已經接收成功了。它的好處是能防止worker掛掉而丟失消息,因為假如producer沒有收到消息確認,它會保留這條消息,重新發送給其他worker。消息確認過程默認有30秒的超時時間,超過30秒沒有收到消息確認,就會重試。
在代碼中有個默認設置:
boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
autoAck表示自動確認,在消息發送出去以后,就自動確認了。這就起不到防止消息丟失的效果,所以通常會設置為:
boolean autoAck = false;
消息持久化
RabbitMQ重啟以后,所有的隊列和消息都會丟失,消息持久化能保留這些數據,在重啟后恢復所有的隊列和消息。
隊列持久到用到了durable參數:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
需要注意的是,修改隊列的參數必須重新命名新的隊列,因為RabbitMQ不支持對現有隊列的參數進行修改。
消息持久化用到了MessageProperties:
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
均衡調度
均衡調度是根據worker負載情況來合理分配任務。前面實現的任務分發是循環調度的,worker1總是處理奇數序列的任務,worker2總是處理偶數序列的任務。假如奇數序列的任務始終比偶數序列的任務繁忙,處理起來耗時長,那么就會導致worker1一直繁忙而worker2處於空閑。這顯然不是很合理。
這是因為RabbitMQ默認只是盲目的將第n個消息發給第n個consumer,而不會去管有多少個未確認的消息數量。
RabbitMQ提供了prefetchCount參數來實現均衡調度:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
通過設置prefetchCount為1,RabbitMQ一次只會給一個worker分發一條消息,假如某個worker比較繁忙,那么只會等它處理完成回傳消息確認(Message acknowledgment)后,才會分發新消息給它。
完整代碼
NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
Worker.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
發布訂閱
發布訂閱是發送一條消息給多個consumer的一對多模式。不同於任務分發的一個消息只發送給一個worker的一對一模式。
接下來將實現一個日志系統來說明發布訂閱的一對多模式,它由2個程序組成,第一個程序負責提交日志消息,第二個程序負責接收消息並打印出來。第二個程序有2個實例,第1個實例接收消息存儲到磁盤,同時第2個實例把日志打印到屏幕。
Exchange
回顧幾個概念:
-
producer:生產者,發送消息。
-
queue:隊列,消息緩存。
-
consumer:消費者,接收消息。
實際上producer並不會直接給queue發送消息,它並不知道消息會發給哪個queue。在producer和queue中間,有一個叫做Exchange的東西:
producer只會把消息發送給Exchage,而Exchange的作用就是定義消息轉發規則:
-
消息發給一個隊列?
-
消息發給多個隊列?
-
消息應該被忽略掉?
-
等等等
Exchange有幾種類型:direct, topic, headers 和fanout,這里討論最后一個fanout,它會把所有消息廣播給所有隊列。
創建命名為logs
的Exchange:
channel.exchangeDeclare("logs", "fanout");
發送消息到Exchange:
channel.basicPublish( "logs", "", null, message.getBytes());
還記得之前的代碼么:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數為空字符串,這並不是說沒有Exchange,而是使用了RabbitMQ默認的Exchange。
有Exchange的消息隊列,才是真正的消息隊列。
臨時隊列
臨時隊列主要用來臨時傳輸消息。在將要實現的日志系統中,因為是廣播所有消息,所以並不關心轉發給哪個具體的隊列,只要有個隊列就行。並且每次連接到隊列時,都希望隊列里面是空的,沒有老數據。在consumer斷開連接后,隊列也能自動刪除。
可以通過無參數的queueDeclare()
來實現臨時隊列:
String queueName = channel.queueDeclare().getQueue();
這樣能創建一個非持久化的、獨占的、自動刪除的隊列。它的隊列名是個隨機名字,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
綁定
Exchange需要跟隊列綁定:
channel.queueBind(queueName, "logs", "");
綁定后Exchange才知道把消息發給哪些隊列。
可以使用命令查看RabbitMQ存在哪些綁定:
rabbitmqctl list_bindings
完整代碼
EmitLog.java
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
producer並不需要定義隊列,它只把消息發給Exchange即可。
ReceiveLogs.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
運行效果
配置環境變量:
mac:
export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar
windows:
set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
mac用$CP
,windows用%CP
。
編譯:
javac -cp $CP EmitLog.java ReceiveLogs.java
保存日志到本地文件:
java -cp $CP ReceiveLogs > logs_from_rabbit.log
打印日志到屏幕:
java -cp $CP ReceiveLogs
此時就建立了2個臨時隊列,通過命令可以查看Exchange和隊列的綁定:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
發送消息:
java -cp $CP EmitLog
就能看到2個consumer在同時消費消息了,一個會保存日志到本地文件,一個會打印日志到屏幕。
消息路由
消息路由是指Exchange把某些消息轉發到指定隊列。已經實現的日志系統是把所有消息都轉發給了所有隊列,接下來將實現只把error日志保存到本地文件,把info、warning和error都打印到屏幕。
binding key
在給Exchange和Queue綁定的時候,可以指定第三個參數:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
這個參數叫做binding key。binding key的意義是根據Exchange來決定的,比如fanout
類型的Exchange會忽略binding key而把所有消息轉發給所有隊列,而direct
類型的Exchange會查找binding key匹配的routing key,然后把消息轉發到匹配的隊列中去。以下圖示能說明這個匹配過程:
Exchange X
的type是direct
類型,它綁定了2個隊列Q1和Q2。隊列Q1有1個binding key orange
,隊列Q2有2個bingding key black
和green
。Exchange會把routing key為orange的消息轉發給Q1,而把routing key為black或green的消息轉發給Q2。其他消息則會被Exchange忽略。
1個Exchange能夠使用相同的binding key跟多個隊列進行綁定:
如圖所示,Exchange會把帶有routing key為black
的消息同時轉發給Q1和Q2。
代碼實現
創建Exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
發送消息:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
severity是info
、warning
、error
三者其中之一。
綁定Exchange和隊列:
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
EmitLogDirect.java
完整代碼如下:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLogDirect.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
//..
}
ReceiveLogsDirect.java
完整代碼如下:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogsDirect.java
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
運行效果
編譯:
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
只把error日志保存到本地文件:
java -cp $CP ReceiveLogsDirect error > logs_from_rabbit.log
把info、warning、error日志都輸出到屏幕:
java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C
發送error消息:
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
Topic
topic是一種帶有特殊含義的routing key。topic不是隨意命名的,它由一個或多個單詞構成,以.
點號分隔,比如stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
,且長度限制了255字節。在前面的日志系統中,已經實現了按照日志級別(info/warning/error)進行消息路由,但是比較單一。接下來將通過topic來實現既能按照日志級別,也能按照日志來源(auth/cron/kern)進行路由。
Topic Exchange
topic
類型的Exchange就是用來支持Topic的。它跟direct
類型的Exchange比較類似,只是對於routing key的命名有要求,並且支持兩種特殊字符:
-
*
代表1個單詞。 -
#
代表0個或多個單詞。
通過以下圖示可以直觀看到topic
類型的Exchange是如何路由的:
所有消息都是<speed>.<colour>.<species>
格式的,分別代表速度、顏色、物種。隊列Q1的binding key是*.orange.*
,隊列Q2的binding key是*.*.rabbit
和lazy.#
。
- 帶有
quick.orange.rabbit
和lazy.orange.elephant
的routing key的消息會轉發到Q1和Q2。 - 帶有
quick.orange.fox
routing key的消息只會轉發到Q1。 lazy.brown.fox
只會轉發到Q2。lazy.pink.rabbit
只會轉發到Q2一次,雖然它同時命中了2個binding key,但還是只會轉發1次。quick.brown.fox
會被Exchange忽視,不轉發給任何隊列。- 只傳一個單詞
orange
或者四個單詞quick.orange.male.rabbit
,會被Exchange忽視。 - 四個單詞
lazy.orange.male.rabbit
,只轉發給Q2。
topic
類型的Exchange是很靈活的,如果binding key設置為#
,那么它就相當於fanout
類型,轉發所有消息。如果binding key里面不包含*
或者#
,那么它就相當於direct
類型,轉發指定消息。
完整代碼
EmitLogTopic.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLogTopic.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}
ReceiveLogsTopic.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogsTopic.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
topic的格式是<facility>.<severity>
。
運行效果
編譯:
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志:
java -cp $CP ReceiveLogsTopic "#"
只接收來源於kern的日志:
java -cp $CP ReceiveLogsTopic "kern.*"
只接收critical嚴重級別的日志:
java -cp $CP ReceiveLogsTopic "*.critical"
也可以多重綁定:
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發送日志消息:
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
RPC
RPC是Remote Procedure Call的縮寫,遠程過程調用,比如在遠程機器中執行函數,並拿到返回結果。RabbitMQ能用來實現RPC服務。接下來就簡單實現一個調用生成斐波那契數列的RPC服務。
整體設計如圖所示:
Client發送RPC請求到rpc_queue里面,然后阻塞,等待返回。在請求中會指定一個回調隊列的地址,通過reply_to來指定。Server從rpc_queue讀取消息進行處理,根據reply_to把響應放到回調隊列中。請求中還設置了一個correlation_id,Client在收到響應時,根據這個關聯id來匹配,匹配上的消息才進行接收。
correlation_id匹配不上的消息會忽略,為什么是忽略而不是拋出異常呢?因為RPC Server有可能發返回ack確認前就宕機,當它重啟以后,會重新處理請求消息,從而導致消息重復處理。對於RPC來說最好是能夠讓重復場景是冪等的。
代碼實現
RPCServer.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCServer.java
import com.rabbitmq.client.*;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
//設置n個Server進程
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
RPCClient.java
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//在響應到達前掛起main阻塞等待,1表示只等待1個響應
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
}
運行效果
編譯:
javac -cp $CP RPCClient.java RPCServer.java
啟動Server:
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
運行Client:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
如果Server處理慢,那么可以再在控制台啟動新的Server來消費。
注意,本文的示例都只是為了演示而編寫的,不是真正意義上的實現。
對於可靠的消息確認,RabbitMQ提供了一個擴展,感興趣的話可以閱讀:
https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
參考資料: