Remote procedure call (RPC)
在第二篇教程中,我們學習了如何使用工作隊列在多個工作人員之間分配耗時的任務,但是如果我們需要在遠程計算機上運行某個功能並等待結果呢?那么,這是一個不同的故事。這種模式通常稱為遠程過程調用或RPC。
在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由於我們沒有任何值得分發的耗時任務,我們將創建一個返回斐波那契數字的虛擬RPC服務。
客戶端接口
為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。它將公開一個名為call的方法 ,它發送一個RPC請求並阻塞,直到收到應答:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
有關RPC的說明
雖然RPC是計算中很常見的模式,但它經常受到批評。當程序員不知道函數調用是本地的還是慢速的RPC時會出現這些問題。像這樣的混亂導致不可預測的系統,並增加了調試的不必要的復雜性,而不是簡化軟件,濫用RPC會導致不可維護的意大利面式代碼。
銘記這一點,請考慮以下建議:
- 確保顯而易見哪個函數調用是本地的,哪個是遠程的。
- 文件記錄您的系統,使組件之間的依賴關系清晰。
- 處理錯誤情況。當RPC服務器長時間關閉時,客戶端應該如何反應?
有疑問時避免使用RPC。如果可以的話,你應該使用異步管道 - 而不是類似於RPC的阻塞,結果被異步推送到下一個計算階段。
回調隊列
一般來說,通過RabbitMQ來實現RPC是很容易的。客戶端發送請求消息,服務器回復響應消息。為了收到響應消息,我們需要在請求中發送一個“callback”隊列地址。我們可以使用默認隊列(在Java客戶端中是獨占的)。讓我們試試看:
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1協議預先定義了消息的14個屬性。大多數屬性很少被使用,除了以下幾點:
- deliveryMode: 將消息標記為持久性(值為2)或瞬態(任何其他值)。您可能還記得第二個教程中的這個屬性。
- contentType: 用於描述編碼的mime類型。例如,對於經常使用的JSON編碼,將此屬性設置為: application/json
- replyTo: 通常用於命名回調隊列。
- correlationId: 有助於將RPC響應與請求關聯起來
依賴的類
import com.rabbitmq.client.AMQP.BasicProperties;
關聯的ID ( Correlation Id )
在上面介紹的方法中,我們建議為每個RPC請求創建一個回調隊列。但是這是非常低效的,幸運的是有一個更好的方法 - 為每個客戶端創建一個回調隊列。
這引發了一個新問題,該隊列中收到回復后,不清楚回復屬於哪個請求。這時就是使用correlationId屬性的時候。我們為每個請求設置一個唯一的correlationId值。稍后,當我們在回調隊列中收到消息時,我們將查看此屬性,並基於此屬性,我們將能夠將響應與請求進行匹配。如果我們看到未知的correlationId值,我們可以放心地丟棄該消息 - 它不屬於我們的請求。
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是因為錯誤參數的失敗消息?這是由於服務器端可能出現競爭狀況。雖然不太可能,但在發送給我們答案之后,但在發送請求的確認消息之前,RPC服務器可能會死亡。如果發生這種情況,重新啟動的RPC服務器將再次處理該請求。這就是為什么在客戶端,我們必須優雅地處理重復的響應,理想情況下RPC應該是冪等的。
整合

我們的RPC會像這樣工作:
- 當客戶端啟動時,它創建一個匿名獨占callback隊列。
- 對於RPC請求,客戶端會發送一條消息,其中包含兩個屬性: replyTo,它被設置為回調隊列和correlationId,它被設置為每個請求的唯一值。
- 該請求被發送到rpc_queue隊列。
- RPC worker(又名:服務器)正在等待該隊列上的請求。當出現請求時,它執行該作業,並使用replyTo字段中的隊列將結果發送回客戶端。
- 客戶端在回調隊列中等待數據。當出現消息時,它會檢查correlationId屬性。如果它匹配來自請求的值,則返回對應用程序的響應。
RPCClient.java
package com.rabbitmq.tutorials.rpc; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); connection = factory.newConnection(); channel = connection.createChannel(); //為回復聲明獨占的“callback”隊列。 replyQueueName = channel.queueDeclare().getQueue(); } //會生成實際的RPC請求 public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); //發布具有兩個屬性的請求消息: replyTo和correlationId channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); //由於消費者交付處理是在另一個線程中執行,因此我們需要在響應到達之前暫停主線程。BlockingQueue是可能的解決方案之一。這里我們創建的 容量設置為1的ArrayBlockingQueue, // 因為我們只需要等待一個響應。 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); //訂閱'callback'隊列,以便我們可以接收RPC響應 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override //該handleDelivery方法是做一個很簡單的工作,對每一位消費響應消息它會檢查的correlationID 是我們要找的人。如果是這樣,它將響應BlockingQueue public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); //從response中獲取響應 return response.take(); } public void close() throws IOException { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } finally { if (fibonacciRpc!= null) { try { fibonacciRpc.close(); } catch (IOException _ignore) {} } } } }
RPCServer.java
package com.rabbitmq.tutorials.rpc; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; /** * 斐波那契函數 * @param n * @return */ private static int fib(int n) { if (n ==0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); } public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.0.103"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // 循環等待並准備消費RPC client發送的消息. while (true) { synchronized(consumer) { try { consumer.wait();//暫停主線程 } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
執行步驟:
- 啟動RPCServer.java
- 啟動RPCClient.java實例3次
全6篇完整項目地址:https://github.com/liwenzlw/rabbitmq-tutorials