RabbitMQ 實現RPC


NMIV$QS2F)`R0Y38S03W}DU

實現RPC

首先要弄明白,RPC是個什么東西。

(RPC) Remote Procedure Call Protocol 遠程過程調用協議

在一個大型的公司,系統由大大小小的服務構成,不同的團隊維護不同的代碼,部署在不同的機器。但是在做開發時候往往要用到其它團隊的方法,因為已經有了實現。但是這些服務部署不同的機器上,想要調用就需要網絡通信,這些代碼繁瑣且復雜,一不小心就會寫的很低效。RPC協議定義了規划,其它的公司都給出了不同的實現。比如微軟的wcf,以及現在火熱的WebApi。

 

在RabbitMQ中RPC的實現也是很簡單高效的,現在我們的客戶端、服務端都是消息發布者與消息接收者。

首先客戶端通過RPC向服務端發出請求

我這里有一堆東西需要你給我處理一下,correlation_id:這是我的請求標識,erply_to:你處理完過后把結果返回到這個隊列中。

服務端拿到了請求,開始處理並返回

correlation_id:這是你的請求標識 ,原封不動的給你。 這時候客戶端用自己的correlation_id與服務端返回的id進行對比。是我的,就接收。

image

 

一些繁瑣的細節rabbitmq已經為我們封裝了,簡單的SimpleRpcServer與SimpleRpcClient讓Rpc實現的更為方便,這里可以先看一下server端

使用默認的交換機,創建一個SimpleRpcServer的實例,這里需要注意的是,SimpleRpcServer的處理應該是根據業務來的,也就是自己的。在給出的類中沒有任何的實現,如果我們創建一個自己的RpcServer並且給出實現

          //創建返回一個新的頻道
            using (var channel = RabbitMqHelper.GetConnection().CreateModel())
            {

                //創建一個rpc queue
                channel.QueueDeclare("RpcQueue", true, false, false, null);

                SimpleRpcServer rpc = new SmsSimpleRpcServer(new Subscription(channel, "RpcQueue"));

                Console.WriteLine("服務端啟動成功");
   rpc.MainLoop();
                Console.ReadKey();

            }

這里是自己的一個RpcServer,在HandleSimpleCall方法里返回對處理的回調消息,在ProcessRequest中做出具體的處理邏輯

/// <summary>
    /// 發送短信的Rpc
    /// </summary>
    public class SmsSimpleRpcServer : SimpleRpcServer
    {
        public SmsSimpleRpcServer(Subscription subscription) : base(subscription)
        {
        }

        /// <summary>
        /// 執行完成后進行h回調
        /// </summary>
        /// <param name="isRedelivered"></param>
        /// <param name="requestProperties"></param>
        /// <param name="body"></param>
        /// <param name="replyProperties"></param>
        /// <returns></returns>
        public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
        {
            replyProperties = null;
            return Encoding.UTF8.GetBytes($"給{Encoding.UTF8.GetString(body)}發送短信成功");
        }


        /// <summary>
        /// 進行處理
        /// </summary>
        /// <param name="evt"></param>
        public override void ProcessRequest(BasicDeliverEventArgs evt)
        {
            // todo.....
            base.ProcessRequest(evt);
        }
    }

 

回到client端,這里的代碼也是非常容易的。創建一個SimpleRpcClient,然后指定了交換機類型,因為用的是默認的,所以exchange傳的是null, routingkey是我們的rpcqueue。最后調用call方法

using (var channel = RabbitMqHelper.GetConnection().CreateModel())
           {
               //創建client的rpc
               SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue"));
               bool flag = true;
               var sendmsg = "";
               while (flag)
               {
                   Console.WriteLine("請輸入要發送的消息");
                   sendmsg = Console.ReadLine();
                   if (string.IsNullOrWhiteSpace(sendmsg))
                   {
                       Console.Write("請輸入消息");
                       continue;
                   }

                   var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));

                   Console.WriteLine(Encoding.UTF8.GetString(msg));


               }

               Console.ReadKey();

           }

把程序運行起來

image

 

后面說一些內部的東西,其實上在創建一次SimpleRpcClient的時候都會創建一個回調隊列,這個隊列在程序關閉后會自動消失,所以這些建議創建一次就夠了,都使用這個。如果創建多次會影響性能

image

image

 

在回調的時候,通過源碼也可以看到判斷了correlation_id的一致性

NMIV$QS2F)`R0Y38S03W}DU[4]

 

在server端也可以看到在執行Process后會發布消息到回調隊列

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM