C#使用Thrift作為RPC框架入門(二)


前言

  在 [上一篇](C#使用Thrift作為RPC框架入門(一) - 楊凱2020 - 博客園 (cnblogs.com)) 文章中我們講述了Thrif的基本知識,包括在C#語言下使用需要用到的工具以及使用nuget安裝thrift開發包,還描述了它支持的數據類型,以及它支持IDL的描述文件,和一個簡單的例子。

接上文,我這里再補充兩點關於IDL描述文件的:

  • Thrift支持Byte類型,我們需要用i8來表示,它對應C#中的sbyte類型,如果我們用byte關鍵字,我們會看到一個【WARNING】

  •  在使用字節數組時,我們需要用binary類型,如果我們用list<byte>,也會看到一個【WARNING】

生成的代碼--兩個接口兩個類

  我們從上一篇文章最后那個簡單的例子中可以看到,Thrift框架把我們標記為service的結構生成了對應的一個類,這個類中有兩個內部接口兩個內部類。這就是我們使用該框架的重點部分。

兩個內部接口

這兩組接口代表了框架給我生成的兩組方法,這兩組方法中一組(ISync)是用於同步調用的方法,另一組(Iface)是用於異步調用的方法,而Iface又繼承了ISync接口。

  • - Iface接口被一個為簽名為Client的類繼承,這個類是我們使用RPF框架調用的客戶端。

  • - ISync接口被作為一個簽名為Processor類的構造函數的參數,**Processor**作為響應我們客戶端請求的處理器類,我們需要自己實現具體的Handler,去處理客戶端的請求,該實現是作為**Processor**的構造函數的實參

兩個內部類2

在講兩個內部接口的時候,我們以及提過了這兩個內部類的是干什么用的,接下來我們將通過一個具體的示例,來感受一下這個兩個類的使用方法

示例

客戶端代碼:

 static void Main(string[] args)
   {
     TTransport framedTransport
                = new TSocket("127.0.0.1",9999);
     Thrift.Protocol.TCompactProtocol compactProtocol =
                new Thrift.Protocol.TCompactProtocol(framedTransport);
     ThriftIDL.Services.PeopleService.Client client 
                = new ThriftIDL.Services.PeopleService.Client(compactProtocol);
            framedTransport.Open();
            People people = new People();
            client.SetPeople(people);
           
 }

服務器端代碼:

 

static void Main(string[] args)
 {
   Thrift.Transport.TServerSocket serverSocket = new Thrift.Transport.TServerSocket(9999,1000);
   TProcessor processor=   
            new ThriftIDL.Services.PeopleService.Processor(new PeoperServiceHandler());
   Thrift.Server.TSimpleServer server = new Thrift.Server.TSimpleServer(processor,serverSocket);
   server.Serve();
 }

 

服務器端處理器對應的代碼:

 public class PeoperServiceHandler : ThriftIDL.Services.PeopleService.Iface
    {
        public People GEtPeople()
        {
            throw new NotImplementedException();
        }

        public void SetPeople(People people)
        {
            throw new NotImplementedException();
        }
    }

我這里只做演示,並沒有實現具體的方法。

多路復用處理器

  對我們服務器端的代碼細細品味之后,我們會突然發現,如果我們有多個**service**,那么我們就要寫多個這樣的服務端代碼,這個似乎是我們不能接收,更要命的是,我們要每個**service**都要對應一個監聽端口,這個有點恐怖。

  Thrift框架想我們之所想,急我們之所急,它提供了一個多路復用的處理器類--TMultiplexedProtocol(客戶端)以及TMultiplexedProcessor(服務器端),接下來我們看一下TMultiplexedProcessor類是怎么使用的

示例2

服務器端代碼:

TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
 multiplexedProcessor.RegisterProcessor("serviceName1"
              , new Service1.Processor(new Service1Handler()));
               multiplexedProcessor.RegisterProcessor("serviceName2"
              , new Service2.Processor(new Service2Handler()));
               multiplexedProcessor.RegisterProcessor("serviceName3"
              , new Service3.Processor(new Service3Handler()));
            TServerSocket serverSocket = new Thrift.Transport.TServerSocket(Port, 4000);
         TThreadPoolServer   server = new TThreadPoolServer(multiplexedProcessor, serverSocket);
          server.Serve();

代碼中的Service1.Processor、Service2.Processor、Service3.Processor是我們定義的service生成的代碼類對應的客戶端調用代碼如下:

RPC:ServiceName1:
 Thrift.Transport.TSocket socket = new Thrift.Transport.TSocket(remoteAddress, port, 4000);
            TCompactProtocol compactProtocol = new TCompactProtocol(socket);
             TMultiplexedProtocol multiplexedProtocol = 
                    new TMultiplexedProtocol(compactProtocol, "serverName1");
            Service1.Client client1=new Service1.Client(multiplexedProtocol);
            socket.Open();

RPC:ServiceName2:

Thrift.Transport.TSocket socket = new Thrift.Transport.TSocket(remoteAddress, port, 4000);
            TCompactProtocol compactProtocol = new TCompactProtocol(socket);
             TMultiplexedProtocol multiplexedProtocol =
                     new TMultiplexedProtocol(compactProtocol, "serverName2");
            Service2.Client client1=new Service2.Client(multiplexedProtocol);
            socket.Open();

RPC:ServiceName3:

 

Thrift.Transport.TSocket socket = new Thrift.Transport.TSocket(remoteAddress, port, 4000);
            TCompactProtocol compactProtocol = new TCompactProtocol(socket);
             TMultiplexedProtocol multiplexedProtocol =
                     new TMultiplexedProtocol(compactProtocol, "serverName3");
            Service3.Client client1=new Service3.Client(multiplexedProtocol);
            socket.Open();

 

  以上就是多路復用處理器,在開發過程中的使用方法,代碼用使用到的類型,我們會在下一節中講解。

  那么,現在問題又來了,如果service數量巨多的話,這樣我們會得到大量的重復的代碼,我們應該怎樣處理這種情況呢?對!我們對Client和Processor做進一步的封裝,這里的封裝我使用了**約定勝於配置**的架構策略。

封裝后的多路復用

服務器端代碼:

 

public class RPCServer
    {
        TThreadPoolServer server = null;
        TMultiplexedProcessor multiplexedProcessor = null;
        public RPCServer(int Port)
        {
            multiplexedProcessor = new TMultiplexedProcessor();
            TServerSocket serverSocket = new Thrift.Transport.TServerSocket(Port, 4000);
            server = new TThreadPoolServer(multiplexedProcessor
                    , serverSocket,new Thrift.Transport.TTransportFactory()
                ,new Thrift.Protocol.TCompactProtocol.Factory());
        }

        public void RegisterProcessor(string serverName, TProcessor processor)
        {
            multiplexedProcessor.RegisterProcessor(serverName,processor);
        }

        public void RegisterProcessor<T>(object ProcessorHandler) where T: TProcessor
        {
            string[] strArray = typeof(T).FullName.Split(new string[] { ".", "+" }
                            , StringSplitOptions.RemoveEmptyEntries);
            string serverName = strArray[strArray.Length - 2].ToUpper();

            ConstructorInfo[] constructorInfos = typeof(T).GetConstructors();
            TProcessor processor = (T)constructorInfos[0].Invoke(new object[] { ProcessorHandler });

            multiplexedProcessor.RegisterProcessor(serverName, processor);
        }

        /// <summary>
        /// 會阻塞當前線程
        /// </summary>
        public void Start()
        {
            server.Serve();
        }

        public void Stop()
        {
            server.Stop();
        }
    }

客戶端代碼:

 

 public class RPCClient<T> : IDisposable
    {
        public T Instance;
        Thrift.Transport.TSocket socket = null;
        public RPCClient(string remoteAddress, int port)
        {
            socket = new Thrift.Transport.TSocket(remoteAddress, port, 4000);
            TCompactProtocol compactProtocol = new TCompactProtocol(socket);
            string[] strArray = typeof(T).FullName.Split(new string[] { ".", "+" }  
 , StringSplitOptions.RemoveEmptyEntries);
            string serverName = strArray[strArray.Length - 2].ToUpper();
            TMultiplexedProtocol multiplexedProtocol = 
new TMultiplexedProtocol(compactProtocol, serverName);
            ConstructorInfo constructorInfo = typeof(T)
    .GetConstructor(new Type[] { typeof(TProtocol) });
            Instance = (T)constructorInfo.Invoke(new object[] { multiplexedProtocol });
        }

        public void Open()
        {
            socket.Open();
        }

        public void Close()
        {
            socket.Close();
        }
        ......  

我們使用一下代碼進行服務器端service的注冊:

 RPCProxy.RPCServer rPCServer = new RPCProxy.RPCServer(52364);
            rPCServer.RegisterProcessor("ServiceName1"
                , new ServiceName1.Processor(new ServiceName1Handler()));
        rPCServer.RegisterProcessor<ServiceName2.Processor>(,);
        rPCServer.RegisterProcessor<ServiceName3.Processor>(,);
        rPCServer.Start();

客戶端的使用 用一下代碼:

RPCProxy.RPCClient<ServiceName1.Client> rPCClient1 =
                new RPCProxy.RPCClient<ServiceName1.Client>("127.0.0.1", 52364);

 RPCProxy.RPCClient<ServiceName2.Client> rPCClient2 =
                new RPCProxy.RPCClient<ServiceName2.Client>("127.0.0.1", 52364);
                ......

結尾

  這一小節我們詳細講解了Thrift框架生成的代碼在Client和Server的使用方法,已經多路復用處理器的使用方法,我們也對多路復用進行了封裝優化,使其更易用,其中我們使用了一下Thrift提供的類,我們並沒有詳細的講解。我們將在下一節講thrift的框架設計時來對這些類進行說明。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM