RabbitMQ-RPC模式

如果我們需要在遠程電腦上運行一個方法,並且還要等待一個返回結果該怎么辦?這和前面的例子不太一樣, 這種模式我們通常稱為遠程過程調用,即RPC.
在本節中,我們將會學習使用RabbitMQ去搭建一個RPC系統:一個客戶端和一個可以升級(擴展)的RPC服務器。為了模擬一個耗時任務,我們將創建一個返回斐波那契數列的虛擬的RPC服務。
客戶端
在客戶端定義一個RPCClient類,並定義一個call()方法,這個方法發送一個RPC請求,並等待接收響應結果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四個斐波那契數是: " + result);
回調隊列 Callback Queue
使用RabbitMQ去實現RPC很容易。一個客戶端發送請求信息,並得到一個服務器端回復的響應信息。為了得到響應信息,我們需要在請求的時候發送一個“回調”隊列地址。我們可以使用默認隊列。下面是示例代碼:
//定義回調隊列,
//自動生成對列名,非持久,獨占,自動刪除
callbackQueueName = ch.queueDeclare().getQueue();
//用來設置回調隊列的參數對象
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
//發送調用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息屬性 Message Properties
AMQP 0-9-1協議定義了消息的14個屬性。大部分屬性很少使用,下面是比較常用的4個:
deliveryMode:將消息標記為持久化(值為2)或非持久化(任何其他值)。
contentType:用於描述mime類型。例如,對於經常使用的JSON格式,將此屬性設置為:application/json。
replyTo:通常用於指定回調隊列。
correlationId:將RPC響應與請求關聯起來非常有用。
關聯id (correlationId):
在上面的代碼中,我們會為每個RPC請求創建一個回調隊列。 這是非常低效的,這里還有一個更好的方法:讓我們為每個客戶端創建一個回調隊列。
這就提出了一個新的問題,在隊列中得到一個響應時,我們不清楚這個響應所對應的是哪一條請求。這時候就需要使用關聯id(correlationId)。我們將為每一條請求設置唯一的的id值。稍后,當我們在回調隊列里收到一條消息的時候,我們將查看它的id屬性,這樣我們就可以匹配對應的請求和響應。如果我們發現了一個未知的id值,我們可以安全的丟棄這條消息,因為它不屬於我們的請求。
小結

RPC的工作方式是這樣的:
- 對於RPC請求,客戶端發送一條帶有兩個屬性的消息:replyTo,設置為僅為請求創建的匿名獨占隊列,和correlationId,設置為每個請求的惟一id值。
- 請求被發送到rpc_queue隊列。
- RPC工作進程(即:服務器)在隊列上等待請求。當一個請求出現時,它執行任務,並使用replyTo字段中的隊列將結果發回客戶機。
- 客戶機在回應消息隊列上等待數據。當消息出現時,它檢查correlationId屬性。如果匹配請求中的值,則向程序返回該響應數據。
代碼
服務器端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
/*
* 定義隊列 rpc_queue, 將從它接收請求信息
*
* 參數:
* 1. queue, 對列名
* 2. durable, 持久化
* 3. exclusive, 排他
* 4. autoDelete, 自動刪除
* 5. arguments, 其他參數屬性
*/
ch.queueDeclare("rpc_queue",false,false,false,null);
ch.queuePurge("rpc_queue");//清除隊列中的內容
ch.basicQos(1);//一次只接收一條消息
//收到請求消息后的回調對象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//處理收到的數據(要求第幾個斐波那契數)
String msg = new String(message.getBody(), "UTF-8");
int n = Integer.parseInt(msg);
//求出第n個斐波那契數
int r = fbnq(n);
String response = String.valueOf(r);
//設置發回響應的id, 與請求id一致, 這樣客戶端可以把該響應與它的請求進行對應
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();
/*
* 發送響應消息
* 1. 默認交換機
* 2. 由客戶端指定的,用來傳遞響應消息的隊列名
* 3. 參數(關聯id)
* 4. 發回的響應消息
*/
ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
//發送確認消息
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//消費者開始接收消息, 等待從 rpc_queue接收請求消息, 不自動確認
ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
protected static int fbnq(int n) {
if(n == 1 || n == 2) return 1;
return fbnq(n-1)+fbnq(n-2);
}
}
客戶端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
Connection con;
Channel ch;
public RPCClient() throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
con = f.newConnection();
ch = con.createChannel();
}
public String call(String msg) throws Exception {
//自動生成對列名,非持久,獨占,自動刪除
String replyQueueName = ch.queueDeclare().getQueue();
//生成關聯id
String corrId = UUID.randomUUID().toString();
//設置兩個參數:
//1. 請求和響應的關聯id
//2. 傳遞響應數據的queue
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//向 rpc_queue 隊列發送請求數據, 請求第n個斐波那契數
ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
//用來保存結果的阻塞集合,取數據時,沒有數據會暫停等待
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
//接收響應數據的回調對象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//如果響應消息的關聯id,與請求的關聯id相同,我們來處理這個響應數據
if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
//把收到的響應數據,放入阻塞集合
response.offer(new String(message.getBody(), "UTF-8"));
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//開始從隊列接收響應數據
ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
//返回保存在集合中的響應數據
return response.take();
}
public static void main(String[] args) throws Exception {
RPCClient client = new RPCClient();
while (true) {
System.out.print("求第幾個斐波那契數:");
int n = new Scanner(System.in).nextInt();
String r = client.call(""+n);
System.out.println(r);
}
}
}
