RabbitMQ消息隊列官方教程Java學習筆記


消息隊列

RabbitMQ是一個消息隊列,它能夠接收和轉發消息。這個過程就像寄快遞一樣,把物件打包給快遞小哥,快遞小哥會負責把物件派送到正確的地址。

生產者和消費者

生產者就是用來生產消息(發送消息)的:

img

消費者就是用來消費消息(接收消息)的:

img

在生產者和消費者之間的就是消息隊列

img

它相當於消息緩沖區,最多能存儲多少數據只受限於機器的內存和磁盤。多個生產者可以發送消息給同一個隊列,多個消費者也可以從同一個隊列接收消息。

img

Windows安裝RabbitMQ

參考mall商城學習教程的RabbitMQ部分內容:

http://www.macrozheng.com/#/architect/mall_arch_09?id=rabbitmq

原文中rabbitmq-server-3.7.14.exe下載地址失效了,改從這里下載:

https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.14

安裝完成后,確認服務已開啟:

image-20220326205428623

進入RabbitMQ安裝目錄下的sbin目錄:

image-20220326204901185

在地址欄輸入cmd並回車啟動命令行,然后輸入以下命令啟動管理功能:

rabbitmq-plugins enable rabbitmq_management

RabbitMQ運行在本地機器上:

http://localhost:15672/

image-20220326182614819

默認用戶名密碼為guest / guest:

image-20220326205726347

Java客戶端

amqp-client-5.7.1.jar是RabbitMQ官方提供的Java客戶端:

https://www.rabbitmq.com/tutorials/tutorial-one-java.html

image-20220326142538038

既可以直接下載jar包,也可以在Maven中添加依賴:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.2</version>
</dependency>

生產消息

sending

導包:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

創建類Send,定義隊列名為hello:

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

建立連接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}

代碼中創建了一個Connection實例和一個Channel實例,它們都用try語句包裹了起來,這是因為Connection和Channel類都實現了java.io.Closeable,try語句會自動關閉連接。

聲明消息隊列,並發送Hello World!消息到隊列中:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

聲明消息隊列是個冪等操作,重復聲明不會重復創建隊列。

消息體是字節數組(byte array)。

Send.java完整代碼:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消費消息

receiving

消費消息的代碼跟生產消息的代碼類似,也需要導包,建立連接:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;


public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

消費者也聲明了一個消息隊列,因為有可能消費者比生產者先啟動。這樣能確保消費消息時,有隊列存在。

消費者沒有用try語句,因為消費者一直在異步監聽消息,如果把連接關閉了,它就沒法消費了。

導包中有個DeliverCallback,通過它就能消費消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

因為發送消息和接收消息都是異步的,所以它叫做,callback,回調。

Recv.java完整代碼:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

運行效果

運行Send.java生產消息后,能看到RabbitMQ后台已經有1條消息:

image-20220326210013199

和1個消息隊列:

image-20220326210233345

並且發送完成后就斷開了連接。

運行Recv.java消費消息后,能看到隊列中已經沒有消息了:

image-20220326210451253

而消費者仍然保持着連接,持續監控新消息。如果把消費者停掉,連接就會斷開。

從消息隊列中能看到整個過程如下圖所示:

image-20220326211204785

任務分發

任務分發是把多個任務扔進隊列,然后分發給多個worker來執行。之所以要用隊列來實現,是因為任務處理需要一定時長,如果一直等待會導致阻塞,而異步把任務排到隊列里,就能加快分發,在取出任務時,也能根據各個worker負載情況,均衡分配。尤其是在Web應用中,HTTP連接短暫就會斷開,異步處理就特別適用。排隊比蜂擁而至辦事效率更高。

實現任務分發,新建NewTask.java發送消息:

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

用message來模擬任務。

新建Worker.java接收消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

doWrok()模擬任務處理,用.來表示時長,hello...就代表要處理3秒。

然后在2個shell啟動2個worker:

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

發送5條任務:

# shell 3
java -cp $CP NewTask First message.
# => [x] Sent 'First message.'
java -cp $CP NewTask Second message..
# => [x] Sent 'Second message..'
java -cp $CP NewTask Third message...
# => [x] Sent 'Third message...'
java -cp $CP NewTask Fourth message....
# => [x] Sent 'Fourth message....'
java -cp $CP NewTask Fifth message.....
# => [x] Sent 'Fifth message.....'

worker的處理情況如下:

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

任務是循環調度的,worker1總是處理奇數序列的任務,worker2總是處理偶數序列的任務。

消息確認

RabbitMQ支持消息確認,consumer在接收到消息並處理后,會回傳一個ack給producer,告訴RabbitMQ這條消息已經接收成功了。它的好處是能防止worker掛掉而丟失消息,因為假如producer沒有收到消息確認,它會保留這條消息,重新發送給其他worker。消息確認過程默認有30秒的超時時間,超過30秒沒有收到消息確認,就會重試。

在代碼中有個默認設置:

boolean autoAck = true;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

autoAck表示自動確認,在消息發送出去以后,就自動確認了。這就起不到防止消息丟失的效果,所以通常會設置為:

boolean autoAck = false;

消息持久化

RabbitMQ重啟以后,所有的隊列和消息都會丟失,消息持久化能保留這些數據,在重啟后恢復所有的隊列和消息。

隊列持久到用到了durable參數:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

需要注意的是,修改隊列的參數必須重新命名新的隊列,因為RabbitMQ不支持對現有隊列的參數進行修改。

消息持久化用到了MessageProperties:

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

均衡調度

均衡調度是根據worker負載情況來合理分配任務。前面實現的任務分發是循環調度的,worker1總是處理奇數序列的任務,worker2總是處理偶數序列的任務。假如奇數序列的任務始終比偶數序列的任務繁忙,處理起來耗時長,那么就會導致worker1一直繁忙而worker2處於空閑。這顯然不是很合理。

這是因為RabbitMQ默認只是盲目的將第n個消息發給第n個consumer,而不會去管有多少個未確認的消息數量。

RabbitMQ提供了prefetchCount參數來實現均衡調度:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通過設置prefetchCount為1,RabbitMQ一次只會給一個worker分發一條消息,假如某個worker比較繁忙,那么只會等它處理完成回傳消息確認(Message acknowledgment)后,才會分發新消息給它。

完整代碼

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

發布訂閱

發布訂閱是發送一條消息給多個consumer的一對多模式。不同於任務分發的一個消息只發送給一個worker的一對一模式。

接下來將實現一個日志系統來說明發布訂閱的一對多模式,它由2個程序組成,第一個程序負責提交日志消息,第二個程序負責接收消息並打印出來。第二個程序有2個實例,第1個實例接收消息存儲到磁盤,同時第2個實例把日志打印到屏幕。

Exchange

回顧幾個概念:

  • producer:生產者,發送消息。

  • queue:隊列,消息緩存。

  • consumer:消費者,接收消息。

實際上producer並不會直接給queue發送消息,它並不知道消息會發給哪個queue。在producer和queue中間,有一個叫做Exchange的東西:

img

producer只會把消息發送給Exchage,而Exchange的作用就是定義消息轉發規則

  • 消息發給一個隊列?

  • 消息發給多個隊列?

  • 消息應該被忽略掉?

  • 等等等

Exchange有幾種類型:direct, topic, headers 和fanout,這里討論最后一個fanout,它會把所有消息廣播給所有隊列。

創建命名為logs的Exchange:

channel.exchangeDeclare("logs", "fanout");

發送消息到Exchange:

channel.basicPublish( "logs", "", null, message.getBytes());

還記得之前的代碼么:

channel.basicPublish("", "hello", null, message.getBytes());

第一個參數為空字符串,這並不是說沒有Exchange,而是使用了RabbitMQ默認的Exchange。

有Exchange的消息隊列,才是真正的消息隊列。

臨時隊列

臨時隊列主要用來臨時傳輸消息。在將要實現的日志系統中,因為是廣播所有消息,所以並不關心轉發給哪個具體的隊列,只要有個隊列就行。並且每次連接到隊列時,都希望隊列里面是空的,沒有老數據。在consumer斷開連接后,隊列也能自動刪除。

可以通過無參數的queueDeclare()來實現臨時隊列:

String queueName = channel.queueDeclare().getQueue();

這樣能創建一個非持久化的、獨占的、自動刪除的隊列。它的隊列名是個隨機名字,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg

綁定

Exchange需要跟隊列綁定:

channel.queueBind(queueName, "logs", "");

綁定后Exchange才知道把消息發給哪些隊列。

可以使用命令查看RabbitMQ存在哪些綁定:

rabbitmqctl list_bindings

完整代碼

img

EmitLog.java

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

producer並不需要定義隊列,它只把消息發給Exchange即可。

ReceiveLogs.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

運行效果

配置環境變量:

mac:

export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar

windows:

set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar

mac用$CP,windows用%CP

編譯:

javac -cp $CP EmitLog.java ReceiveLogs.java

保存日志到本地文件:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

打印日志到屏幕:

java -cp $CP ReceiveLogs

此時就建立了2個臨時隊列,通過命令可以查看Exchange和隊列的綁定:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

發送消息:

java -cp $CP EmitLog

就能看到2個consumer在同時消費消息了,一個會保存日志到本地文件,一個會打印日志到屏幕。

消息路由

消息路由是指Exchange把某些消息轉發到指定隊列。已經實現的日志系統是把所有消息都轉發給了所有隊列,接下來將實現只把error日志保存到本地文件,把info、warning和error都打印到屏幕。

binding key

在給Exchange和Queue綁定的時候,可以指定第三個參數:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

這個參數叫做binding key。binding key的意義是根據Exchange來決定的,比如fanout類型的Exchange會忽略binding key而把所有消息轉發給所有隊列,而direct類型的Exchange會查找binding key匹配的routing key,然后把消息轉發到匹配的隊列中去。以下圖示能說明這個匹配過程:

img

Exchange X的type是direct類型,它綁定了2個隊列Q1和Q2。隊列Q1有1個binding key orange,隊列Q2有2個bingding key blackgreen。Exchange會把routing key為orange的消息轉發給Q1,而把routing key為black或green的消息轉發給Q2。其他消息則會被Exchange忽略。

1個Exchange能夠使用相同的binding key跟多個隊列進行綁定:

img

如圖所示,Exchange會把帶有routing key為black的消息同時轉發給Q1和Q2。

代碼實現

創建Exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

發送消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

severity是infowarningerror三者其中之一。

綁定Exchange和隊列:

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

img

EmitLogDirect.java完整代碼如下:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsDirect.java完整代碼如下:

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogsDirect.java

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

運行效果

編譯:

javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java

只把error日志保存到本地文件:

java -cp $CP ReceiveLogsDirect error > logs_from_rabbit.log

把info、warning、error日志都輸出到屏幕:

java -cp $CP ReceiveLogsDirect info warning error
# => [*] Waiting for logs. To exit press CTRL+C

發送error消息:

java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

Topic

topic是一種帶有特殊含義的routing key。topic不是隨意命名的,它由一個或多個單詞構成,以.點號分隔,比如stock.usd.nysenyse.vmwquick.orange.rabbit,且長度限制了255字節。在前面的日志系統中,已經實現了按照日志級別(info/warning/error)進行消息路由,但是比較單一。接下來將通過topic來實現既能按照日志級別,也能按照日志來源(auth/cron/kern)進行路由。

Topic Exchange

topic類型的Exchange就是用來支持Topic的。它跟direct類型的Exchange比較類似,只是對於routing key的命名有要求,並且支持兩種特殊字符:

  • *代表1個單詞。

  • #代表0個或多個單詞。

通過以下圖示可以直觀看到topic類型的Exchange是如何路由的:

所有消息都是<speed>.<colour>.<species>格式的,分別代表速度、顏色、物種。隊列Q1的binding key是*.orange.*,隊列Q2的binding key是*.*.rabbitlazy.#

  • 帶有quick.orange.rabbitlazy.orange.elephant的routing key的消息會轉發到Q1和Q2。
  • 帶有quick.orange.fox routing key的消息只會轉發到Q1。
  • lazy.brown.fox 只會轉發到Q2。
  • lazy.pink.rabbit只會轉發到Q2一次,雖然它同時命中了2個binding key,但還是只會轉發1次。
  • quick.brown.fox會被Exchange忽視,不轉發給任何隊列。
  • 只傳一個單詞orange或者四個單詞quick.orange.male.rabbit會被Exchange忽視
  • 四個單詞lazy.orange.male.rabbit,只轉發給Q2。

topic類型的Exchange是很靈活的,如果binding key設置為#,那么它就相當於fanout類型,轉發所有消息。如果binding key里面不包含*或者#,那么它就相當於direct類型,轉發指定消息。

完整代碼

EmitLogTopic.java

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLogTopic.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogsTopic.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

topic的格式是<facility>.<severity>

運行效果

編譯:

javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java

接收所有日志:

java -cp $CP ReceiveLogsTopic "#"

只接收來源於kern的日志:

java -cp $CP ReceiveLogsTopic "kern.*"

只接收critical嚴重級別的日志:

java -cp $CP ReceiveLogsTopic "*.critical"

也可以多重綁定:

java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

發送日志消息:

java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

RPC

RPC是Remote Procedure Call的縮寫,遠程過程調用,比如在遠程機器中執行函數,並拿到返回結果。RabbitMQ能用來實現RPC服務。接下來就簡單實現一個調用生成斐波那契數列的RPC服務。

整體設計如圖所示:

img

Client發送RPC請求到rpc_queue里面,然后阻塞,等待返回。在請求中會指定一個回調隊列的地址,通過reply_to來指定。Server從rpc_queue讀取消息進行處理,根據reply_to把響應放到回調隊列中。請求中還設置了一個correlation_id,Client在收到響應時,根據這個關聯id來匹配,匹配上的消息才進行接收。

correlation_id匹配不上的消息會忽略,為什么是忽略而不是拋出異常呢?因為RPC Server有可能發返回ack確認前就宕機,當它重啟以后,會重新處理請求消息,從而導致消息重復處理。對於RPC來說最好是能夠讓重復場景是冪等的。

代碼實現

RPCServer.java

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCServer.java

import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            //設置n個Server進程
            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

RPCClient.java

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        //在響應到達前掛起main阻塞等待,1表示只等待1個響應
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

運行效果

編譯:

javac -cp $CP RPCClient.java RPCServer.java

啟動Server:

java -cp $CP RPCServer
# => [x] Awaiting RPC requests

運行Client:

java -cp $CP RPCClient
# => [x] Requesting fib(30)

如果Server處理慢,那么可以再在控制台啟動新的Server來消費。

注意,本文的示例都只是為了演示而編寫的,不是真正意義上的實現。

對於可靠的消息確認,RabbitMQ提供了一個擴展,感興趣的話可以閱讀:

https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

image-20220406215213650

參考資料:

https://www.rabbitmq.com/getstarted.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM