RabbitMQ中的RPC實現


1、RPC簡述

       RPC,Remote Procedure Call 遠程過程調用。通俗講,兩段程序不在同一個內存空間,無法直接通過方法名調用,就需要通過網絡通信方式調用。對於RabbitMQ,本身就是用於消息通信。簡單的RabbitMQ是,生產端發送消息,經由交換器,到達隊列。消費端不需要知道生產端,消費端訂閱隊列,消費隊列中的消息。而對於RPC,希望消費端消費消息后,返回一個結果,結果經由網絡,再返回給生產端。

      不考慮RabbitMQ針對RPC的特有設計。最簡單的設計是,生產端和消費端共同約定消費隊列和回復隊列,同時生產端每次發送消息時指定一個唯一ID。生產端將消息和唯一ID發送給消費隊列,消費者從消費隊列獲取消息。處理后,將結果和生產端發送過來的唯一ID,發送給回復隊列。生產端從回復隊列獲取消息和ID,判斷ID是否匹配,匹配,則此消息為回復消息。

    以上實現的RPC存在問題:生產端和消費端需要約定回復隊列,這就要求生產端和消費端互相知道,這無法實現解耦。解決方案:生產端在發送消息時,也將回復隊列名稱隨消息一起發送給隊列。

2、舉例說明RabbitMQ中的RPC實現

相關要點,見源碼注釋

pom依賴

   <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.6.0</version>
        </dependency>
    </dependencies>

生產端,也就是客戶端代碼

 

 1 package test;
 2 
 3 import java.io.IOException;
 4 import java.util.UUID;
 5 
 6 import com.rabbitmq.client.AMQP;
 7 import com.rabbitmq.client.AMQP.BasicProperties;
 8 import com.rabbitmq.client.Channel;
 9 import com.rabbitmq.client.DefaultConsumer;
10 import com.rabbitmq.client.Envelope;
11 
12 import utils.ChannelUtils;
13 
14 public class RPCClient {
15 
16     public static void main(String[] args) throws IOException {
17         RPCClient rpcClient = new RPCClient();
18         rpcClient.client();
19     }
20 
21     public void client() throws IOException {
22         //此方法封裝了如何連接RabbitMQ和創建connection,channel.源碼見附錄
23         Channel channel = ChannelUtils.getChannelInstance("client");
24         channel.exchangeDelete("exchange_rpc");
25         channel.exchangeDeclare("exchange_rpc", "direct", false, false, null);
26 
27         channel.queueDelete("queue_rpc");
28         channel.queueDeclare("queue_rpc", false, false, false, null);
29 
30         channel.queueBind("queue_rpc", "exchange_rpc", "rpc");
31 
32         //此處注意:我們聲明了要回復的隊列。隊列名稱由RabbitMQ自動創建。
33         //這樣做的好處是:每個客戶端有屬於自己的唯一回復隊列,生命周期同客戶端
34         String replyQueue = channel.queueDeclare().getQueue();
35         final String corrID = UUID.randomUUID().toString();
36 
37         //這里我們設計三類消息。
38         //消息1:指定回復隊列和ID
39         //消息2:僅指定回復隊列
40         //消息3:不指定回復隊列和ID
41         AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
42         // 指定回復隊列和回復correlateId
43         builder.replyTo(replyQueue).correlationId(corrID);
44         AMQP.BasicProperties properties = builder.build();
45         for (int i = 0; i < 2; i++) {
46             channel.basicPublish("exchange_rpc", "rpc", properties,
47                     (System.currentTimeMillis() + "-rpc發送消息1").getBytes());
48         }
49 
50         AMQP.BasicProperties.Builder builder2 = new AMQP.BasicProperties.Builder();
51         // 指定回復隊列,未指定回復correlateId
52         builder2.replyTo(replyQueue);
53         AMQP.BasicProperties properties2 = builder2.build();
54         for (int i = 0; i < 2; i++) {
55             channel.basicPublish("exchange_rpc", "rpc", properties2,
56                     (System.currentTimeMillis() + "-rpc發送消息2").getBytes());
57         }
58 
59         for (int i = 0; i < 2; i++) {
60             // 未指定回復隊列和correlateId
61             channel.basicPublish("exchange_rpc", "rpc", null, (System.currentTimeMillis() + "-rpc發送消息3").getBytes());
62         }
63 
64         DefaultConsumer c = new DefaultConsumer(channel) {
65             //這是一個回調函數,客戶端獲取消息,就調用此方法,處理消息
66             @Override
67             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
68                     throws IOException {
69                 if (corrID.equals(properties.getCorrelationId())) {
70                     System.out.println("correlationID對應上的消息:" + new String(body));
71                 } else {
72                     System.out.println("correlationID未對應上的消息:" + new String(body));
73                 }
74             }
75         };
76         channel.basicConsume(replyQueue, true, c);
77     }
78 
79 }

 

消費端,也就是服務器端代碼

 1 package test;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.AMQP;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.DefaultConsumer;
 8 import com.rabbitmq.client.Envelope;
 9 import com.rabbitmq.client.AMQP.BasicProperties;
10 
11 import utils.ChannelUtils;
12 
13 public class RPCServer {
14 
15     public static void main(String[] args) throws IOException {
16         RPCServer rpcServer = new RPCServer();
17         rpcServer.Server();
18     }
19 
20     public void Server() throws IOException {
21         final Channel channel = ChannelUtils.getChannelInstance("server");
22 
23         DefaultConsumer c = new DefaultConsumer(channel) {
24             
25             //這是一個回到函數,服務器端獲取到消息,就會調用此方法處理消息
26             @Override
27             public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
28                     throws IOException {
29                 System.out.println(new String(body));
30 
31                 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
32                 //我們在將要回復的消息屬性中,放入從客戶端傳遞過來的correlateId
33                 builder.correlationId(properties.getCorrelationId());
34                 AMQP.BasicProperties prop = builder.build();
35 
36                 //發送給回復隊列的消息,exchange="",routingKey=回復隊列名稱
37                 //因為RabbitMQ對於隊列,始終存在一個默認exchange="",routingKey=隊列名稱的綁定關系
38                 channel.basicPublish("", properties.getReplyTo(), prop, (new String(body) + "-回復").getBytes());
39 
40             }
41         };
42         channel.basicConsume("queue_rpc", true, c);
43     }
44 
45 }

 

 

附錄:ChannelUtils 的源碼

package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ChannelUtils {

    // AMQP的連接其實是對Socket做的封裝, 注意以下AMQP協議的版本號,不同版本的協議用法可能不同。
    public static Channel getChannelInstance(String ConnectionDescription) {
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection connection = connectionFactory.newConnection(ConnectionDescription);

            return connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("獲取Channel連接失敗");
        }

    }

    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setHost("192.168.1.111");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("drs");
        connectionFactory.setPassword("123456");

        return connectionFactory;
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM