實現RPC
首先要弄明白,RPC是個什么東西。
(RPC) Remote Procedure Call Protocol 遠程過程調用協議
在一個大型的公司,系統由大大小小的服務構成,不同的團隊維護不同的代碼,部署在不同的機器。但是在做開發時候往往要用到其它團隊的方法,因為已經有了實現。但是這些服務部署不同的機器上,想要調用就需要網絡通信,這些代碼繁瑣且復雜,一不小心就會寫的很低效。RPC協議定義了規划,其它的公司都給出了不同的實現。比如微軟的wcf,以及現在火熱的WebApi。
在RabbitMQ中RPC的實現也是很簡單高效的,現在我們的客戶端、服務端都是消息發布者與消息接收者。
首先客戶端通過RPC向服務端發出請求
我這里有一堆東西需要你給我處理一下,correlation_id:這是我的請求標識,erply_to:你處理完過后把結果返回到這個隊列中。
服務端拿到了請求,開始處理並返回
correlation_id:這是你的請求標識 ,原封不動的給你。 這時候客戶端用自己的correlation_id與服務端返回的id進行對比。是我的,就接收。
一些繁瑣的細節rabbitmq已經為我們封裝了,簡單的SimpleRpcServer與SimpleRpcClient讓Rpc實現的更為方便,這里可以先看一下server端
使用默認的交換機,創建一個SimpleRpcServer的實例,這里需要注意的是,SimpleRpcServer的處理應該是根據業務來的,也就是自己的。在給出的類中沒有任何的實現,如果我們創建一個自己的RpcServer並且給出實現
//創建返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { //創建一個rpc queue channel.QueueDeclare("RpcQueue", true, false, false, null); SimpleRpcServer rpc = new SmsSimpleRpcServer(new Subscription(channel, "RpcQueue")); Console.WriteLine("服務端啟動成功");
rpc.MainLoop();
Console.ReadKey();
}
這里是自己的一個RpcServer,在HandleSimpleCall方法里返回對處理的回調消息,在ProcessRequest中做出具體的處理邏輯
/// <summary> /// 發送短信的Rpc /// </summary> public class SmsSimpleRpcServer : SimpleRpcServer { public SmsSimpleRpcServer(Subscription subscription) : base(subscription) { } /// <summary> /// 執行完成后進行h回調 /// </summary> /// <param name="isRedelivered"></param> /// <param name="requestProperties"></param> /// <param name="body"></param> /// <param name="replyProperties"></param> /// <returns></returns> public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes($"給{Encoding.UTF8.GetString(body)}發送短信成功"); } /// <summary> /// 進行處理 /// </summary> /// <param name="evt"></param> public override void ProcessRequest(BasicDeliverEventArgs evt) { // todo..... base.ProcessRequest(evt); } }
回到client端,這里的代碼也是非常容易的。創建一個SimpleRpcClient,然后指定了交換機類型,因為用的是默認的,所以exchange傳的是null, routingkey是我們的rpcqueue。最后調用call方法
using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { //創建client的rpc SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue")); bool flag = true; var sendmsg = ""; while (flag) { Console.WriteLine("請輸入要發送的消息"); sendmsg = Console.ReadLine(); if (string.IsNullOrWhiteSpace(sendmsg)) { Console.Write("請輸入消息"); continue; } var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg)); Console.WriteLine(Encoding.UTF8.GetString(msg)); } Console.ReadKey(); }
把程序運行起來
后面說一些內部的東西,其實上在創建一次SimpleRpcClient的時候都會創建一個回調隊列,這個隊列在程序關閉后會自動消失,所以這些建議創建一次就夠了,都使用這個。如果創建多次會影響性能
在回調的時候,通過源碼也可以看到判斷了correlation_id的一致性
在server端也可以看到在執行Process后會發布消息到回調隊列