在第二個教程中,我們學習了如何使用工作隊列在多個worker之間分配耗時的任務。
但是如果我們需要在遠程計算機上運行功能並等待結果呢?嗯,這是另外一件事情,這種模式通常被稱為遠程過程調用(RPC)。
在本教程中我們將使用RabbitMQ的建立一個RPC系統:一個客戶端和一個可伸縮的RPC服務器。由於我們沒有什么耗時的任務,我們要創建一個返回斐波那契數虛設RPC服務。
客戶端接口(Client interface)
為了說明RPC如何使用,我們將創建一個簡單的客戶端類。它將創建一個名為call的方法——發送RPC請求,並且處於阻塞狀態,直到收到應答。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
PRC筆記
盡管PRC是一個常見的模式,它經常受到批評。當程序員不知道他所調用的方法是本地的還是一個緩慢的RPC,問題就出現了。這樣的混亂在系統中造成不可預料的結果,並增加了不必要的調試的復雜性,相比於簡單的軟件,PRC的濫用可能導致造成不可維護的面條式的代碼。
考慮到這一點,請參考以下建議:確保能明確分辨出哪些函數是本地的,哪些是遠程的。
建立文檔,讓組件之間的依賴關系更清楚。
處理錯誤的case,如果RPC服務器掛了很長時間,客戶端應該怎么處理?
如果對以上有疑問,請避免使用。如果沒有,你也應該使用異步管道,而不是阻塞式的RPC調用,結果被異步地推到下一個計算階段。
回調隊列(Callback queue)
一般來說利用RabbitMQ來做RPC是很簡單的。客戶端發送請求消息,服務端回復應答消息。為了能收到回復,我們需要發送一個“callback”隊列地址在請求里面。我們可以使用默認隊列(這是Java客戶端獨有的):
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議預定義了14個屬性去發送消息。大部分的屬性都很少使用,但是下列除外:
deliveryMode:標記的消息為持久(值為2)或暫時的(任何其他值)。你可能還記得第二個教程中的此屬性。
contentType:用於描述MIME類型的編碼。例如,對於經常使用JSON編碼,是一個很好的做法,將此屬性設置為:application/json。
eplyTo: 常用於命名一個回調隊列。
correlationId: 用於關聯的RPC響應。
我們需要import:
import com.rabbitmq.client.AMQP.BasicProperties;
關聯標識(Correlation Id)
在上面介紹的方法中,我們建議為每一個RPC請求建立一個回調隊列。這是相當低效的,幸好有一個更好的辦法 - 讓我們創建每個客戶端一個回調隊列。
這樣產生了一個新的問題,在收到該回調隊列的響應的時候,我們並不知道該響應是哪個請求的響應,這就是correlationId屬性的用處,我們將它設置為每個請求的唯一值。這樣,當我們在回調隊列收到一條消息的時候,我們將看看這個屬性,就能找到與這個響應相對應的請求。如果我們看到一個未知的correlationId,我們完全可以丟棄消息,因為他不並不屬於我們系統。
你也許會問,為什么我們選擇丟棄這個消息,而不是拋出一個錯誤。這是為了解決服務器端有可能發生的競爭情況。盡管可能性不大,但RPC服務器還是有可能在已將應答發送給我們但還未將確認消息發送給請求的情況下死掉。如果這種情況發生,RPC在重啟后會重新處理請求。這就是為什么我們必須在客戶端優雅的處理重復響應,同時RPC也需要盡可能保持冪等性。
總結
我們的RPC這樣工作:
代碼整合
斐波那契數列任務:
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我們定義一個斐波那契的方法,假定只有有效的正整數輸入。(不要指望它為大數據工作,這可能是最慢的遞歸實現)
我們的RPC服務器RPCServer.java的代碼如下:
private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
以上服務端的代碼很簡單:
- 和通常一樣,我們從建立一個連接,一個通道和定義一個隊列開始。
- 我們可能需要運行多個服務器進程。為了在多個服務器上均勻分布的負荷,我們需要設置channel.basicQos中的prefetchCount。
- 我們使用basicConsume訪問隊列。然后,進入while循環中,等待請求消息,完成工作並發送回響應。
RPCClient.java:
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
客戶端的代碼稍微復雜:
- 我們建立一個連接,一個通道和一個用於接收回復的回調隊列。
- 我們訂閱“回調”的隊列,這樣我們就可以接收RPC響應。
- 我們的call方法發出實際的RPC請求。
- 在這里,我們首先生成一個唯一的correlationID,並保存它 - while循環會使用這個值來捕捉適當的響應。
- 接下來,我們發布請求消息時,具有兩個屬性:的replyTo和的correlationID。
- 在這一點上,我們可以坐下來,等到適當的響應到達。
- while循環正在做一個很簡單的工作,對於每一個響應消息它會檢查的correlationID是我們要找的人。如果是,它將保存的響應。
- 最后,我們返回響應給用戶。
客戶端請求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
以上的設計不是唯一可能的實現一個RPC服務的,但它有一些重要的優點:
- 如果RPC服務器速度太慢,則只需運行多個即可。嘗試在新的控制台運行的第二RPCServer。
- 在客戶端,RPC請求只發送或接收一條消息。不需要像 queue_declare 這樣的異步調用。所以RPC客戶端的單個請求只需要一個網絡往返。
我們的代碼依舊非常簡單,而且沒有試圖去解決一些復雜(但是重要)的問題,如:
- 當沒有服務器運行時,客戶端如何作出反映。
- 客戶端是否需要實現類似RPC超時的東西。
- 如果服務器發生故障,並且拋出異常,應該被轉發到客戶端嗎?
- 在處理前,防止混入無效的信息(例如檢查邊界)
原文地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
代碼地址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之發布訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠程過程調用(RPC)