RabbitMQ之實現RPC模式


1. 概述

本文使用RabbitMQ實現RPC的調用方式,主要包括如下內容:

  • 回調隊列(Callback queue)
  • RPC調用相關的消息參數:replyTo和correlationId
  • RPC調用的客戶端和服務端的demo代碼

2. 本文實現功能說明

本文使用RabbitMQ實現RPC的調用方式,我們需要使用新的隊列:回調隊列(Callback queue)

RPC需要涉及消息的兩個重要屬性:

  • replyTo: 存儲回調隊列的名稱
  • correlationId: 唯一標識本次的請求,主要用於RPC調用

這里寫圖片描述

我們實現如上圖的功能:

1. RPC客戶端啟動后,創建一個匿名、獨占的、回調的隊列
2. RPC客戶端設置消息的2個屬性:replyTo和correlationId,然后將消息發送到隊列rpc_queue
3. RPC服務端在隊列rpc_queue上等待消息。RPC服務端處理完收到消息后,然后將處理結果封裝成消息發送到replyTo指定的隊列上,並且此消息帶上correlationId(此值為收到消息里的correlationId)
4. RPC客戶端在隊列replyTo上等待消息,當收到消息后,它會判斷收到消息的correlationId。如果值和自己之前發送的一樣,則這個值就是RPC的處理結果

3. RPC客戶端: RpcClient代碼

主要業務邏輯如下:

1. 配置連接工廠
2. 建立TCP連接
3. 在TCP連接的基礎上創建通道
4. 定義臨時隊列replyQueueName,聲明唯一標志本次請求corrId,並將replyQueueName和corrId配置要發送的消息隊列中
5. 使用默認的交換機發送消息到隊列rpc_queue中
6. 使用阻塞隊列BlockingQueue阻塞當前進程
7. 收到請求后,將請求放入BlockingQueue中,主線程被喚醒,打印返回內容

下面只列出不同的地方:
第4,5步代碼如下

// 定義臨時隊列,並返回生成的隊列名稱
String replyQueueName = channel.queueDeclare().getQueue();

// 唯一標志本次請求
String corrId = UUID.randomUUID().toString();
// 生成發送消息的屬性
AMQP.BasicProperties props = new AMQP.BasicProperties
        .Builder()
        .correlationId(corrId) // 唯一標志本次請求
        .replyTo(replyQueueName) // 設置回調隊列
        .build();
// 發送消息,發送到默認交換機
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

第6,7步代碼如下:

// 阻塞隊列,用於存儲回調結果
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// 定義消息的回退方法
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (properties.getCorrelationId().equals(corrId)) {
            response.offer(new String(body, "UTF-8"));
        }
    }
});
// 獲取回調的結果
String result = response.take();
System.out.println(" [RpcClient] Result:'" + result + "'");

完整代碼
RPC客戶端代碼:RpcClient.java

4. RPC服務端:RpcServer

主要業務邏輯如下:

1. 配置連接工廠
2. 建立TCP連接
3. 在TCP連接的基礎上創建通道
4. 聲明一個rpc_queue隊列
5. 設置同時最多只能獲取一個消息
6. 在rpc_queue隊列在等待消息
7. 收到消息后,調用回調對象對消息進行處理,向此消息的replyTo隊列中發送處理並帶上correlationId
8. 使用wait-notify實現主線程和消息處理回調對象進行同步

下面只列出不同的地方:
第7,8步代碼如下:

// 定義消息的回調處理類
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 生成返回的結果,關鍵是設置correlationId值
        AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                .Builder()
                .correlationId(properties.getCorrelationId())
                .build();
        // 生成返回
        String response = generateResponse(body);
        // 回復消息,通知已經收到請求
        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
        // 對消息進行應答
        channel.basicAck(envelope.getDeliveryTag(), false);
        // 喚醒正在消費者所有的線程
        synchronized(this) {
            this.notify();
        }
    }
};
// 消費消息
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
// 在收到消息前,本線程進入等待狀態
while (true) {
    synchronized(consumer) {
        try {
            consumer.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
完整代碼
RPC服務端代碼: RpcServer.java

5. 測試:
BasicTest

@Test
public void rpc() throws InterruptedException {

    // rpc服務端
    executorService.submit(() -> {
        RpcServer.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd);
    });

    // rpc客戶端
    executorService.submit(() -> {
        RpcClient.execute(rabbitmq_host, rabbitmq_user, rabbitmq_pwd, "rpc test");
    });

    // sleep 10s
    Thread.sleep(10 * 1000);
}

rpc客戶端調用rpc服務端,並打印返回的結果

[RpcServer] Awaiting RPC requests
[RpcClient] Requesting : rpc test
[RpcServer] receive requests: rpc test
[RpcClient] Result:'response:rpc test-1516171567975

6. 代碼

上文的詳細代碼主要如下:
RPC服務端代碼: RpcServer.java
RPC客戶端代碼:RpcClient.java
測試代碼:BasicTest.java的方法 header()
所有的詳細代碼見github代碼,請盡量使用tag v0.12,不要使用master,因為master一直在變,不能保證文章中代碼和github上的代碼一直相同


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM