1、RPC簡述
RPC,Remote Procedure Call 遠程過程調用。通俗講,兩段程序不在同一個內存空間,無法直接通過方法名調用,就需要通過網絡通信方式調用。對於RabbitMQ,本身就是用於消息通信。簡單的RabbitMQ是,生產端發送消息,經由交換器,到達隊列。消費端不需要知道生產端,消費端訂閱隊列,消費隊列中的消息。而對於RPC,希望消費端消費消息后,返回一個結果,結果經由網絡,再返回給生產端。
不考慮RabbitMQ針對RPC的特有設計。最簡單的設計是,生產端和消費端共同約定消費隊列和回復隊列,同時生產端每次發送消息時指定一個唯一ID。生產端將消息和唯一ID發送給消費隊列,消費者從消費隊列獲取消息。處理后,將結果和生產端發送過來的唯一ID,發送給回復隊列。生產端從回復隊列獲取消息和ID,判斷ID是否匹配,匹配,則此消息為回復消息。
以上實現的RPC存在問題:生產端和消費端需要約定回復隊列,這就要求生產端和消費端互相知道,這無法實現解耦。解決方案:生產端在發送消息時,也將回復隊列名稱隨消息一起發送給隊列。
2、舉例說明RabbitMQ中的RPC實現
相關要點,見源碼注釋
pom依賴
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.6.0</version> </dependency> </dependencies>
生產端,也就是客戶端代碼
1 package test; 2 3 import java.io.IOException; 4 import java.util.UUID; 5 6 import com.rabbitmq.client.AMQP; 7 import com.rabbitmq.client.AMQP.BasicProperties; 8 import com.rabbitmq.client.Channel; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 12 import utils.ChannelUtils; 13 14 public class RPCClient { 15 16 public static void main(String[] args) throws IOException { 17 RPCClient rpcClient = new RPCClient(); 18 rpcClient.client(); 19 } 20 21 public void client() throws IOException { 22 //此方法封裝了如何連接RabbitMQ和創建connection,channel.源碼見附錄 23 Channel channel = ChannelUtils.getChannelInstance("client"); 24 channel.exchangeDelete("exchange_rpc"); 25 channel.exchangeDeclare("exchange_rpc", "direct", false, false, null); 26 27 channel.queueDelete("queue_rpc"); 28 channel.queueDeclare("queue_rpc", false, false, false, null); 29 30 channel.queueBind("queue_rpc", "exchange_rpc", "rpc"); 31 32 //此處注意:我們聲明了要回復的隊列。隊列名稱由RabbitMQ自動創建。 33 //這樣做的好處是:每個客戶端有屬於自己的唯一回復隊列,生命周期同客戶端 34 String replyQueue = channel.queueDeclare().getQueue(); 35 final String corrID = UUID.randomUUID().toString(); 36 37 //這里我們設計三類消息。 38 //消息1:指定回復隊列和ID 39 //消息2:僅指定回復隊列 40 //消息3:不指定回復隊列和ID 41 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 42 // 指定回復隊列和回復correlateId 43 builder.replyTo(replyQueue).correlationId(corrID); 44 AMQP.BasicProperties properties = builder.build(); 45 for (int i = 0; i < 2; i++) { 46 channel.basicPublish("exchange_rpc", "rpc", properties, 47 (System.currentTimeMillis() + "-rpc發送消息1").getBytes()); 48 } 49 50 AMQP.BasicProperties.Builder builder2 = new AMQP.BasicProperties.Builder(); 51 // 指定回復隊列,未指定回復correlateId 52 builder2.replyTo(replyQueue); 53 AMQP.BasicProperties properties2 = builder2.build(); 54 for (int i = 0; i < 2; i++) { 55 channel.basicPublish("exchange_rpc", "rpc", properties2, 56 (System.currentTimeMillis() + "-rpc發送消息2").getBytes()); 57 } 58 59 for (int i = 0; i < 2; i++) { 60 // 未指定回復隊列和correlateId 61 channel.basicPublish("exchange_rpc", "rpc", null, (System.currentTimeMillis() + "-rpc發送消息3").getBytes()); 62 } 63 64 DefaultConsumer c = new DefaultConsumer(channel) { 65 //這是一個回調函數,客戶端獲取消息,就調用此方法,處理消息 66 @Override 67 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 68 throws IOException { 69 if (corrID.equals(properties.getCorrelationId())) { 70 System.out.println("correlationID對應上的消息:" + new String(body)); 71 } else { 72 System.out.println("correlationID未對應上的消息:" + new String(body)); 73 } 74 } 75 }; 76 channel.basicConsume(replyQueue, true, c); 77 } 78 79 }
消費端,也就是服務器端代碼
1 package test; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 import com.rabbitmq.client.AMQP.BasicProperties; 10 11 import utils.ChannelUtils; 12 13 public class RPCServer { 14 15 public static void main(String[] args) throws IOException { 16 RPCServer rpcServer = new RPCServer(); 17 rpcServer.Server(); 18 } 19 20 public void Server() throws IOException { 21 final Channel channel = ChannelUtils.getChannelInstance("server"); 22 23 DefaultConsumer c = new DefaultConsumer(channel) { 24 25 //這是一個回到函數,服務器端獲取到消息,就會調用此方法處理消息 26 @Override 27 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 28 throws IOException { 29 System.out.println(new String(body)); 30 31 AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 32 //我們在將要回復的消息屬性中,放入從客戶端傳遞過來的correlateId 33 builder.correlationId(properties.getCorrelationId()); 34 AMQP.BasicProperties prop = builder.build(); 35 36 //發送給回復隊列的消息,exchange="",routingKey=回復隊列名稱 37 //因為RabbitMQ對於隊列,始終存在一個默認exchange="",routingKey=隊列名稱的綁定關系 38 channel.basicPublish("", properties.getReplyTo(), prop, (new String(body) + "-回復").getBytes()); 39 40 } 41 }; 42 channel.basicConsume("queue_rpc", true, c); 43 } 44 45 }
附錄:ChannelUtils 的源碼
package utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ChannelUtils { // AMQP的連接其實是對Socket做的封裝, 注意以下AMQP協議的版本號,不同版本的協議用法可能不同。 public static Channel getChannelInstance(String ConnectionDescription) { try { ConnectionFactory connectionFactory = getConnectionFactory(); Connection connection = connectionFactory.newConnection(ConnectionDescription); return connection.createChannel(); } catch (Exception e) { throw new RuntimeException("獲取Channel連接失敗"); } } public static ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.1.111"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("drs"); connectionFactory.setPassword("123456"); return connectionFactory; } }