先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost
標准端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯系我們。
在第 教程[2] 中,我們學習了如何使用工作隊列在多個工作單元之間分配耗時任務。
但是如果我們想要運行一個在遠程計算機上的函數並等待其結果呢?這將是另外一回事了。這種模式通常被稱為 遠程過程調用 或 RPC 。
在本篇教程中,我們將使用 RabbitMQ 構建一個 RPC 系統:一個客戶端和一個可擴展的 RPC 服務器。由於我們沒有什么耗時任務值得分發,那干脆就創建一個返回斐波那契數列的虛擬 RPC 服務吧。
客戶端接口
為了說明如何使用 RPC 服務,我們將創建一個簡單的客戶端類。該類將暴露一個名為Call
的方法,用來發送 RPC 請求並且保持阻塞狀態,直到接收到應答為止。
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
關於 RPC 的說明
盡管 RPC 在計算機中是一種很常見的模式,但它經常受到批評。問題出現在當程序員不知道一個函數是本地調用還是一個耗時的 RPC 請求。這樣的混淆,會導致系統不可預測,以及給調試增加不必要的復雜性。誤用 RPC 可能會導致不可維護的混亂代碼,而不是簡化軟件。
牢記這些限制,請考慮如下建議:
- 確保可以明顯區分哪些函數是本地調用,哪些是遠程調用。
- 為您的系統編寫文檔,明確組件之間的依賴關系。
- 捕獲異常,當 RPC 服務長時間宕機時客戶端該如何應對。
當有疑問的時候可以先避免使用 RPC。如果可以的話,考慮使用異步管道 - 而不是類似 RPC 的阻塞,其會將結果以異步的方式推送到下一個計算階段。
回調隊列
一般來講,基於 RabbitMQ 進行 RPC 通信是非常簡單的,客戶端發送一個請求消息,然后服務端用一個響應消息作為應答。為了能接收到響應,我們需要在發送請求過程中指定一個'callback'隊列地址。
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1 協議在消息中預定義了一個包含 14 個屬性的集合,大多數屬性很少使用,但以下情況除外:
Persistent
:將消息標記為持久的(值為2)或者瞬時的(其他值),可以參考 教程[2]。
DeliveryMode
:熟悉 AMQP 協議的人可以選擇此屬性而不是熟悉協議的人可以選擇使用此屬性而不是Persistent
,它們控制的東西是一樣的。
ContentType
:用於描述編碼的 mime 類型。例如,對於經常使用的 JSON 編碼,將此屬性設置為:application/json
是一種很好的做法。
ReplyTo
:通常用於命名回調隊列。
CorrelationId
:用於將 RPC 響應與請求相關聯。
關聯ID
在上面介紹的方法中,我們建議為每個 RPC 請求創建一個回調隊列,但是這種方式效率低。幸運的是我們有一種更好的方式,那就是為每個客戶端創建一個獨立的回調隊列。
這種方式會引出一個新的問題,在收到響應的回調隊列中,它無法區分響應屬於哪一個請求,此時便是CorrelationId
屬性的所用之處。我們將為每個請求的CorrelationId
設置一個唯一值。之后當我們在回調隊列接收到響應的時候,再去檢查下這個屬性是否和請求中的值匹配,如此一來,我們就可以把響應和請求關聯起來了。如果出現一個未知的CorrelationId
值,我們可以安全的銷毀這個消息,因為這個消息不屬於我們的請求。
你可能會問,為什么我們應該忽略回調隊列中的未知的消息,而不是用錯誤來標識失敗呢?這是因為於服務器端可能存在競爭條件。雖然不太可能,但是 RPC 服務器可能在僅發送了響應消息而未發送消息確認的情況下掛掉,如果出現這種情況,RPC 服務器重啟之后將會重新處理該請求。這就是為什么在客戶端上我們必須優雅地處理重復的響應,並且理想情況下 RPC 應該是冪等的。
總結
我們的 RPC 會是這樣工作:
- 客戶端啟動時,會創建一個匿名的獨占回調隊列。
- 對於 RPC 請求,客戶端發送帶有兩個屬性的消息:
ReplyTo
(設置為回調隊列)和CorrelationId
(為每個請求設置唯一值)。 - 請求被發送到
rpc_queue
隊列。 - RPC 工作線程(或者叫:服務器)正在等待該隊列上的請求。當出現請求時,它會執行該作業,並使用
ReplyTo
屬性設置的隊列將帶有結果的消息發送回客戶端。 - 客戶端等待回調隊列上的數據。出現消息時,它會檢查
CorrelationId
屬性。如果它與請求中的值匹配,則返回對應用程序的響應。
組合在一起
斐波納契 任務:
private static int fib(int n)
{
if (n == 0 || n == 1) return n;
return fib(n - 1) + fib(n - 2);
}
我們宣布我們的斐波那契函數。並假定只允許有效的正整數輸入。 (不要期望這個適用於大數字,它可能是最慢的遞歸實現)。
我們的 RPC 服務端代碼 RPCServer.cs 看起來如下所示:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class RPCServer
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "rpc_queue", durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false, consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = null;
var body = ea.Body;
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine(" [.] fib({0})", message);
response = fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine(" [.] " + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
///
/// Assumes only valid positive integer input.
/// Don't expect this one to work for big numbers, and it's
/// probably the slowest recursive implementation possible.
///
private static int fib(int n)
{
if (n == 0 || n == 1)
{
return n;
}
return fib(n - 1) + fib(n - 2);
}
}
服務端代碼非常簡單:
- 像往常一樣,首先建立連接,通道和聲明隊列。
- 我們可能希望運行多個服務器進程。為了在多個服務器上平均分配負載,我們需要設置
channel.BasicQos
中的prefetchCount
值。 - 使用
BasicConsume
訪問隊列,然后注冊一個交付處理程序,並在其中完成工作並發回響應。
我們的 RPC 客戶端 RPCClient.cs 代碼:
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RpcClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props;
public RpcClient()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel);
props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var response = Encoding.UTF8.GetString(body);
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
};
}
public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
return respQueue.Take(); ;
}
public void Close()
{
connection.Close();
}
}
public class Rpc
{
public static void Main()
{
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
}
}
客戶端代碼稍微復雜一些:
- 建立連接和通道,並為響應聲明一個獨有的 'callback' 隊列。
- 訂閱這個 'callback' 隊列,以便可以接收到 RPC 響應。
Call
方法用來生成實際的 RPC 請求。- 在這里,我們首先生成一個唯一的
CorrelationId
編號並保存它,while 循環會使用該值來捕獲匹配的響應。 - 接下來,我們發布請求消息,其中包含兩個屬性:
ReplyTo
和CorrelationId
。 - 此時,我們可以坐下來稍微一等,直到指定的響應到來。
- while 循環做的工作非常簡單,對於每個響應消息,它都會檢查
CorrelationId
是否是我們正在尋找的那一個。如果是這樣,它就會保存該響應。 - 最后,我們將響應返回給用戶。
客戶發出請求:
var rpcClient = new RPCClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
現在是查看 RPCClient.cs 和 RPCServer.cs 的完整示例源代碼(包括基本異常處理)的好時機哦。
像往常一樣設置(請參見 教程[1]):
我們的 RPC 服務現已准備就緒,現在可以啟動服務端:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
要請求斐波納契數,請運行客戶端:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
這里介紹的設計並不是 RPC 服務的唯一可能實現,但它仍具有一些重要優勢:
- 如果 RPC 服務器太慢,您可以通過運行另一個服務器來擴展。嘗試在新開一個控制台,運行第二個 RPCServer。
- 在客戶端,RPC 只需要發送和接收一條消息。不需要像
QueueDeclare
一樣同步調用。因此,對於單個 RPC 請求,RPC 客戶端只需要一次網絡往返。
我們的代碼很簡單,也並沒有嘗試去解決更復雜(但很重要)的問題,比如就像:
- 如果服務端沒有運行,客戶端應該如何反應?
- 客戶端是否應該為 RPC 設置某種超時機制?
- 如果服務端出現故障並引發異常,是否應將其轉發給客戶端?
- 在處理之前防止無效的傳入消息(例如:檢查邊界、類型)。
如果您想進行實驗,您可能會發現 管理 UI 對於查看隊列非常有用。
寫在最后
本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為准。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
- 原文鏈接:RabbitMQ tutorial - Remote procedure call (RPC)
- 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
- 最后更新:2018-11-17