rabbitMQ學習筆記(七) RPC 遠程過程調用


關於RPC的介紹請參考百度百科里的關於RPC的介紹:http://baike.baidu.com/view/32726.htm#sub32726

   現在來看看Rabbitmq中RPC吧!RPC的工作示意圖如下:


   上圖中的C代表客戶端,S表示服務器端;Rabbitmq中的RPC流程如下:

1、首先客戶端發送一個reply_to和corrention_id的請求,發布到RPC隊列中;

2、服務器端處理這個請求,並把處理結果發布到一個回調Queue,此Queue的名稱應當與reply_to的名稱一致

3、客戶端從回調Queue中得到先前correlation_id設定的值的處理結果。如果碰到和先前不一樣的corrention_id的值,將會忽略而不是拋出異常。

 

  對於上面所提到的回調Queue中的消費處理使用的是BasicProperties類;而消息 屬性在AMQP的協議中規定有14個;而很多大部分我們沒有用到。常用的幾個屬性有:

English代碼   收藏代碼
1 Message properties
2 The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
3 
4 delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial. 
5 content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json. 
6 reply_to: Commonly used to name a callback queue. 
7 correlation_id: Useful to correlate RPC responses with requests. 

 delivery_mode : 標記消息是持久性消息還是瞬態信息。在前面的“Work Queue”中我們已經提到過;   

  content_type : 用來描述MIME的類型。如把其類型設定為JSON;

  reply_to : 用於命名一個回調Queue;

  correlation_id : 用於與相關聯的請求的RPC響應.

當客戶端想要調用服務器的某個方法來完成某項功能時,就可以使用rabbitMQ支持的PRC服務。

其實RPC服務與普通的收發消息的區別不大, RPC的過程其實就是  

客戶端向服務端定義好的Queue發送消息,其中攜帶的消息就應該是服務端將要調用的方法的參數 ,並使用Propertis告訴服務端將結果返回到指定的Queue。

示例:

 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 
 5 import com.rabbitmq.client.AMQP.BasicProperties;
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.QueueingConsumer.Delivery;
12 import com.rabbitmq.client.ShutdownSignalException;
13 
14 public class RPCServer {
15     
16     public static final String RPC_QUEUE_NAME = "rpc_queue";
17     
18     public static String sayHello(String name){
19         return "hello " + name ;
20     }
21     
22     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
23         
24         ConnectionFactory connFac = new ConnectionFactory() ;
25         connFac.setHost("localhost");
26         
27         Connection conn = connFac.newConnection() ;
28         
29         Channel channel = conn.createChannel() ;
30         
31         channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
32         
33         QueueingConsumer consumer = new QueueingConsumer(channel);
34         
35         channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
36         
37         while(true){
38             System.out.println("服務端等待接收消息..");  
39             Delivery deliver = consumer.nextDelivery() ;
40             System.out.println("服務端成功收到消息..");
41             BasicProperties props =  deliver.getProperties() ;
42             
43             String message = new String(deliver.getBody() , "UTF-8") ;
44             
45             String responseMessage = sayHello(message) ;
46             
47             BasicProperties responseProps = new BasicProperties.Builder()
48             .correlationId(props.getCorrelationId())  
49             .build() ;
50             
51             //將結果返回到客戶端Queue
52             channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
53              
54             //向客戶端確認消息
55             channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
56             System.out.println("服務端返回消息完成..");
57         }
58         
59     }
60 
61 }
 1 package com.zf.rabbitmq07;
 2 
 3 import java.io.IOException;
 4 import java.util.UUID;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.ConsumerCancelledException;
10 import com.rabbitmq.client.QueueingConsumer;
11 import com.rabbitmq.client.AMQP.BasicProperties;
12 import com.rabbitmq.client.QueueingConsumer.Delivery;
13 import com.rabbitmq.client.ShutdownSignalException;
14 
15 public class RPCClient {
16 
17     public static final String RPC_QUEUE_NAME = "rpc_queue";
18 
19     public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
20 
21         ConnectionFactory connFac = new ConnectionFactory() ;
22         connFac.setHost("localhost");
23         Connection conn = connFac.newConnection() ;
24         Channel channel = conn.createChannel() ;
25 
26         //響應QueueName ,服務端將會把要返回的信息發送到該Queue
27         String responseQueue = channel.queueDeclare().getQueue() ;
28 
29         String correlationId = UUID.randomUUID().toString() ;
30 
31         BasicProperties props = new BasicProperties.Builder()
32         .replyTo(responseQueue)
33         .correlationId(correlationId)
34         .build();
35 
36         String message = "is_zhoufeng";
37         channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
38 
39         QueueingConsumer consumer = new QueueingConsumer(channel)    ;
40 
41         channel.basicConsume( responseQueue , consumer) ;
42 
43         while(true){
44             
45             Delivery delivery = consumer.nextDelivery() ;
46             
47             if(delivery.getProperties().getCorrelationId().equals(correlationId)){
48                 String result = new String(delivery.getBody()) ;  
49                 System.out.println(result);
50             }
51             
52         }
53     }
54 
55 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM