原文來自 RabbitMQ 英文官網的教程(6.Remote procedure call - RPC),其示例代碼采用了 .NET C# 語言。

In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
在第二篇教程中,我們學習了如何使用工作隊列在多個工作單元之間分配耗時的任務。
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.
但是假如我們需要運行一個在遠程電腦上的函數並等待其結果將會怎樣呢?好吧,這將是一個完全不同的故事,這個模式被普遍認為叫遠程過程調用或者簡稱 RPC。
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
在本教程中我們即將使用 RabbitMQ 來構建一個 RPC 系統:一個客戶端和一個可伸縮的 RPC 服務器。由於我們並沒有任何耗時的任務能拿來分配,那就創建一個返回斐波納契數列的虛擬 RPC 服務吧。
Client interface
客戶端接口
To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
為了說明如何使用 RPC 服務我們來創建一個簡單的客戶端類。我會公開一個名叫 call 的方法,該方法用以發送一個 RPC 請求並保持阻塞狀態,直至接收到應答為止。
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
A note on RPC
關於 RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
盡管 RPC 是一個很常見的計算模式,也時常遭受批評。當程序員不知道針對 call 函數的調用是本地的還是很慢的 RPC 時就會出現問題,像這樣的困惑往往會導致不可預測的系統(問題)以及徒增不必要的調試復雜性。與簡化軟件有所不同的是,誤用 RPC 會導致難以維護的意大利面條式代碼。
Bearing that in mind, consider the following advice:
-
Make sure it's obvious which function call is local and which is remote.
-
Document your system. Make the dependencies between components clear.
-
Handle error cases. How should the client react when the RPC server is down for a long time?
記住以上問題,並考慮以下建議:
- 確保可以明顯區分哪一個函數是調用本地的,哪一個是遠程的。
- 為系統編寫文檔,確保組件之間的依賴很明確。
- 處理錯誤情形,當 RPC 服務端停機很長時間時,客戶端會怎樣應對?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to a next computation stage.
當有疑問時先避免使用 RPC,如果可以,考慮使用一個異步管道 - 它類似於 RPC 的阻塞,會通過異步的方式將結果推送到下一個計算場景。
Callback queue
回調隊列
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request:
一般而言,基於 RabbitMQ 來使用 RPC 是很簡單的,即客戶端發送一個請求消息,然后服務端使用一個響應消息作為應答。為了能獲得一個響應,我們需要在請求過程中發送一個“callback”隊列地址。
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... then code to read a response message from the callback_queue ...
Message properties
消息屬性
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
-
deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
-
contentType: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
-
replyTo: Commonly used to name a callback queue.
-
correlationId: Useful to correlate RPC responses with requests.
AMQP 0-9-1 協議會在消息中預定義包含有 14 個屬性的集合,大部分的屬性用得都比較少,除了以下幾項之外:
- deliveryMode:將消息標記為持久的(值為2),或者瞬時的(其他值),想必你在第二篇教程中還記得這個屬性。
- contentType:經常用來描述編碼的 mime 類型,比如在常見的 JSON 編碼中一個好的實踐便是設置該屬性為:application/json。
- replyTo:通常用來為回調隊列命名。
- correlationId:用以將 RPC 響應與請求關聯起來。
Correlation Id
CorrelationId
In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.
在上面呈現的方法中我們建議為每一個 RPC 請求創建一個回調隊列,不過這很低效,幸運的是我們有更好的辦法 - 讓我們為每一個客戶端創建一個單獨的回調。
That raises a new issue, having received a response in that queue it's not clear to which request the response belongs. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message - it doesn't belong to our requests.
這就又會出現一個問題,即在收到響應的隊列中,並不清楚哪個請求隸屬於該響應,這便是 correlationId 屬性所用之處。我們將會對每一個請求設置 correlationId 為唯一值,然后,當我們在回調隊列中接收到消息時會查看這個屬性,在該屬性的基礎上,我們可以讓請求與響應進行匹配。如果我們發現有未知的 correlationId 值,則可以放心地丟棄這些並不屬於我們的請求的消息。
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request.
If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
你可能會問,我們為什么應該在回調隊列中忽略未知的消息,而不是(直接)返回錯誤?這可能是由於服務端存在競態條件。盡管不太可能,但是針對一個請求,RPC 服務器很可能在發送完應答后中止,而不是在發送確認消息之前。如果確實發生,重啟的 RPC 服務將再一次處理這個請求,這就是為什么我們在客戶端需要優雅地處理重復的響應,以及應該(保持)理想地冪等性。
Summary
總結

Our RPC will work like this:
-
When the Client starts up, it creates an anonymous exclusive callback queue.
-
For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request.
-
The request is sent to an rpc_queue queue.
-
The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.
-
The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.
RPC 會像如下這樣運作:
- 當客戶端啟動時,它將創建一個匿名的獨有回調隊列。
- 針對一個 RPC 請求,客戶端會發送一個基於兩個屬性的消息:一個是指向回調隊列的 replyTo,另一個是為每一個請求標記唯一值的 correlationId。
- 請求將發送至 rpc_queue 隊列。
- RPC 工作單元(或者叫服務端)會在隊列中持續等待請求。當請求出現時,RPC 將完成工作,同時使用來自 replyTo 字段(所指代)的隊列來發送攜帶着結果的消息返回至客戶端。
- 客戶端在回調隊列上等待着數據,當一個消息出現時,客戶端會檢查 correlationId 屬性,如果該值與當前請求的值相匹配,則把響應返回給應用程序。
Putting it all together
融合一起
The Fibonacci task:
斐波納契任務(函數)
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, and it's probably the slowest recursive implementation possible).
我們聲明了斐波納契函數,並假定只(允許)輸入正整數。(不要期望輸入過大的數字,因為很可能這個遞歸實現會非常慢)
The code for our RPC server RPCServer.cs looks like this:
針對我們的 RPC 服務端,RPCServer.cs 類文件的代碼看起來如下:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
///
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
///
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
The server code is rather straightforward:
-
As usual we start by establishing the connection, channel and declaring the queue.
-
We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set the prefetchCount setting in channel.basicQos.
-
We use basicConsume to access the queue. Then we register a delivery handler in which we do the work and send the response back.
服務端的代碼是相當簡單的。
- 像往常一樣,我們先建立連接、信道以及聲明隊列。
- 我們可能想運行不只一個服務端處理程序,為了能通過多台服務器平均地分擔負載,我們需要設定 channel.basicQos 中 prefetchCount 的值。
- 我們使用 basicConsume 來訪問隊列,然后注冊一個遞送程序,在這個程序中我們執行工作並返回響應。
The code for our RPC client RPCClient.cs:
針對我們的 RPC 客戶端,RPCClient.cs 類文件的代碼如下:
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return respQueue.Take(); ;
}
public void Close()
{
connection.Close();
}
}
public class Rpc
{
public static void Main()
{
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}
The client code is slightly more involved:
-
We establish a connection and channel and declare an exclusive 'callback' queue for replies.
-
We subscribe to the 'callback' queue, so that we can receive RPC responses.
-
Our call method makes the actual RPC request.
-
Here, we first generate a unique correlationId number and save it - the while loop will use this value to catch the appropriate response.
-
Next, we publish the request message, with two properties: replyTo and correlationId.
-
At this point we can sit back and wait until the proper response arrives.
-
The while loop is doing a very simple job, for every response message it checks if the correlationId is the one we're looking for. If so, it saves the response.
-
Finally we return the response back to the user.
客戶端的代碼稍微多一些:
- 我們建立連接和信道,以及針對答復(響應)聲明一個獨有的“callback”隊列。
- 我們訂閱這個“callback”隊列,以便可以接收到 RPC 響應。
- 我們的 call 方法將發起一個實際的 RPC 請求。
- 在此,我們首先生成一個唯一的 correlationId 編號並保存好它,因為 while 循環會使用該值來捕獲匹配的響應。
- 接下來,我們發布請求消息,它包含了兩個屬性:replyTo 和 correlationId。
- 此時,我們可以稍微等待一下直到指定的響應到來。
- while 循環所做的事情非常簡單,對於每一個響應消息,它都會檢查 correlationId 是否為我們正在尋找的那一個,如果是就保存該響應。
- 最終,我們將響應返回給用戶。
Making the Client request:
客戶端請求
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.cs and RPCServer.cs.
Set up as usual (see tutorial one):
Our RPC service is now ready. We can start the server:
現在是時候來看一下 RPCClient.cs 和 RPCServer.cs 完整的示例代碼了(包含了基本的異常處理)。
像往常一下創建(可參考第一篇):
我們的 RPC 服務已經就緒,現可以開啟服務端:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
To request a fibonacci number run the client:
運行客戶端來請求一個斐波納契數:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
The design presented here is not the only possible implementation of a RPC service, but it has some important advantages:
-
If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.
-
On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queueDeclare are required. As a result the RPC client needs only one network round trip for a single RPC request.
目前所呈現的設計不僅僅是 RPC 服務的可能實現,而且還有一些重要優點:
- 如果 RPC 服務很慢,你可以通過運行另一個來橫向擴展,也就是嘗試在新的控制台中運行第二個 RPCServer。
- 在客戶端,RPC 只能發送和接收一條消息,必需像 queueDeclare 那樣進行非同步式調用。因此,RPC 客戶端只需要單次請求的一次網絡往返。
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
-
How should the client react if there are no servers running?
-
Should a client have some kind of timeout for the RPC?
-
If the server malfunctions and raises an exception, should it be forwarded to the client?
-
Protecting against invalid incoming messages (eg checking bounds, type) before processing.
我們的代碼仍然很簡單,也並沒有嘗試去解決更復雜(但很重要的)問題,比如就像:
- 如果服務端沒有運行,那么客戶端將如何應對?
- 客戶端針對 RPC 是否應該有某種超時(應對措施)?
- 如果服務端出現故障並引發異常,它是否應該轉發給客戶端?
- 在處理之前防備無效的傳入消息(比如檢查邊界和類型)。
