Thrift全面介紹


 
簡介
Thrift是一個軟件框架,用來進行可擴展且跨語言的服務的開發。它結合了功能強大的軟件堆棧和 代碼生成引擎,以構建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 這些編程語言間無縫結合的、高效的服務。
 
Thrift最初由facebook開發用做系統內各語言之間的RPC通信 。
2007年由facebook貢獻到apache基金 ,08年5月進入apache孵化器 。
支持多種語言之間的RPC方式的通信:php語言client可以構造一個對象,調用相應的服務方法來調用java語言的服務 ,跨越語言的C/S RPC調用 。
 
Thrift允許定義一個簡單的 定義文件中的數據類型和服務接口,以作為輸入文件, 編譯器生成代碼用來方便地生成RPC客戶端和服務器通信的無縫跨編程語言。
 
安裝
使用Homebrew來安裝thrift。
-ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
-brew install thrift
 
 
一個小實例
 
以UserService為例,描述一下使用thrift的方式,以及其原理。

service.thrift

  1. struct User {  
  2.     1:i64 id,  
  3.     2:string name,  
  4.     3:i64 timestamp,  
  5.     4:bool vip    
  6. }  
  7.   
  8. service UserService {  
  9.     User getById(1:i64 id)  
  10. }  
 
你可以將自己的Java服務通過".thrift"文件描述出來,並提供給服務消費端,那么消費端即可以生成自己的API文件。Thrift框架目前已經支持大部分主流的語言。需要注意,因為Thrift考慮到struct/service定義需要兼容多種語言的”風格",所以它只支持一些基本的數據類型(比如i32,i64,string等),以及service定義的方法不能重名,即使參數列表不同(並不是所有的語言都能像JAVA一樣支持重載)。
 
生成API文件
 
首先下載和安裝thrift客戶端,比如在windows平台下,下載thrift.exe。不過此處需要提醒,不同的thrift客戶端版本生成的API可能不兼容。本例使用thrift-0.9.0.exe,通過"--gen"指定生成API所適配的語言。本實例為生成java客戶端API。
Java代碼  收藏代碼
  1. //windows平台下,將API文件輸出在service目錄下(此目錄需要存在)  
  2. > thrift.exe --gen java -o service service.thrift  
 
需要明確的是:Thrift和其他RPC框架不同,thrift在生成的API文件中,已經描述了"調用過程"(即硬編碼),而不是像其他RPC那樣在運行時(runtime)動態解析方法調用或者參數。
 
UserService實現類
 
Java代碼   收藏代碼
  1. public class UserServiceImpl implements UserService.Iface {  
  2.     @Override  
  3.     public User getById(long id){  
  4.         System.out.println("invoke...id:" + id);  
  5.         return new User();//for test  
  6.     }  
  7. }  
 
實現類,需要放在Thrift server端。
 
原理簡析
 
User.java : thrift生成API的能力還是非常的有限,比如在struct中只能使用簡單的數據類型(不支持Date,Collection<?>等),不過我們能從User中看出,它生成的類實現了"Serializable"接口和"TBase”接口。其中Serializable接口表明這個類的實例是需要序列化之后在網絡中傳輸的,為了不干擾Java本身的序列化和反序列化機制,它還重寫了readObject和writeObject方法,不過這對thrift本身並沒有幫助。
 
TBase接口是thrift序列化和反序列化時使用的,它的兩個核心方法:read和write。在上述的thrift文件中,struct定義的每個屬性都有一個序號,比如1:id,那么thrift在序列化時,將會根據序號的順序依次將屬性的"名稱 + 值"寫入inputStream中,反序列化也是如此。(具體參見read和write的實現)。
Java代碼   收藏代碼
  1. //read方法逐個讀取字段,按照"索引",最終將"struct"對象封裝完畢.  
  2. //write方法也非常類似,按照"索引"順序逐個輸出到流中.  
  3. while (true){  
  4.         schemeField = iprot.readFieldBegin();  
  5.         if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {   
  6.           break;  
  7.         }  
  8.         switch (schemeField.id) {  
  9.           case 1: // ID  
  10.             if (schemeField.type == org.apache.thrift.protocol.TType.I32) {  
  11.               struct.id = iprot.readI32();  
  12.               struct.setIdIsSet(true);  
  13.             } else {   
  14.               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);  
  15.             }  
  16.             break;  
  17.           case 2: // NAME  
  18.           ..  
  19.         }  
  20. }  
 
因為thrift的序列化和反序列化實例數據時,是根據"屬性序號"進行,這可以保證數據在inputstream和outputstream中順序是嚴格的, 此外每個struct中"序號"不能重復,但是可以不需要從"1"開始.如果"序號"有重復,將導致無法生成API文件.這一點也要求API開發者,如果更改了thrift文件中的struct定義,需要重新生成客戶端API,否則服務將無法繼續使用(可能報錯,也可能數據錯誤).thrift序列化/反序列化的過程和JAVA自帶的序列化機制不同,它將不會攜帶額外的class結構,此外thrift這種序列化機制更加適合網絡傳輸,而且性能更加高效.
 
UserService.Client:  在生成的UserService中,有個Client靜態類,這個類就是一個典型的代理類,此類已經實現了UserService的所有方法。開發者需要使用Client類中的API方法與Thrift server端交互,它將負責與Thrift server的Socket鏈接中,發送請求和接收響應。
 
需要注意的時,每次Client方法調用,都會在一個Socket鏈接中進行。這就意味着,在使用Client消費服務之前,需要和Thrift server建立有效的TCP鏈接。(稍后代碼示例)
 
1) 發送請求:
Java代碼   收藏代碼
  1. //參見:TServiceClient  
  2. //API方法調用時,發送請求數據流  
  3. protected void sendBase(String methodName, TBase args) throws TException {  
  4.     oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_));//首先寫入"方法名稱"和"seqid_"  
  5.     args.write(oprot_);//序列化參數  
  6.     oprot_.writeMessageEnd();  
  7.     oprot_.getTransport().flush();  
  8. }  
  9.   
  10. protected void receiveBase(TBase result, String methodName) throws TException {  
  11.     TMessage msg = iprot_.readMessageBegin();//如果執行有異常  
  12.     if (msg.type == TMessageType.EXCEPTION) {  
  13.       TApplicationException x = TApplicationException.read(iprot_);  
  14.       iprot_.readMessageEnd();  
  15.       throw x;  
  16.     }//檢測seqid是否一致  
  17.     if (msg.seqid != seqid_) {  
  18.       throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");  
  19.     }  
  20.     result.read(iprot_);//反序列化  
  21.     iprot_.readMessageEnd();  
  22. }  
 
Thrift提供了簡單的容錯方式:每次方法調用,都會在Client端標記一個seqid,這是一個自增的本地ID,在TCP請求時將此seqid追加到流中,同時Server端響應時,也將此seqid原樣返回過來;這樣客戶端就可以根據此值用來判斷"請求--響應"是對應的,如果出現亂序,將會導致此請求以異常的方式結束。
 
2) 響應
Java代碼   收藏代碼
  1. //參考: TBaseProcessor.java  
  2. @Override  
  3. public boolean process(TProtocol in, TProtocol out) throws TException {  
  4.     TMessage msg = in.readMessageBegin();  
  5.     ProcessFunction fn = processMap.get(msg.name);//根據方法名,查找"內部類"  
  6.     if (fn == null) {  
  7.       TProtocolUtil.skip(in, TType.STRUCT);  
  8.       in.readMessageEnd();  
  9.       TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'");  
  10.       out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));  
  11.       x.write(out);//序列化響應結果,直接輸出  
  12.       out.writeMessageEnd();  
  13.       out.getTransport().flush();  
  14.       return true;  
  15.     }  
  16.     fn.process(msg.seqid, in, out, iface);  
  17.     return true;  
  18. }  
 
thrift生成的UserService.Processor類,就是server端用來處理請求過程的"代理類";server端從socket中讀取請求需要調用的"方法名" +參數列表,並交付給Processor類處理;和其他的RPC調用不同的時,thrift並沒有使用類似於"反射機制"的方式來調用方法,而是將UserService的每個方法生成一個"內部類":
Java代碼   收藏代碼
  1. public static class getById<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getById_args> {  
  2.   public getById() {  
  3.     super("getById");//其中getById為標識符  
  4.   }  
  5.   
  6.   public getById_args getEmptyArgsInstance() {  
  7.     return new getById_args();  
  8.   }  
  9.   
  10.   protected boolean isOneway() {  
  11.     return false;  
  12.   }  
  13.   //實際處理方法  
  14.   public getById_result getResult(I iface, getById_args args) throws org.apache.thrift.TException {  
  15.     getById_result result = new getById_result();  
  16.     result.success = iface.getById(args.id);  
  17.     return result;  
  18.   }  
  19. }  
 
這個"內部類",將會在Processor初始化的時候,放入到一個map中,此后即可以通過"方法名"查找,然后調用其"getResult"方法了.
Java代碼   收藏代碼
  1. public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {  
  2.   
  3.     public Processor(I iface) {  
  4.       super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));  
  5.     }  
  6.   
  7.     protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {  
  8.       super(iface, getProcessMap(processMap));  
  9.     }  
  10.   
  11.     private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {  
  12.       //放入map  
  13.       processMap.put("getById", new getById());  
  14.       return processMap;  
  15.     }  
  16.     ....  
  17. }  
 
3) Server端Socket管理和執行策略
Java代碼   收藏代碼
  1. TThreadPoolServer  
  2. public void serve() {  
  3.     try {  
  4.       //啟動服務  
  5.       serverTransport_.listen();  
  6.     } catch (TTransportException ttx) {  
  7.       LOGGER.error("Error occurred during listening.", ttx);  
  8.       return;  
  9.     }  
  10.   
  11.     // Run the preServe event  
  12.     if (eventHandler_ != null) {  
  13.       eventHandler_.preServe();  
  14.     }  
  15.   
  16.     stopped_ = false;  
  17.     setServing(true);  
  18.     //循環,直到被關閉  
  19.     while (!stopped_) {  
  20.       int failureCount = 0;  
  21.       try {  
  22.         //accept客戶端Socket鏈接,  
  23.         //對於每個新鏈接,將會封裝成runnable,並提交給線程或者線程池中運行.  
  24.         TTransport client = serverTransport_.accept();  
  25.         WorkerProcess wp = new WorkerProcess(client);  
  26.         executorService_.execute(wp);  
  27.       } catch (TTransportException ttx) {  
  28.         if (!stopped_) {  
  29.           ++failureCount;  
  30.           LOGGER.warn("Transport error occurred during acceptance of message.", ttx);  
  31.         }  
  32.       }  
  33.     }  
  34.     //....  
  35. }  
 
Thrift Server端,設計思路也非常的直接。當前Service server啟動之后,將會以阻塞的方式偵聽Socket鏈接(代碼參考TThreadPoolServer),每建立一個Socket鏈接,都會將此Socket經過封裝之后,放入線程池中,本質上也是一個Socket鏈接對應一個Worker Thread。這個Thread只會處理此Socket中的所有數據請求,直到Socket關閉。
Java代碼   收藏代碼
  1. //參考:WorkerProcess  
  2. while (true) {  
  3.   
  4.     if (eventHandler != null) {  
  5.       eventHandler.processContext(connectionContext, inputTransport, outputTransport);  
  6.     }  
  7.   
  8.     if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {  
  9.       break;  
  10.     }  
  11. }  
 
當有Socket鏈接不是很多的時候,TThreadPoolServer並不會有太大的性能問題,可以通過指定ThreadPool中線程的個數進行簡單的調優。如果Socket鏈接很多,我們只能使用TThreadedSelectorServer來做支撐,TThreadedSelectorServer內部基於NIO模式,具有異步的特性,可以極大的提升server端的並發能力;不過在絕大多數情況下,在thrift中使用"異步"似乎不太容易讓人接受,畢竟這意味着Client端需要阻塞,並且在高並發環境中這個阻塞時間是不可控的。但SelecorServer確實可以有效的提升Server的並發能力,而且在一定程度上可以提升吞吐能力,這或許是我們優化Thrift Server比較可靠的方式之一。
 
Client端代碼示例
Java代碼   收藏代碼
  1. public class UserServiceClient {  
  2.   
  3.     public void startClient() {  
  4.         TTransport transport;  
  5.         try {  
  6.             transport = new TSocket("localhost", 1234);  
  7.             TProtocol protocol = new TBinaryProtocol(transport);  
  8.             UserService.Client client = new UserService.Client(protocol);  
  9.             transport.open();  
  10.             User user = client.getById(1000);  
  11.             ////  
  12.             transport.close();  
  13.         } catch (TTransportException e) {  
  14.             e.printStackTrace();  
  15.         } catch (TException e) {  
  16.             e.printStackTrace();  
  17.         }  
  18.     }  
  19.   
  20. }  
 
Server端代碼示例 
Java代碼   收藏代碼
  1. public class Server {  
  2.     public void startServer() {  
  3.         try {  
  4.             TServerSocket serverTransport = new TServerSocket(1234);  
  5.             UserService.Processor process = new Processor(new UserServiceImpl());  
  6.             Factory portFactory = new TBinaryProtocol.Factory(true, true);  
  7.             Args args = new Args(serverTransport);  
  8.             args.processor(process);  
  9.             args.protocolFactory(portFactory);  
  10.             TServer server = new TThreadPoolServer(args);  
  11.             server.serve();  
  12.         } catch (TTransportException e) {  
  13.             e.printStackTrace();  
  14.         }  
  15.     }  
  16. }  
 
到這里,你就會發現,一個service,需要server端啟動一個ServerSocket,如果你有很多service,那么你需要讓這些service盡可能的分布在不同的物理server上,否則一個物理server上運行太多的ServerSocket進程並不是一件讓人愉快的事情.。或者你讓幾個service整合成一個。
 
問題總沒有想象的那么簡單,其實service被拆分的粒度越細,越容易被部署和擴展,對於負載均衡就更加有利。如何讓一個service分布式部署,稍后再繼續分享。
 
總結
1) Thrift文件定義struct和service API,此文件可以被其他語言生成API文件或者類文件。
2) 使用Thrift客戶端生成API文件。
3) Java服務端(即服務提供端),實現service功能。
4) 服務端將server發布成一個Thrift server:即將service嵌入到一個ServerSocket中。
5) 客戶端啟動Socket,並和Thrift server建立TCP連接,並使用Client代理類操作遠程接口。
 
服務端開發
 
Thrift服務server端,其實就是一個ServerSocket線程+處理器,當Thrift-client端建立鏈接之后,處理器負責解析socket流信息,並根據其指定的"方法名"+參數列表,來調用"服務實現類”的方法,並將執行結果(或者異常)寫入到socket中。
 
一個server,就需要創建一個ServerSocket,並偵聽本地的一個端口,這種情況對分布式部署,有一些額外的要求:client端需要知道一個"service"被部署在了那些server上。
 
設計思路:
1) 每個server內部采用threadPool的方式,來提升並發能力.
2) 當server啟動成功后,向zookeeper注冊服務節點,此后client端就可以"感知到"服務的狀態
3) 通過spring的方式,配置thrift-server服務類.
其中x注冊是可選選項
 
1.pom.xml
 
  1. <dependencies>  
  2.     <dependency>  
  3.         <groupId>org.springframework</groupId>  
  4.         <artifactId>spring-context</artifactId>  
  5.         <version>3.0.7.RELEASE</version>  
  6.     </dependency>  
  7.     <dependency>  
  8.         <groupId>org.apache.zookeeper</groupId>  
  9.         <artifactId>zookeeper</artifactId>  
  10.         <version>3.4.5</version>  
  11.         <!--<exclusions>-->  
  12.             <!--<exclusion>-->  
  13.                 <!--<groupId>log4j</groupId>-->  
  14.                 <!--<artifactId>log4j</artifactId>-->  
  15.             <!--</exclusion>-->  
  16.         <!--</exclusions>-->  
  17.     </dependency>  
  18.     <!--  
  19.     <dependency>  
  20.         <groupId>com.101tec</groupId>  
  21.         <artifactId>zkclient</artifactId>  
  22.         <version>0.4</version>  
  23.     </dependency>  
  24.     -->  
  25.     <dependency>  
  26.         <groupId>org.apache.thrift</groupId>  
  27.         <artifactId>libthrift</artifactId>  
  28.         <version>0.9.1</version>  
  29.     </dependency>  
  30.     <dependency>  
  31.         <groupId>org.apache.curator</groupId>  
  32.         <artifactId>curator-recipes</artifactId>  
  33.         <version>2.3.0</version>  
  34.     </dependency>  
  35.     <dependency>  
  36.         <groupId>commons-pool</groupId>  
  37.         <artifactId>commons-pool</artifactId>  
  38.         <version>1.6</version>  
  39.     </dependency>  
  40.   
  41. </dependencies>  
 
本實例,使用了apache-curator作為zookeeper客戶端.
 
2. spring-thrift-server.xml
 
  1. <!-- zookeeper -->  
  2. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">  
  3.     <property name="connectString" value="127.0.0.1:2181"></property>  
  4.     <property name="namespace" value="demo/thrift-service"></property>  
  5. </bean>  
  6. <bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">  
  7.     <property name="zookeeper" ref="thriftZookeeper"></property>  
  8. </bean>  
  9. <bean id="userService" class="com.demo.service.UserServiceImpl"/>  
  10. <bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">  
  11.     <property name="service" ref="userService"></property>  
  12.     <property name="configPath" value="UserServiceImpl"></property>  
  13.     <property name="port" value="9090"></property>  
  14.     <property name="addressReporter" ref="sericeAddressReporter"></property>  
  15. </bean>  
 
3. ThriftServiceServerFactory.java
 
此類嚴格上說並不是一個工廠類,它的主要作用就是封裝指定的"service" ,然后啟動一個server的過程,其中"service"屬性表示服務的實現類,addressReporter表示當server啟動成功后,需要指定的操作(比如,向zookeeper發送service的IP信息).
 
究竟當前server的ip地址是多少,在不同的設計中,有所不同,比如:有些管理員喜歡將本機的IP地址寫入到os下的某個文件中,如果上層應用需要獲取可靠的IP信息,就需要讀取這個文件...你可以實現自己的ThriftServerIpTransfer來獲取當前server的IP.
 
為了減少xml中的配置信息,在factory中,使用了反射機制來構建"Processor"類.
 
  1. public class ThriftServiceServerFactory implements InitializingBean {  
  2.   
  3.     private Integer port;  
  4.   
  5.     private Integer priority = 1;// default  
  6.   
  7.     private Object service;// serice實現類  
  8.   
  9.     private ThriftServerIpTransfer ipTransfer;  
  10.   
  11.     private ThriftServerAddressReporter addressReporter;  
  12.       
  13.     private ServerThread serverThread;  
  14.       
  15.     private String configPath;  
  16.   
  17.     public void setService(Object service) {  
  18.         this.service = service;  
  19.     }  
  20.   
  21.     public void setPriority(Integer priority) {  
  22.         this.priority = priority;  
  23.     }  
  24.   
  25.     public void setPort(Integer port) {  
  26.         this.port = port;  
  27.     }  
  28.   
  29.     public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {  
  30.         this.ipTransfer = ipTransfer;  
  31.     }  
  32.   
  33.     public void setAddressReporter(ThriftServerAddressReporter addressReporter) {  
  34.         this.addressReporter = addressReporter;  
  35.     }  
  36.       
  37.   
  38.     public void setConfigPath(String configPath) {  
  39.         this.configPath = configPath;  
  40.     }  
  41.   
  42.     @Override  
  43.     public void afterPropertiesSet() throws Exception {  
  44.         if (ipTransfer == null) {  
  45.             ipTransfer = new LocalNetworkIpTransfer();  
  46.         }  
  47.         String ip = ipTransfer.getIp();  
  48.         if (ip == null) {  
  49.             throw new NullPointerException("cant find server ip...");  
  50.         }  
  51.         String hostname = ip + ":" + port + ":" + priority;  
  52.         Class serviceClass = service.getClass();  
  53.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
  54.         Class<?>[] interfaces = serviceClass.getInterfaces();  
  55.         if (interfaces.length == 0) {  
  56.             throw new IllegalClassFormatException("service-class should implements Iface");  
  57.         }  
  58.   
  59.         // reflect,load "Processor";  
  60.         Processor processor = null;  
  61.         for (Class clazz : interfaces) {  
  62.             String cname = clazz.getSimpleName();  
  63.             if (!cname.equals("Iface")) {  
  64.                 continue;  
  65.             }  
  66.             String pname = clazz.getEnclosingClass().getName() + "$Processor";  
  67.             try {  
  68.                 Class pclass = classLoader.loadClass(pname);  
  69.                 if (!pclass.isAssignableFrom(Processor.class)) {  
  70.                     continue;  
  71.                 }  
  72.                 Constructor constructor = pclass.getConstructor(clazz);  
  73.                 processor = (Processor) constructor.newInstance(service);  
  74.                 break;  
  75.             } catch (Exception e) {  
  76.                 //  
  77.             }  
  78.         }  
  79.   
  80.         if (processor == null) {  
  81.             throw new IllegalClassFormatException("service-class should implements Iface");  
  82.         }  
  83.         //需要單獨的線程,因為serve方法是阻塞的.  
  84.         serverThread = new ServerThread(processor, port);  
  85.         serverThread.start();  
  86.         // report  
  87.         if (addressReporter != null) {  
  88.             addressReporter.report(configPath, hostname);  
  89.         }  
  90.   
  91.     }  
  92.   
  93.     class ServerThread extends Thread {  
  94.         private TServer server;  
  95.   
  96.         ServerThread(Processor processor, int port) throws Exception {  
  97.             TServerSocket serverTransport = new TServerSocket(port);  
  98.             Factory portFactory = new TBinaryProtocol.Factory(true, true);  
  99.             Args args = new Args(serverTransport);  
  100.             args.processor(processor);  
  101.             args.protocolFactory(portFactory);  
  102.             server = new TThreadPoolServer(args);  
  103.         }  
  104.   
  105.         @Override  
  106.         public void run(){  
  107.             try{  
  108.                 server.serve();  
  109.             }catch(Exception e){  
  110.                 //  
  111.             }  
  112.         }  
  113.           
  114.         public void stopServer(){  
  115.             server.stop();  
  116.         }  
  117.     }  
  118.   
  119.     public void close() {  
  120.         serverThread.stopServer();  
  121.     }  
  122.   
  123. }  
 
4. DynamicAddressReporter.java
 
在ThriftServiceServerFactory中,有個可選的屬性:addressReporter, DynamicAddressReporter提供了向zookeeper注冊service信息的能力,當server啟動正常后,把server的IP + port發送到zookeeper中;那么此后服務消費client,就可以從zookeeper中獲取server列表,並與它們建立鏈接(池).這樣client端只需要關注zookeeper的節點名稱即可,不需要配置大量的ip+port.
  1. public class DynamicAddressReporter implements ThriftServerAddressReporter {  
  2.       
  3.     private CuratorFramework zookeeper;  
  4.       
  5.     public DynamicAddressReporter(){}  
  6.       
  7.     public DynamicAddressReporter(CuratorFramework zookeeper){  
  8.         this.zookeeper = zookeeper;  
  9.     }  
  10.   
  11.   
  12.     public void setZookeeper(CuratorFramework zookeeper) {  
  13.         this.zookeeper = zookeeper;  
  14.     }  
  15.   
  16.     @Override  
  17.     public void report(String service, String address) throws Exception {  
  18.         if(zookeeper.getState() == CuratorFrameworkState.LATENT){  
  19.             zookeeper.start();  
  20.             zookeeper.newNamespaceAwareEnsurePath(service);  
  21.         }  
  22.         zookeeper.create()  
  23.             .creatingParentsIfNeeded()  
  24.             .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)  
  25.             .forPath(service +"/i_",address.getBytes("utf-8"));  
  26.     }  
  27.       
  28.       
  29.     public void close(){  
  30.         zookeeper.close();  
  31.     }  
  32.   
  33. }  

5. 測試類

  1. public class ServiceMain {  
  2.   
  3.     /** 
  4.      * @param args 
  5.      */  
  6.     public static void main(String[] args) {  
  7.         try {  
  8.             ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");  
  9.             Thread.sleep(3000000);  
  10.         } catch (Exception e) {  
  11.             e.printStackTrace();  
  12.         }  
  13.   
  14.     }  
  15.   
  16. }  
 
客戶端開發
 
Thrift-client作為服務消費端,由於thrift使用socket通訊,因此它需要面對幾個問題:
1) client端需要知道server端的IP+port,如果是分布式部署,還需要知道所有server的IP+port列表。
2) client為了提升性能,不可能只使用一個socket來處理並發請求,當然也不能每個請求都創建一個socket;我們需要使用連接池方案。
3) 對於java開發工程師而言,基於spring配置thrift服務,可以提供很多的便利。
4) 基於zookeeper配置管理,那么client端就不需要"硬編碼"的配置server的ip+port,可以使用zookeeper來推送每個service的服務地址。
5) 因為thrift-client端不使用連接池的話,將不能有效的提高並發能力,本文重點描述看如何使用thrift-client連接池。
 
1. pom.xml
 
  1. <dependencies>  
  2.     <dependency>  
  3.         <groupId>org.springframework</groupId>  
  4.         <artifactId>spring-context</artifactId>  
  5.         <version>3.0.7.RELEASE</version>  
  6.     </dependency>  
  7.     <dependency>  
  8.         <groupId>org.apache.zookeeper</groupId>  
  9.         <artifactId>zookeeper</artifactId>  
  10.         <version>3.4.5</version>  
  11.         <!--<exclusions>-->  
  12.             <!--<exclusion>-->  
  13.                 <!--<groupId>log4j</groupId>-->  
  14.                 <!--<artifactId>log4j</artifactId>-->  
  15.             <!--</exclusion>-->  
  16.         <!--</exclusions>-->  
  17.     </dependency>  
  18.     <!--  
  19.     <dependency>  
  20.         <groupId>com.101tec</groupId>  
  21.         <artifactId>zkclient</artifactId>  
  22.         <version>0.4</version>  
  23.     </dependency>  
  24.     -->  
  25.     <dependency>  
  26.         <groupId>org.apache.thrift</groupId>  
  27.         <artifactId>libthrift</artifactId>  
  28.         <version>0.9.1</version>  
  29.     </dependency>  
  30.     <dependency>  
  31.         <groupId>org.apache.curator</groupId>  
  32.         <artifactId>curator-recipes</artifactId>  
  33.         <version>2.3.0</version>  
  34.     </dependency>  
  35.     <dependency>  
  36.         <groupId>commons-pool</groupId>  
  37.         <artifactId>commons-pool</artifactId>  
  38.         <version>1.6</version>  
  39.     </dependency>  
  40.   
  41. </dependencies>  

2. spring-thrift-client.xml

其中zookeeper作為可選項,開發者也可以通過制定serverAddress的方式指定server的地址.
 
  1. <!-- fixedAddress -->  
  2. <!--    
  3. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">  
  4.     <property name="service" value="com.demo.service.UserService"></property>  
  5.     <property name="serverAddress" value="127.0.0.1:9090:2"></property>  
  6.     <property name="maxActive" value="5"></property>  
  7.     <property name="idleTime" value="10000"></property>  
  8. </bean>  
  9. -->  
  10. <!-- zookeeper -->  
  11. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">  
  12.     <property name="connectString" value="127.0.0.1:2181"></property>  
  13.     <property name="namespace" value="demo/thrift-service"></property>  
  14. </bean>  
  15. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">  
  16.     <property name="service" value="com.demo.service.UserService"></property>  
  17.     <property name="maxActive" value="5"></property>  
  18.     <property name="idleTime" value="1800000"></property>  
  19.     <property name="addressProvider">  
  20.         <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">  
  21.             <property name="configPath" value="UserServiceImpl"></property>  
  22.             <property name="zookeeper" ref="thriftZookeeper"></property>  
  23.         </bean>  
  24.     </property>  
  25. </bean>  
 

3. ThriftServiceClientProxyFactory.java

因為我們要在client端使用連接池方案,那么就需要對client的方法調用過程,進行代理,這個類,就是維護了一個"Client"代理類,並在方法調用時,從"對象池"中取出一個"Client"對象,並在方法實際調用結束后歸還給"對象池".  
  1. @SuppressWarnings("rawtypes")  
  2. public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {  
  3.   
  4.     private String service;  
  5.   
  6.     private String serverAddress;  
  7.       
  8.     private Integer maxActive = 32;//最大活躍連接數  
  9.       
  10.     ////ms,default 3 min,鏈接空閑時間  
  11.     //-1,關閉空閑檢測  
  12.     private Integer idleTime = 180000;  
  13.     private ThriftServerAddressProvider addressProvider;  
  14.   
  15.     private Object proxyClient;  
  16.       
  17.   
  18.     public void setMaxActive(Integer maxActive) {  
  19.         this.maxActive = maxActive;  
  20.     }  
  21.   
  22.   
  23.     public void setIdleTime(Integer idleTime) {  
  24.         this.idleTime = idleTime;  
  25.     }  
  26.   
  27.   
  28.     public void setService(String service) {  
  29.         this.service = service;  
  30.     }  
  31.   
  32.   
  33.     public void setServerAddress(String serverAddress) {  
  34.         this.serverAddress = serverAddress;  
  35.     }  
  36.   
  37.   
  38.     public void setAddressProvider(ThriftServerAddressProvider addressProvider) {  
  39.         this.addressProvider = addressProvider;  
  40.     }  
  41.   
  42.     private Class objectClass;  
  43.       
  44.     private GenericObjectPool<TServiceClient> pool;  
  45.       
  46.     private PoolOperationCallBack callback = new PoolOperationCallBack() {  
  47.           
  48.         @Override  
  49.         public void make(TServiceClient client) {  
  50.             System.out.println("create");  
  51.               
  52.         }  
  53.           
  54.         @Override  
  55.         public void destroy(TServiceClient client) {  
  56.             System.out.println("destroy");  
  57.               
  58.         }  
  59.     };  
  60.   
  61.     @Override  
  62.     public void afterPropertiesSet() throws Exception {  
  63.         if(serverAddress != null){  
  64.             addressProvider = new FixedAddressProvider(serverAddress);  
  65.         }  
  66.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
  67.         //加載Iface接口  
  68.         objectClass = classLoader.loadClass(service + "$Iface");  
  69.         //加載Client.Factory類  
  70.         Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");  
  71.         TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  
  72.         ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);  
  73.         GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  
  74.         poolConfig.maxActive = maxActive;  
  75.         poolConfig.minIdle = 0;  
  76.         poolConfig.minEvictableIdleTimeMillis = idleTime;  
  77.         poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;  
  78.         pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);  
  79.         proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {  
  80.             @Override  
  81.             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
  82.                 //  
  83.                 TServiceClient client = pool.borrowObject();  
  84.                 try{  
  85.                     return method.invoke(client, args);  
  86.                 }catch(Exception e){  
  87.                     throw e;  
  88.                 }finally{  
  89.                     pool.returnObject(client);  
  90.                 }  
  91.             }  
  92.         });  
  93.     }  
  94.   
  95.     @Override  
  96.     public Object getObject() throws Exception {  
  97.         return proxyClient;  
  98.     }  
  99.   
  100.     @Override  
  101.     public Class<?> getObjectType() {  
  102.         return objectClass;  
  103.     }  
  104.   
  105.     @Override  
  106.     public boolean isSingleton() {  
  107.         return true;  //To change body of implemented methods use File | Settings | File Templates.  
  108.     }  
  109.       
  110.     public void close(){  
  111.         if(addressProvider != null){  
  112.             addressProvider.close();  
  113.         }  
  114.     }  
  115. }  

4. ThriftClientPoolFactory.java

"Client"對象池,對象池中是已經實例化的Client對象,Client對象負責與Thrift server通信.
  1. /** 
  2.  * 連接池,thrift-client for spring 
  3.  */  
  4. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{  
  5.   
  6.     private final ThriftServerAddressProvider addressProvider;  
  7.       
  8.     private final TServiceClientFactory<TServiceClient> clientFactory;  
  9.       
  10.     private PoolOperationCallBack callback;  
  11.   
  12.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  
  13.         this.addressProvider = addressProvider;  
  14.         this.clientFactory = clientFactory;  
  15.     }  
  16.       
  17.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {  
  18.         this.addressProvider = addressProvider;  
  19.         this.clientFactory = clientFactory;  
  20.         this.callback = callback;  
  21.     }  
  22.   
  23.   
  24.   
  25.     @Override  
  26.     public TServiceClient makeObject() throws Exception {  
  27.         InetSocketAddress address = addressProvider.selector();  
  28.         TSocket tsocket = new TSocket(address.getHostName(),address.getPort());  
  29.         TProtocol protocol = new TBinaryProtocol(tsocket);  
  30.         TServiceClient client = this.clientFactory.getClient(protocol);  
  31.         tsocket.open();  
  32.         if(callback != null){  
  33.             try{  
  34.                 callback.make(client);  
  35.             }catch(Exception e){  
  36.                 //  
  37.             }  
  38.         }  
  39.         return client;  
  40.     }  
  41.   
  42.     public void destroyObject(TServiceClient client) throws Exception {  
  43.         if(callback != null){  
  44.             try{  
  45.                 callback.destroy(client);  
  46.             }catch(Exception e){  
  47.                 //  
  48.             }  
  49.         }  
  50.         TTransport pin = client.getInputProtocol().getTransport();  
  51.         pin.close();  
  52.     }  
  53.   
  54.     public boolean validateObject(TServiceClient client) {  
  55.         TTransport pin = client.getInputProtocol().getTransport();  
  56.         return pin.isOpen();  
  57.     }  
  58.       
  59.     static interface PoolOperationCallBack {  
  60.         //銷毀client之前執行  
  61.         void destroy(TServiceClient client);  
  62.         //創建成功是執行  
  63.         void make(TServiceClient client);  
  64.     }  
  65.   
  66. }  

5. DynamicAddressProvider.java

將zookeeper作為server地址的提供者,這樣客戶端就不需要再配置文件中指定一堆ip + port,而且當server服務有更新時,也不需要client端重新配置.
  1. /** 
  2.  * 可以動態獲取address地址,方案設計參考 
  3.  * 1) 可以間歇性的調用一個web-service來獲取地址 
  4.  * 2) 可以使用事件監聽機制,被動的接收消息,來獲取最新的地址(比如基於MQ,nio等) 
  5.  * 3) 可以基於zookeeper-watcher機制,獲取最新地址 
  6.  * <p/> 
  7.  * 本實例,使用zookeeper作為"config"中心,使用apache-curator方法庫來簡化zookeeper開發 
  8.  * 如下實現,僅供參考 
  9.  */  
  10. public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {  
  11.   
  12.     private String configPath;  
  13.   
  14.     private PathChildrenCache cachedPath;  
  15.   
  16.     private CuratorFramework zookeeper;  
  17.       
  18.     //用來保存當前provider所接觸過的地址記錄  
  19.     //當zookeeper集群故障時,可以使用trace中地址,作為"備份"  
  20.     private Set<String> trace = new HashSet<String>();  
  21.   
  22.     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
  23.   
  24.     private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
  25.       
  26.     private Object lock = new Object();  
  27.       
  28.     private static final Integer DEFAULT_PRIORITY = 1;  
  29.   
  30.     public void setConfigPath(String configPath) {  
  31.         this.configPath = configPath;  
  32.     }  
  33.   
  34.     public void setZookeeper(CuratorFramework zookeeper) {  
  35.         this.zookeeper = zookeeper;  
  36.     }  
  37.   
  38.     @Override  
  39.     public void afterPropertiesSet() throws Exception {  
  40.         //如果zk尚未啟動,則啟動  
  41.         if(zookeeper.getState() == CuratorFrameworkState.LATENT){  
  42.             zookeeper.start();  
  43.         }  
  44.         buildPathChildrenCache(zookeeper, configPath, true);  
  45.         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
  46.     }  
  47.   
  48.     private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {  
  49.         cachedPath = new PathChildrenCache(client, path, cacheData);  
  50.         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
  51.             @Override  
  52.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
  53.                 PathChildrenCacheEvent.Type eventType = event.getType();  
  54.                 switch (eventType) {  
  55. //                    case CONNECTION_RECONNECTED:  
  56. //                          
  57. //                        break;  
  58.                     case CONNECTION_SUSPENDED:  
  59.                     case CONNECTION_LOST:  
  60.                         System.out.println("Connection error,waiting...");  
  61.                         return;  
  62.                     default:  
  63.                         //  
  64.                 }  
  65.                 //任何節點的時機數據變動,都會rebuild,此處為一個"簡單的"做法.  
  66.                 cachedPath.rebuild();  
  67.                 rebuild();  
  68.             }  
  69.               
  70.             protected void rebuild() throws Exception {  
  71.                 List<ChildData> children = cachedPath.getCurrentData();  
  72.                 if (children == null || children.isEmpty()) {  
  73.                     //有可能所有的thrift server都與zookeeper斷開了鏈接  
  74.                     //但是,有可能,thrift client與thrift server之間的網絡是良好的  
  75.                     //因此此處是否需要清空container,是需要多方面考慮的.  
  76.                     container.clear();  
  77.                     System.out.println("thrift server-cluster error....");  
  78.                     return;  
  79.                 }  
  80.                 List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
  81.                 for (ChildData data : children) {  
  82.                     String address = new String(data.getData(), "utf-8");  
  83.                     current.addAll(transfer(address));  
  84.                     trace.add(address);  
  85.                 }  
  86.                 Collections.shuffle(current);  
  87.                 synchronized (lock) {  
  88.                     container.clear();  
  89.                     container.addAll(current);  
  90.                     inner.clear();  
  91.                     inner.addAll(current);  
  92.                       
  93.                 }  
  94.             }  
  95.         });  
  96.     }  
  97.     
  98.     private List<InetSocketAddress> transfer(String address){  
  99.         String[] hostname = address.split(":");  
  100.         Integer priority = DEFAULT_PRIORITY;  
  101.         if (hostname.length == 3) {  
  102.             priority = Integer.valueOf(hostname[2]);  
  103.         }  
  104.         String ip = hostname[0];  
  105.         Integer port = Integer.valueOf(hostname[1]);  
  106.         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
  107.         for (int i = 0; i < priority; i++) {  
  108.             result.add(new InetSocketAddress(ip, port));  
  109.         }  
  110.         return result;  
  111.     }  
  112.   
  113.   
  114.     @Override  
  115.     public List<InetSocketAddress> getAll() {  
  116.         return Collections.unmodifiableList(container);  
  117.     }  
  118.   
  119.     @Override  
  120.     public synchronized InetSocketAddress selector() {  
  121.         if (inner.isEmpty()) {  
  122.             if(!container.isEmpty()){  
  123.                 inner.addAll(container);  
  124.             }else if(!trace.isEmpty()){  
  125.                 synchronized (lock) {  
  126.                     for(String hostname : trace){  
  127.                         container.addAll(transfer(hostname));  
  128.                     }  
  129.                     Collections.shuffle(container);  
  130.                     inner.addAll(container);  
  131.                 }  
  132.             }  
  133.         }  
  134.         return inner.poll();//null  
  135.     }  
  136.   
  137.   
  138.     @Override  
  139.     public void close() {  
  140.         try {  
  141.             cachedPath.close();  
  142.             zookeeper.close();  
  143.         } catch (Exception e) {  
  144.             //  
  145.         }  
  146.     }  
  147. }  
 
到此為止,我們的Thrift基本上就可以順利運行起來了。
 
使用ZooKeeper構建集群
通用辦法是使用apache-curator組件來支持thrift連接zk提供集群服務並且推送服務信息變化。
 
其他高級話題
 
對於Thrift服務化的改造,主要是客戶端,可以從如下幾個方面進行:
 
1.服務端的服務注冊,客戶端自動發現,無需手工修改配置,這里我們使用zookeeper,但由於zookeeper本身提供的客戶端使用較為復雜,因此采用curator-recipes工具類進行處理服務的注冊與發現。
2.客戶端使用連接池對服務調用進行管理,提升性能,這里我們使用Apache Commons項目commons-pool,可以大大減少代碼的復雜度。
3.關於Failover/LoadBalance,由於zookeeper的watcher,當服務端不可用是及時通知客戶端,並移除不可用的服務節點,而LoadBalance有很多算法,這里我們采用隨機加權方式,也是常有的負載算法,至於其他的算法介紹參考: 常見的負載均衡的基本算法
4.使thrift服務的注冊和發現可以基於spring配置,可以提供很多的便利。
5.其他的改造如:
1)通過動態代理實現client和server端的交互細節透明化,讓用戶只需通過服務方提供的接口進行訪問
2)Thrift通過兩種方式調用服務Client和Iface
  1. // *) Client API 調用  
  2. (EchoService.Client)client.echo("hello lilei");  ---(1)  
  3. // *) Service 接口 調用  
  4. (EchoService.Iface)service.echo("hello lilei");  ---(2)  
 
Client API的方式, 不推薦, 我們推薦Service接口的方式(服務化)。
 
下面我們來一一實現:
一、pom.xml引入依賴jar包
  1. <dependency>  
  2.             <groupId>org.apache.thrift</groupId>  
  3.             <artifactId>libthrift</artifactId>  
  4.             <version>0.9.2</version>  
  5.         </dependency>  
  6.         <dependency>  
  7.             <groupId>commons-pool</groupId>  
  8.             <artifactId>commons-pool</artifactId>  
  9.             <version>1.6</version>  
  10.         </dependency>  
  11.         <dependency>  
  12.             <groupId>org.springframework</groupId>  
  13.             <artifactId>spring-context</artifactId>  
  14.             <version>4.0.9.RELEASE</version>  
  15.         </dependency>  
  16.   
  17.         <dependency>  
  18.             <groupId>org.apache.zookeeper</groupId>  
  19.             <artifactId>zookeeper</artifactId>  
  20.             <version>3.4.6</version>  
  21.         </dependency>  
  22.         <dependency>  
  23.             <groupId>org.apache.curator</groupId>  
  24.             <artifactId>curator-recipes</artifactId>  
  25.             <version>2.7.1</version>  
  26.         </dependency>  
 

二、使用zookeeper管理服務節點配置

RPC服務往平台化的方向發展, 會屏蔽掉更多的服務細節(服務的IP地址集群, 集群的擴容和遷移), 只暴露服務接口. 這部分的演化, 使得server端和client端完全的解耦合. 兩者的交互通過ConfigServer(MetaServer)的中介角色來搭線。

注: 該圖源自dubbo的官網
這邊借助Zookeeper來扮演該角色, server扮演發布者的角色, 而client扮演訂閱者的角色.

Zookeeper是分布式應用協作服務. 它實現了paxos的一致性算法, 在命名管理/配置推送/數據同步/主從切換方面扮演重要的角色。 其數據組織類似文件系統的目錄結構: 

每個節點被稱為znode, 為znode節點依據其特性, 又可以分為如下類型:
  1). PERSISTENT: 永久節點
  2). EPHEMERAL: 臨時節點, 會隨session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久節點, 其節點的名字編號是單調遞增的
  4). EPHEMERAL_SEQUENTIAL: 臨時節點, 其節點的名字編號是單調遞增的
  注: 臨時節點不能成為父節點
  Watcher觀察模式, client可以注冊對節點的狀態/內容變更的事件回調機制. 其Event事件的兩類屬性需要關注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服務端:
  作為具體業務服務的RPC服務發布方, 對其自身的服務描述由以下元素構成.
  1). namespace: 命名空間,來區分不同應用 
  2). service: 服務接口, 采用發布方的類全名來表示
  3). version: 版本號
  借鑒了Maven的GAV坐標系, 三維坐標系更符合服務平台化的大環境. 
  *) 數據模型的設計
  具體RPC服務的注冊路徑為: /rpc/{namespace}/{service}/{version}, 該路徑上的節點都是永久節點
  RPC服務集群節點的注冊路徑為: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的節點是臨時節點.

1.定義Zookeeper的客戶端的管理

ZookeeperFactory.java
 
package cn.slimsmart.thrift.rpc.zookeeper;  
  1.   
  2. import org.apache.curator.framework.CuratorFramework;  
  3. import org.apache.curator.framework.CuratorFrameworkFactory;  
  4. import org.apache.curator.retry.ExponentialBackoffRetry;  
  5. import org.springframework.beans.factory.FactoryBean;  
  6. import org.springframework.util.StringUtils;  
  7.   
  8. /** 
  9.  * 獲取zookeeper客戶端鏈接 
  10.  */  
  11. public class ZookeeperFactory implements FactoryBean<CuratorFramework> {  
  12.   
  13.     private String zkHosts;  
  14.     // session超時  
  15.     private int sessionTimeout = 30000;  
  16.     private int connectionTimeout = 30000;  
  17.   
  18.     // 共享一個zk鏈接  
  19.     private boolean singleton = true;  
  20.   
  21.     // 全局path前綴,常用來區分不同的應用  
  22.     private String namespace;  
  23.   
  24.     private final static String ROOT = "rpc";  
  25.   
  26.     private CuratorFramework zkClient;  
  27.   
  28.     public void setZkHosts(String zkHosts) {  
  29.         this.zkHosts = zkHosts;  
  30.     }  
  31.   
  32.     public void setSessionTimeout(int sessionTimeout) {  
  33.         this.sessionTimeout = sessionTimeout;  
  34.     }  
  35.   
  36.     public void setConnectionTimeout(int connectionTimeout) {  
  37.         this.connectionTimeout = connectionTimeout;  
  38.     }  
  39.   
  40.     public void setSingleton(boolean singleton) {  
  41.         this.singleton = singleton;  
  42.     }  
  43.   
  44.     public void setNamespace(String namespace) {  
  45.         this.namespace = namespace;  
  46.     }  
  47.   
  48.     public void setZkClient(CuratorFramework zkClient) {  
  49.         this.zkClient = zkClient;  
  50.     }  
  51.   
  52.     @Override  
  53.     public CuratorFramework getObject() throws Exception {  
  54.         if (singleton) {  
  55.             if (zkClient == null) {  
  56.                 zkClient = create();  
  57.                 zkClient.start();  
  58.             }  
  59.             return zkClient;  
  60.         }  
  61.         return create();  
  62.     }  
  63.   
  64.     @Override  
  65.     public Class<?> getObjectType() {  
  66.         return CuratorFramework.class;  
  67.     }  
  68.   
  69.     @Override  
  70.     public boolean isSingleton() {  
  71.         return singleton;  
  72.     }  
  73.   
  74.     public CuratorFramework create() throws Exception {  
  75.         if (StringUtils.isEmpty(namespace)) {  
  76.             namespace = ROOT;  
  77.         } else {  
  78.             namespace = ROOT +"/"+ namespace;  
  79.         }  
  80.         return create(zkHosts, sessionTimeout, connectionTimeout, namespace);  
  81.     }  
  82.   
  83.     public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {  
  84.         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();  
  85.         return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)  
  86.                 .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
  87.                 .defaultData(null).build();  
  88.     }  
  89.   
  90.     public void close() {  
  91.         if (zkClient != null) {  
  92.             zkClient.close();  
  93.         }  
  94.     }  
  95. }  
 

2.服務端注冊服務

由於服務端配置需要獲取本機的IP地址,因此定義IP獲取接口

ThriftServerIpResolve.java

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. /** 
  4.  *  
  5.  * 解析thrift-server端IP地址,用於注冊服務 
  6.  * 1) 可以從一個物理機器或者虛機的特殊文件中解析 
  7.  * 2) 可以獲取指定網卡序號的Ip 
  8.  * 3) 其他 
  9.  */  
  10. public interface ThriftServerIpResolve {  
  11.       
  12.     String getServerIp() throws Exception;  
  13.       
  14.     void reset();  
  15.       
  16.     //當IP變更時,將會調用reset方法  
  17.     static interface IpRestCalllBack{  
  18.         public void rest(String newIp);  
  19.     }  
  20. }  

可以對該接口做不通的實現,下面我們基於網卡獲取IP地址,也可以通過配置serverIp
ThriftServerIpLocalNetworkResolve.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.Inet6Address;  
  4. import java.net.InetAddress;  
  5. import java.net.NetworkInterface;  
  6. import java.net.SocketException;  
  7. import java.util.Enumeration;  
  8.   
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. /** 
  13.  * 解析網卡Ip 
  14.  * 
  15.  */  
  16. public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {  
  17.       
  18.     private Logger logger = LoggerFactory.getLogger(getClass());  
  19.   
  20.     //緩存  
  21.     private String serverIp;  
  22.       
  23.     public void setServerIp(String serverIp) {  
  24.         this.serverIp = serverIp;  
  25.     }  
  26.   
  27.     @Override  
  28.     public String getServerIp() {  
  29.         if (serverIp != null) {  
  30.             return serverIp;  
  31.         }  
  32.         // 一個主機有多個網絡接口  
  33.         try {  
  34.             Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();  
  35.             while (netInterfaces.hasMoreElements()) {  
  36.                 NetworkInterface netInterface = netInterfaces.nextElement();  
  37.                 // 每個網絡接口,都會有多個"網絡地址",比如一定會有lookback地址,會有siteLocal地址等.以及IPV4或者IPV6 .  
  38.                 Enumeration<InetAddress> addresses = netInterface.getInetAddresses();  
  39.                 while (addresses.hasMoreElements()) {  
  40.                     InetAddress address = addresses.nextElement();  
  41.                     if(address instanceof Inet6Address){  
  42.                         continue;  
  43.                     }  
  44.                     if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {  
  45.                         serverIp = address.getHostAddress();  
  46.                         logger.info("resolve server ip :"+ serverIp);  
  47.                         continue;  
  48.                     }  
  49.                 }  
  50.             }  
  51.         } catch (SocketException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         return serverIp;  
  55.     }  
  56.   
  57.     @Override  
  58.     public void reset() {  
  59.         serverIp = null;  
  60.     }  
  61. }  

接下來我們定義發布服務接口,並實現將服務信息(服務接口、版本號,IP、port、weight)發布到zookeeper中。

ThriftServerAddressRegister.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. /** 
  4.  * 發布服務地址及端口到服務注冊中心,這里是zookeeper服務器 
  5.  */  
  6. public interface ThriftServerAddressRegister {  
  7.     /** 
  8.      * 發布服務接口 
  9.      * @param service 服務接口名稱,一個產品中不能重復 
  10.      * @param version 服務接口的版本號,默認1.0.0 
  11.      * @param address 服務發布的地址和端口 
  12.      */  
  13.     void register(String service,String version,String address);  
  14. }  

實現:ThriftServerAddressRegisterZookeeper.java

 

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.io.UnsupportedEncodingException;  
  4.   
  5. import org.apache.curator.framework.CuratorFramework;  
  6. import org.apache.curator.framework.imps.CuratorFrameworkState;  
  7. import org.apache.zookeeper.CreateMode;  
  8. import org.slf4j.Logger;  
  9. import org.slf4j.LoggerFactory;  
  10. import org.springframework.util.StringUtils;  
  11.   
  12. import cn.slimsmart.thrift.rpc.ThriftException;  
  13.   
  14. /** 
  15.  *  注冊服務列表到Zookeeper 
  16.  */  
  17. public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{  
  18.       
  19.     private Logger logger = LoggerFactory.getLogger(getClass());  
  20.       
  21.     private CuratorFramework zkClient;  
  22.       
  23.     public ThriftServerAddressRegisterZookeeper(){}  
  24.       
  25.     public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){  
  26.         this.zkClient = zkClient;  
  27.     }  
  28.   
  29.     public void setZkClient(CuratorFramework zkClient) {  
  30.         this.zkClient = zkClient;  
  31.     }  
  32.   
  33.     @Override  
  34.     public void register(String service, String version, String address) {  
  35.         if(zkClient.getState() == CuratorFrameworkState.LATENT){  
  36.             zkClient.start();  
  37.         }  
  38.         if(StringUtils.isEmpty(version)){  
  39.             version="1.0.0";  
  40.         }  
  41.         //臨時節點  
  42.         try {  
  43.             zkClient.create()  
  44.                 .creatingParentsIfNeeded()  
  45.                 .withMode(CreateMode.EPHEMERAL)  
  46.                 .forPath("/"+service+"/"+version+"/"+address);  
  47.         } catch (UnsupportedEncodingException e) {  
  48.             logger.error("register service address to zookeeper exception:{}",e);  
  49.             throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);  
  50.         } catch (Exception e) {  
  51.             logger.error("register service address to zookeeper exception:{}",e);  
  52.             throw new ThriftException("register service address to zookeeper exception:{}", e);  
  53.         }  
  54.     }  
  55.       
  56.     public void close(){  
  57.         zkClient.close();  
  58.     }  
  59. }  
 
 

3.客戶端發現服務

定義獲取服務地址接口

ThriftServerAddressProvider.java

 

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.util.List;  
  5.   
  6. /** 
  7.  * thrift server-service地址提供者,以便構建客戶端連接池 
  8.  */  
  9. public interface ThriftServerAddressProvider {  
  10.       
  11.     //獲取服務名稱  
  12.     String getService();  
  13.   
  14.     /** 
  15.      * 獲取所有服務端地址 
  16.      * @return 
  17.      */  
  18.     List<InetSocketAddress> findServerAddressList();  
  19.   
  20.     /** 
  21.      * 選取一個合適的address,可以隨機獲取等' 
  22.      * 內部可以使用合適的算法. 
  23.      * @return 
  24.      */  
  25.     InetSocketAddress selector();  
  26.   
  27.     void close();  
  28. }  

基於zookeeper服務地址自動發現實現:ThriftServerAddressProviderZookeeper.java

  1. package cn.slimsmart.thrift.rpc.zookeeper;  
  2.   
  3. import java.net.InetSocketAddress;  
  4. import java.util.ArrayList;  
  5. import java.util.Collections;  
  6. import java.util.HashSet;  
  7. import java.util.LinkedList;  
  8. import java.util.List;  
  9. import java.util.Queue;  
  10. import java.util.Set;  
  11.   
  12. import org.apache.curator.framework.CuratorFramework;  
  13. import org.apache.curator.framework.imps.CuratorFrameworkState;  
  14. import org.apache.curator.framework.recipes.cache.ChildData;  
  15. import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
  16. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
  17. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
  18. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
  19. import org.slf4j.Logger;  
  20. import org.slf4j.LoggerFactory;  
  21. import org.springframework.beans.factory.InitializingBean;  
  22.   
  23. /** 
  24.  * 使用zookeeper作為"config"中心,使用apache-curator方法庫來簡化zookeeper開發 
  25.  */  
  26. public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {  
  27.   
  28.     private Logger logger = LoggerFactory.getLogger(getClass());  
  29.   
  30.     // 注冊服務  
  31.     private String service;  
  32.     // 服務版本號  
  33.     private String version = "1.0.0";  
  34.   
  35.     private PathChildrenCache cachedPath;  
  36.   
  37.     private CuratorFramework zkClient;  
  38.   
  39.     // 用來保存當前provider所接觸過的地址記錄  
  40.     // 當zookeeper集群故障時,可以使用trace中地址,作為"備份"  
  41.     private Set<String> trace = new HashSet<String>();  
  42.   
  43.     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
  44.   
  45.     private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
  46.   
  47.     private Object lock = new Object();  
  48.   
  49.     // 默認權重  
  50.     private static final Integer DEFAULT_WEIGHT = 1;  
  51.   
  52.     public void setService(String service) {  
  53.         this.service = service;  
  54.     }  
  55.   
  56.     public void setVersion(String version) {  
  57.         this.version = version;  
  58.     }  
  59.   
  60.     public ThriftServerAddressProviderZookeeper() {  
  61.     }  
  62.   
  63.     public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {  
  64.         this.zkClient = zkClient;  
  65.     }  
  66.   
  67.     public void setZkClient(CuratorFramework zkClient) {  
  68.         this.zkClient = zkClient;  
  69.     }  
  70.   
  71.     @Override  
  72.     public void afterPropertiesSet() throws Exception {  
  73.         // 如果zk尚未啟動,則啟動  
  74.         if (zkClient.getState() == CuratorFrameworkState.LATENT) {  
  75.             zkClient.start();  
  76.         }  
  77.         buildPathChildrenCache(zkClient, getServicePath(), true);  
  78.         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
  79.     }  
  80.   
  81.     private String getServicePath(){  
  82.         return "/" + service + "/" + version;  
  83.     }  
  84.     private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {  
  85.         cachedPath = new PathChildrenCache(client, path, cacheData);  
  86.         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
  87.             @Override  
  88.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
  89.                 PathChildrenCacheEvent.Type eventType = event.getType();  
  90.                 switch (eventType) {  
  91.                 case CONNECTION_RECONNECTED:  
  92.                     logger.info("Connection is reconection.");  
  93.                     break;  
  94.                 case CONNECTION_SUSPENDED:  
  95.                     logger.info("Connection is suspended.");  
  96.                     break;  
  97.                 case CONNECTION_LOST:  
  98.                     logger.warn("Connection error,waiting...");  
  99.                     return;  
  100.                 default:  
  101.                     //  
  102.                 }  
  103.                 // 任何節點的時機數據變動,都會rebuild,此處為一個"簡單的"做法.  
  104.                 cachedPath.rebuild();  
  105.                 rebuild();  
  106.             }  
  107.   
  108.             protected void rebuild() throws Exception {  
  109.                 List<ChildData> children = cachedPath.getCurrentData();  
  110.                 if (children == null || children.isEmpty()) {  
  111.                     // 有可能所有的thrift server都與zookeeper斷開了鏈接  
  112.                     // 但是,有可能,thrift client與thrift server之間的網絡是良好的  
  113.                     // 因此此處是否需要清空container,是需要多方面考慮的.  
  114.                     container.clear();  
  115.                     logger.error("thrift server-cluster error....");  
  116.                     return;  
  117.                 }  
  118.                 List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
  119.                 String path = null;  
  120.                 for (ChildData data : children) {  
  121.                     path = data.getPath();  
  122.                     logger.debug("get path:"+path);  
  123.                     path = path.substring(getServicePath().length()+1);  
  124.                     logger.debug("get serviceAddress:"+path);  
  125.                     String address = new String(path.getBytes(), "utf-8");  
  126.                     current.addAll(transfer(address));  
  127.                     trace.add(address);  
  128.                 }  
  129.                 Collections.shuffle(current);  
  130.                 synchronized (lock) {  
  131.                     container.clear();  
  132.                     container.addAll(current);  
  133.                     inner.clear();  
  134.                     inner.addAll(current);  
  135.   
  136.                 }  
  137.             }  
  138.         });  
  139.     }  
  140.   
  141.     private List<InetSocketAddress> transfer(String address) {  
  142.         String[] hostname = address.split(":");  
  143.         Integer weight = DEFAULT_WEIGHT;  
  144.         if (hostname.length == 3) {  
  145.             weight = Integer.valueOf(hostname[2]);  
  146.         }  
  147.         String ip = hostname[0];  
  148.         Integer port = Integer.valueOf(hostname[1]);  
  149.         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
  150.         // 根據優先級,將ip:port添加多次到地址集中,然后隨機取地址實現負載  
  151.         for (int i = 0; i < weight; i++) {  
  152.             result.add(new InetSocketAddress(ip, port));  
  153.         }  
  154.         return result;  
  155.     }  
  156.   
  157.     @Override  
  158.     public List<InetSocketAddress> findServerAddressList() {  
  159.         return Collections.unmodifiableList(container);  
  160.     }  
  161.   
  162.     @Override  
  163.     public synchronized InetSocketAddress selector() {  
  164.         if (inner.isEmpty()) {  
  165.             if (!container.isEmpty()) {  
  166.                 inner.addAll(container);  
  167.             } else if (!trace.isEmpty()) {  
  168.                 synchronized (lock) {  
  169.                     for (String hostname : trace) {  
  170.                         container.addAll(transfer(hostname));  
  171.                     }  
  172.                     Collections.shuffle(container);  
  173.                     inner.addAll(container);  
  174.                 }  
  175.             }  
  176.         }  
  177.         return inner.poll();  
  178.     }  
  179.   
  180.     @Override  
  181.     public void close() {  
  182.         try {  
  183.             cachedPath.close();  
  184.             zkClient.close();  
  185.         } catch (Exception e) {  
  186.         }  
  187.     }  
  188.   
  189.     @Override  
  190.     public String getService() {  
  191.         return service;  
  192.     }  
  193.   
  194. }  

對此接口還做了一種實現,通過配置獲取服務地址,參考附件:FixedAddressProvider.java

 

三、服務端服務注冊實現

ThriftServiceServerFactory.java

 

  1. package cn.slimsmart.thrift.rpc;  
  2.   
  3. import java.lang.instrument.IllegalClassFormatException;  
  4. import java.lang.reflect.Constructor;  
  5.   
  6. import org.apache.thrift.TProcessor;  
  7. import org.apache.thrift.TProcessorFactory;  
  8. import org.apache.thrift.protocol.TBinaryProtocol;  
  9. import org.apache.thrift.server.TServer;  
  10. import org.apache.thrift.server.TThreadedSelectorServer;  
  11. import org.apache.thrift.transport.TFramedTransport;  
  12. import org.apache.thrift.transport.TNonblockingServerSocket;  
  13. import org.springframework.beans.factory.InitializingBean;  
  14. import org.springframework.util.StringUtils;  
  15.   
  16. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;  
  17. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;  
  18. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;  
  19.   
  20. /** 
  21.  * 服務端注冊服務工廠 
  22.  */  
  23. public class ThriftServiceServerFactory implements InitializingBean {  
  24.     // 服務注冊本機端口  
  25.     private Integer port = 8299;  
  26.     // 優先級  
  27.     private Integer weight = 1;// default  
  28.     // 服務實現類  
  29.     private Object service;// serice實現類  
  30.     //服務版本號  
  31.     private String version;  
  32.     // 解析本機IP  
  33.     private ThriftServerIpResolve thriftServerIpResolve;  
  34.     //服務注冊  
  35.     private ThriftServerAddressRegister thriftServerAddressRegister;  
  36.   
  37.     private ServerThread serverThread;  
  38.       
  39.     public void setPort(Integer port) {  
  40.         this.port = port;  
  41.     }  
  42.   
  43.     public void setWeight(Integer weight) {  
  44.         this.weight = weight;  
  45.     }  
  46.   
  47.     public void setService(Object service) {  
  48.         this.service = service;  
  49.     }  
  50.   
  51.     public void setVersion(String version) {  
  52.         this.version = version;  
  53.     }  
  54.   
  55.     public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {  
  56.         this.thriftServerIpResolve = thriftServerIpResolve;  
  57.     }  
  58.   
  59.     public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {  
  60.         this.thriftServerAddressRegister = thriftServerAddressRegister;  
  61.     }  
  62.   
  63.     @Override  
  64.     public void afterPropertiesSet() throws Exception {  
  65.         if (thriftServerIpResolve == null) {  
  66.             thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();  
  67.         }  
  68.         String serverIP = thriftServerIpResolve.getServerIp();  
  69.         if (StringUtils.isEmpty(serverIP)) {  
  70.             throw new ThriftException("cant find server ip...");  
  71.         }  
  72.   
  73.         String hostname = serverIP + ":" + port + ":" + weight;  
  74.         Class<?> serviceClass = service.getClass();  
  75.         // 獲取實現類接口  
  76.         Class<?>[] interfaces = serviceClass.getInterfaces();  
  77.         if (interfaces.length == 0) {  
  78.             throw new IllegalClassFormatException("service-class should implements Iface");  
  79.         }  
  80.         // reflect,load "Processor";  
  81.         TProcessor processor = null;  
  82.         String serviceName = null;  
  83.         for (Class<?> clazz : interfaces) {  
  84.             String cname = clazz.getSimpleName();  
  85.             if (!cname.equals("Iface")) {  
  86.                 continue;  
  87.             }  
  88.             serviceName = clazz.getEnclosingClass().getName();  
  89.             String pname = serviceName + "$Processor";  
  90.             try {  
  91.                 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
  92.                 Class<?> pclass = classLoader.loadClass(pname);  
  93.                 if (!TProcessor.class.isAssignableFrom(pclass)) {  
  94.                     continue;  
  95.                 }  
  96.                 Constructor<?> constructor = pclass.getConstructor(clazz);  
  97.                 processor = (TProcessor) constructor.newInstance(service);  
  98.                 break;  
  99.             } catch (Exception e) {  
  100.                 //  
  101.             }  
  102.         }  
  103.         if (processor == null) {  
  104.             throw new IllegalClassFormatException("service-class should implements Iface");  
  105.         }  
  106.         //需要單獨的線程,因為serve方法是阻塞的.  
  107.         serverThread = new ServerThread(processor, port);  
  108.         serverThread.start();  
  109.         // 注冊服務  
  110.         if (thriftServerAddressRegister != null) {  
  111.             thriftServerAddressRegister.register(serviceName, version, hostname);  
  112.         }  
  113.   
  114.     }  
  115.     class ServerThread extends Thread {  
  116.         private TServer server;  
  117.         ServerThread(TProcessor processor, int port) throws Exception {  
  118.             TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);  
  119.             TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);    
  120.             TProcessorFactory processorFactory = new TProcessorFactory(processor);  
  121.             tArgs.processorFactory(processorFactory);  
  122.             tArgs.transportFactory(new TFramedTransport.Factory());    
  123.             tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));   
  124.             server = new TThreadedSelectorServer(tArgs);  
  125.         }  
  126.   
  127.         @Override  
  128.         public void run(){  
  129.             try{  
  130.                 //啟動服務  
  131.                 server.serve();  
  132.             }catch(Exception e){  
  133.                 //  
  134.             }  
  135.         }  
  136.           
  137.         public void stopServer(){  
  138.             server.stop();  
  139.         }  
  140.     }  
  141.       
  142.     public void close() {  
  143.         serverThread.stopServer();  
  144.     }  
  145. }  
 
 

四、客戶端獲取服務代理及連接池實現
客戶端連接池實現:ThriftClientPoolFactory.java

 

  1. package cn.slimsmart.thrift.rpc;  
  2.   
  3. import java.net.InetSocketAddress;  
  4.   
  5. import org.apache.commons.pool.BasePoolableObjectFactory;  
  6. import org.apache.thrift.TServiceClient;  
  7. import org.apache.thrift.TServiceClientFactory;  
  8. import org.apache.thrift.protocol.TBinaryProtocol;  
  9. import org.apache.thrift.protocol.TProtocol;  
  10. import org.apache.thrift.transport.TFramedTransport;  
  11. import org.apache.thrift.transport.TSocket;  
  12. import org.apache.thrift.transport.TTransport;  
  13.   
  14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
  15.   
  16. /** 
  17.  * 連接池,thrift-client for spring 
  18.  */  
  19. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {  
  20.   
  21.     private final ThriftServerAddressProvider serverAddressProvider;  
  22.     private final TServiceClientFactory<TServiceClient> clientFactory;  
  23.     private PoolOperationCallBack callback;  
  24.   
  25.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  
  26.         this.serverAddressProvider = addressProvider;  
  27.         this.clientFactory = clientFactory;  
  28.     }  
  29.   
  30.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,  
  31.             PoolOperationCallBack callback) throws Exception {  
  32.         this.serverAddressProvider = addressProvider;  
  33.         this.clientFactory = clientFactory;  
  34.         this.callback = callback;  
  35.     }  
  36.   
  37.     static interface PoolOperationCallBack {  
  38.         // 銷毀client之前執行  
  39.         void destroy(TServiceClient client);  
  40.   
  41.         // 創建成功是執行  
  42.         void make(TServiceClient client);  
  43.     }  
  44.   
  45.     public void destroyObject(TServiceClient client) throws Exception {  
  46.         if (callback != null) {  
  47.             try {  
  48.                 callback.destroy(client);  
  49.             } catch (Exception e) {  
  50.                 //  
  51.             }  
  52.         }  
  53.         TTransport pin = client.getInputProtocol().getTransport();  
  54.         pin.close();  
  55.     }  
  56.   
  57.     public boolean validateObject(TServiceClient client) {  
  58.         TTransport pin = client.getInputProtocol().getTransport();  
  59.         return pin.isOpen();  
  60.     }  
  61.   
  62.     @Override  
  63.     public TServiceClient makeObject() throws Exception {  
  64.         InetSocketAddress address = serverAddressProvider.selector();  
  65.         TSocket tsocket = new TSocket(address.getHostName(), address.getPort());  
  66.         TTransport transport = new TFramedTransport(tsocket);  
  67.         TProtocol protocol = new TBinaryProtocol(transport);  
  68.         TServiceClient client = this.clientFactory.getClient(protocol);  
  69.         transport.open();  
  70.         if (callback != null) {  
  71.             try {  
  72.                 callback.make(client);  
  73.             } catch (Exception e) {  
  74.                 //  
  75.             }  
  76.         }  
  77.         return client;  
  78.     }  
  79.   
  80. }  

客戶端服務代理工廠實現:ThriftServiceClientProxyFactory.java

 

 

  1. package cn.slimsmart.thrift.rpc;  
  2.   
  3. import java.lang.reflect.InvocationHandler;  
  4. import java.lang.reflect.Method;  
  5. import java.lang.reflect.Proxy;  
  6.   
  7. import org.apache.commons.pool.impl.GenericObjectPool;  
  8. import org.apache.thrift.TServiceClient;  
  9. import org.apache.thrift.TServiceClientFactory;  
  10. import org.springframework.beans.factory.FactoryBean;  
  11. import org.springframework.beans.factory.InitializingBean;  
  12.   
  13. import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;  
  14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
  15.   
  16. /** 
  17.  * 客戶端代理 
  18.  */  
  19. @SuppressWarnings({ "unchecked", "rawtypes" })  
  20. public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {  
  21.   
  22.     private Integer maxActive = 32;// 最大活躍連接數  
  23.   
  24.     // ms,default 3 min,鏈接空閑時間  
  25.     // -1,關閉空閑檢測  
  26.     private Integer idleTime = 180000;  
  27.     private ThriftServerAddressProvider serverAddressProvider;  
  28.   
  29.     private Object proxyClient;  
  30.     private Class<?> objectClass;  
  31.   
  32.     private GenericObjectPool<TServiceClient> pool;  
  33.   
  34.     private PoolOperationCallBack callback = new PoolOperationCallBack() {  
  35.         @Override  
  36.         public void make(TServiceClient client) {  
  37.             System.out.println("create");  
  38.         }  
  39.   
  40.         @Override  
  41.         public void destroy(TServiceClient client) {  
  42.             System.out.println("destroy");  
  43.         }  
  44.     };  
  45.       
  46.     public void setMaxActive(Integer maxActive) {  
  47.         this.maxActive = maxActive;  
  48.     }  
  49.   
  50.     public void setIdleTime(Integer idleTime) {  
  51.         this.idleTime = idleTime;  
  52.     }  
  53.   
  54.     public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {  
  55.         this.serverAddressProvider = serverAddressProvider;  
  56.     }  
  57.   
  58.     @Override  
  59.     public void afterPropertiesSet() throws Exception {  
  60.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
  61.         // 加載Iface接口  
  62.         objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");  
  63.         // 加載Client.Factory類  
  64.         Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");  
  65.         TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  
  66.         ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);  
  67.         GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  
  68.         poolConfig.maxActive = maxActive;  
  69.         poolConfig.minIdle = 0;  
  70.         poolConfig.minEvictableIdleTimeMillis = idleTime;  
  71.         poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;  
  72.         pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);  
  73.         proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {  
  74.             @Override  
  75.             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
  76.                 //  
  77.                 TServiceClient client = pool.borrowObject();  
  78.                 try {  
  79.                     return method.invoke(client, args);  
  80.                 } catch (Exception e) {  
  81.                     throw e;  
  82.                 } finally {  
  83.                     pool.returnObject(client);  
  84.                 }  
  85.             }  
  86.         });  
  87.     }  
  88.   
  89.     @Override  
  90.     public Object getObject() throws Exception {  
  91.         return proxyClient;  
  92.     }  
  93.   
  94.     @Override  
  95.     public Class<?> getObjectType() {  
  96.         return objectClass;  
  97.     }  
  98.   
  99.     @Override  
  100.     public boolean isSingleton() {  
  101.         return true;  
  102.     }  
  103.   
  104.     public void close() {  
  105.         if (serverAddressProvider != null) {  
  106.             serverAddressProvider.close();  
  107.         }  
  108.     }  
  109. }  

下面我們看一下服務端和客戶端的配置;

 

服務端spring-context-thrift-server.xml

 

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
  6.                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
  7.                 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
  8.                 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
  9.     default-lazy-init="false">  
  10.   
  11.     <!-- zookeeper -->  
  12.     <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
  13.         destroy-method="close">  
  14.         <property name="zkHosts"  
  15.             value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
  16.         <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
  17.         <property name="connectionTimeout" value="3000" />  
  18.         <property name="sessionTimeout" value="3000" />  
  19.         <property name="singleton" value="true" />  
  20.     </bean>  
  21.     <bean id="sericeAddressRegister"  
  22.         class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"  
  23.         destroy-method="close">  
  24.         <property name="zkClient" ref="thriftZookeeper" />  
  25.     </bean>  
  26.     <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />  
  27.   
  28.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
  29.         destroy-method="close">  
  30.         <property name="service" ref="echoSerivceImpl" />  
  31.         <property name="port" value="9000" />  
  32.         <property name="version" value="1.0.0" />  
  33.         <property name="weight" value="1" />  
  34.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
  35.     </bean>  
  36.       
  37.     <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
  38.         destroy-method="close">  
  39.         <property name="service" ref="echoSerivceImpl" />  
  40.         <property name="port" value="9001" />  
  41.         <property name="version" value="1.0.0" />  
  42.         <property name="weight" value="1" />  
  43.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
  44.     </bean>  
  45.       
  46.     <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
  47.         destroy-method="close">  
  48.         <property name="service" ref="echoSerivceImpl" />  
  49.         <property name="port" value="9002" />  
  50.         <property name="version" value="1.0.0" />  
  51.         <property name="weight" value="1" />  
  52.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
  53.     </bean>  
  54. </beans>  

客戶端:spring-context-thrift-client.xml

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
  4.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
  6.                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
  7.                 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
  8.                 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
  9.     default-lazy-init="false">  
  10.       
  11.     <!-- fixedAddress -->  
  12.     <!--   
  13.     <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">  
  14.          <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
  15.          <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />  
  16.     </bean>  
  17.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">  
  18.         <property name="maxActive" value="5" />  
  19.         <property name="idleTime" value="10000" />  
  20.         <property name="serverAddressProvider" ref="fixedAddressProvider" />  
  21.     </bean>  
  22.    -->  
  23.     <!-- zookeeper   -->  
  24.     <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
  25.         destroy-method="close">  
  26.         <property name="zkHosts"  
  27.             value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
  28.         <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
  29.         <property name="connectionTimeout" value="3000" />  
  30.         <property name="sessionTimeout" value="3000" />  
  31.         <property name="singleton" value="true" />  
  32.     </bean>  
  33.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">  
  34.         <property name="maxActive" value="5" />  
  35.         <property name="idleTime" value="1800000" />  
  36.         <property name="serverAddressProvider">  
  37.             <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">  
  38.                 <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
  39.                 <property name="version" value="1.0.0" />  
  40.                 <property name="zkClient" ref="thriftZookeeper" />  
  41.             </bean>  
  42.         </property>  
  43.     </bean>  
  44. </beans>  
 
運行服務端后,我們可以看見zookeeper注冊了多個服務地址。
 
問題
遇到一個bison版本過低的問題: http://blog.csdn.net/qq910894904/article/details/41132779
 
參考資料
Thrift原理簡析(Java)- http://shift-alt-ctrl.iteye.com/blog/1987416
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM