遠程過程調用(Remote Proceddure call【RPC】)
(本實例都是使用的Net的客戶端,使用C#編寫)
在第二個教程中,我們學習了如何使用工作隊列在多個工作實例之間分配耗時的任務。
但是,如果我們需要在遠程計算機上運行功能並等待結果怎么辦? 那是一個不同的故事。 此模式通常稱為遠程過程調用或RPC。
在本教程中,我們將使用RabbitMQ構建一個RPC系統:一個客戶機和一個可擴展的RPC服務器。 由於我們沒有任何值得分發的耗時任務,我們將創建一個返回斐波納契數字的虛擬RPC服務。
1、客戶端接口【Client Interface】
為了說明如何使用RPC服務,我們將創建一個簡單的客戶端類。 它將公開一個名為call的方法,該方法發送RPC請求並阻塞,直到接收到答案:
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
關於RPC的注釋
雖然RPC是一個很常見的計算模式,但它經常被批評。 當系統出現問題的時候,程序員不知道函數調用是本地函數還是緩慢的RPC調用,這樣的混亂導致了系統的不可預測性,並增加了調試的復雜性。 濫用RPC可能導致代碼的可維護性很差,這樣的設計不但沒有簡化軟件,而且只會是系統更糟。
銘記這一點,請考慮以下建議:
確保顯而易見哪個函數調用是本地的,哪個是遠程的。
記錄您的系統。 使組件之間的依賴關系清除。
處理錯誤情況。 當RPC服務器停機很長時間后,客戶端應該如何反應?
當有疑問避免RPC。 如果可以的話,您應該使用異步管道 - 而不是類似RPC的阻塞,將異步推送到下一個計算階段。
2、回調隊列【Callback queue】
一般來說RPC對RabbitMQ來說很容易。 客戶端發送請求消息,服務器回復一條響應消息。 為了收到一個響應,我們需要發送一個請求向'回調'的隊列地址:
var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1協議預先定義了一組14個隨附消息的屬性。 大多數屬性很少使用,除了以下內容:
deliveryMode:將消息標記為persistent(值為2)或transient(任何其他值)。 您可能會從第二個教程中記住此屬性。
contentType:用於描述mime類型的編碼。 例如對於經常使用的JSON編碼,將此屬性設置為:application / json是一個很好的做法。
replyTo:通常用來命名一個回調隊列。
correlationId:用於將RPC響應與請求相關聯。
3、相關標識【Correlation Id】
在上面所提出的方法中,我們建議為每個RPC請求創建一個回調隊列。這是非常低效的,但幸運的是有一個更好的方法 - 讓我們為每個客戶端創建一個回調隊列。
這將引發了一個新問題,在該隊列中收到響應,響應所屬的請求是不知道的。此時正是使用correlationId屬性的時候。我們將為每個請求設置一個唯一的值。稍后,當我們在回調隊列中收到一條消息時,我們將查看此屬性,並且基於此,我們將能夠將響應與請求相匹配。如果我們看到一個未知的correlationId值,我們可以安全地丟棄該消息 - 它不屬於我們的請求。
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是出現錯誤?這是由於服務器端可能出現競爭情況。雖然不太可能,RPC服務器可能會在發送答復之后,但在發送請求的確認消息之前死亡。如果發生這種情況,重新啟動的RPC服務器將再次處理該請求。這就是為什么在客戶端上,我們必須優雅地處理這些重復的響應,並且RPC應該理想地是冪等的。
4、概要【Summary】
我們的RPC將像這樣工作:
當客戶端啟動時,它創建一個匿名獨占回調隊列。
對於RPC請求,客戶端發送一個具有兩個屬性的消息:replyTo,它被設置為回調隊列和correlationId,它被設置為每個請求的唯一值。
請求被發送到rpc_queue隊列。
RPC worker(aka:server)正在等待隊列上的請求。 當請求出現時,它將執行該作業,並使用replyTo字段中的隊列將結果發送回客戶端。
客戶端等待回呼隊列中的數據。 當信息出現時,它檢查correlationId屬性。 如果它與請求中的值相匹配,則返回對應用程序的響應。
5、整合
斐波納契【Fibonacci】任務:
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我們聲明斐波那契函數。 它只假設有效的正整數輸入。 (不要指望這一個能為大數字工作,而且這可能是最慢的遞歸實現)
我們的RPC服務器RPCServer.cs的代碼如下所示:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class RPCServer 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using (var connection = factory.CreateConnection()) 12 using (var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "rpc_queue", durable: false, 15 exclusive: false, autoDelete: false, arguments: null); 16 channel.BasicQos(0, 1, false); 17 var consumer = new EventingBasicConsumer(channel); 18 channel.BasicConsume(queue: "rpc_queue", 19 noAck: false, consumer: consumer); 20 Console.WriteLine(" [x] Awaiting RPC requests"); 21 22 consumer.Received += (model, ea) => 23 { 24 string response = null; 25 26 var body = ea.Body; 27 var props = ea.BasicProperties; 28 var replyProps = channel.CreateBasicProperties(); 29 replyProps.CorrelationId = props.CorrelationId; 30 31 try 32 { 33 var message = Encoding.UTF8.GetString(body); 34 int n = int.Parse(message); 35 Console.WriteLine(" [.] fib({0})", message); 36 response = fib(n).ToString(); 37 } 38 catch (Exception e) 39 { 40 Console.WriteLine(" [.] " + e.Message); 41 response = ""; 42 } 43 finally 44 { 45 var responseBytes = Encoding.UTF8.GetBytes(response); 46 channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, 47 basicProperties: replyProps, body: responseBytes); 48 channel.BasicAck(deliveryTag: ea.DeliveryTag, 49 multiple: false); 50 } 51 }; 52 53 Console.WriteLine(" Press [enter] to exit."); 54 Console.ReadLine(); 55 } 56 } 57 58 /// 59 60 /// Assumes only valid positive integer input. 61 /// Don't expect this one to work for big numbers, and it's 62 /// probably the slowest recursive implementation possible. 63 /// 64 65 private static int fib(int n) 66 { 67 if (n == 0 || n == 1) 68 { 69 return n; 70 } 71 72 return fib(n - 1) + fib(n - 2); 73 } 74 }
服務器代碼相當簡單:
像往常一樣,我們開始建立連接,通道並聲明隊列。
我們可能想要運行多個服務器進程。 為了在多個服務器上平均分配負載,我們需要在channel.basicQos中設置prefetchCount設置。
我們使用basicConsume訪問隊列。 然后我們注冊一個交付處理程序,我們在其中進行工作並發回響應。
我們的RPC客戶端的代碼RPCClient.cs:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using RabbitMQ.Client; 7 using RabbitMQ.Client.Events; 8 9 class RPCClient 10 { 11 private IConnection connection; 12 private IModel channel; 13 private string replyQueueName; 14 private QueueingBasicConsumer consumer; 15 16 public RPCClient() 17 { 18 var factory = new ConnectionFactory() { HostName = "localhost" }; 19 connection = factory.CreateConnection(); 20 channel = connection.CreateModel(); 21 replyQueueName = channel.QueueDeclare().QueueName; 22 consumer = new QueueingBasicConsumer(channel); 23 channel.BasicConsume(queue: replyQueueName, 24 noAck: true, 25 consumer: consumer); 26 } 27 28 public string Call(string message) 29 { 30 var corrId = Guid.NewGuid().ToString(); 31 var props = channel.CreateBasicProperties(); 32 props.ReplyTo = replyQueueName; 33 props.CorrelationId = corrId; 34 35 var messageBytes = Encoding.UTF8.GetBytes(message); 36 channel.BasicPublish(exchange: "", 37 routingKey: "rpc_queue", 38 basicProperties: props, 39 body: messageBytes); 40 41 while(true) 42 { 43 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 44 if(ea.BasicProperties.CorrelationId == corrId) 45 { 46 return Encoding.UTF8.GetString(ea.Body); 47 } 48 } 49 } 50 51 public void Close() 52 { 53 connection.Close(); 54 } 55 } 56 57 class RPC 58 { 59 public static void Main() 60 { 61 var rpcClient = new RPCClient(); 62 63 Console.WriteLine(" [x] Requesting fib(30)"); 64 var response = rpcClient.Call("30"); 65 Console.WriteLine(" [.] Got '{0}'", response); 66 67 rpcClient.Close(); 68 } 69 }
客戶端代碼稍微復雜一些:
我們建立一個連接和通道,並為回覆聲明一個獨占的'回調'隊列。
我們訂閱'回調'隊列,這樣我們可以收到RPC響應。
我們的調用方法使得實際的RPC請求。
在這里,我們首先生成一個唯一的correlationId數字並保存它 - while循環將使用此值來捕獲適當的響應。
接下來,我們發布請求消息,此請求消息具有兩個屬性:replyTo和correlationId。
在這一點上,我們可以坐下來等待適當的響應到達。
while循環正在做一個非常簡單的工作,對於每個響應消息,它檢查correlationId是否是我們正在尋找的。 如果是這樣,它會保存響應。
最后,我們將響應返回給用戶。
讓客戶端發送請求:
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
現在是看看我們的RPCClient.cs和RPCServer.cs的完整示例源代碼(包括基本異常處理)的好時機。
照常設置(參見教程一):
我們的RPC服務現在已經准備好了。 我們可以啟動服務器:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
要請求運行客戶端的fibonacci號碼:
cd RPCClient dotnet run # => [x] Requesting fib(30)
這里提出的設計不是RPC服務的唯一可能的實現,而是具有一些重要的優點:
如果RPC服務器太慢,可以通過運行另一個RPC服務器進行擴展。 嘗試在新的控制台中運行第二個RPCServer。
在客戶端,RPC需要發送和接收一條消息。 不需要像queueDeclare這樣的同步調用。 因此,RPC客戶端只需要一個網絡往返單個RPC請求。
我們的代碼仍然非常簡單,沒有嘗試解決更復雜(但重要的)問題,例如:
如果沒有服務器運行,客戶端應該如何反應?
客戶端是否需要RPC的某種超時時間?
如果服務器發生故障並引發異常,應該將其轉發給客戶端?
在處理之前防止無效的傳入消息(例如檢查邊界,類型)。
好了,這個系列也快結束了。
在把原地址貼出來,讓大家了解更多。地址如下:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html