Thrift應用(java)


Thrift(java版)

1.      簡單介紹

Thrift是什么?能做什么?

Thrift是Facebook於2007年開發的跨語言的rpc服框架,提供多語言(C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等)的編譯功能,並提供多種服務器工作模式;用戶通過Thrift的IDL(接口定義語言)來描述接口函數及數據類型,然后通過Thrift的編譯環境生成各種語言類型的接口文件,用戶可以根據自己的需要采用不同的語言開發客戶端代碼和服務器端代碼。

例如,我想開發一個快速計算的RPC服務,它主要通過接口函數getInt對外提供服務,這個RPC服務的getInt函數使用用戶傳入的參數,經過復雜的計算,計算出一個整形值返回給用戶;服務器端使用java語言開發,而調用客戶端可以是java、c、python等語言開發的程序,在這種應用場景下,我們只需要使用Thrift的IDL描述一下getInt函數(以.thrift為后綴的文件),然后使用Thrift的多語言編譯功能,將這個IDL文件編譯成C、java、python幾種語言對應的“特定語言接口文件”(每種語言只需要一條簡單的命令即可編譯完成),這樣拿到對應語言的“特定語言接口文件”之后,就可以開發客戶端和服務器端的代碼了,開發過程中只要接口不變,客戶端和服務器端的開發可以獨立的進行。

Thrift為服務器端程序提供了很多的工作模式,例如:線程池模型、非阻塞模型等等,可以根據自己的實際應用場景選擇一種工作模式高效地對外提供服務;

2.      環境配置

1. 如果是Maven構建項目的,在pom.xml 中添加如下內容:

 

 

 

 

 

<dependency>

  <groupId>org.apache.thrift</groupId>

  <artifactId>libthrift</artifactId>

  <version>0.9.0</version>

</dependency>

<dependency>

       <groupId>org.slf4j</groupId>

       <artifactId>slf4j-log4j12</artifactId>

       <version>1.5.8</version>

</dependency>

2.如果自己編譯lib包,把下載的壓縮包解壓到X:盤,然后在X:\thrift-0.9.0\lib\java 目錄下運行ant進行自動編譯,會在X:\thrift-0.9.0\lib\java\build\ 目錄下看到編譯好的lib包:libthrift-0.9.0.jar

3.      基本概念

3.1 數據類型

基本類型:

bool: 布爾值,true 或 false,對應 Java 的 Boolean

byte: 8 位有符號整數,對應 Java 的 byte

i16:16 位有符號整數,對應 Java 的 short

i32:32 位有符號整數,對應 Java 的 int

i64:64 位有符號整數,對應 Java 的 long

double:64 位浮點數,對應 Java 的 double

string:utf-8編碼的字符串,對應 Java 的 String

結構體類型:

Struct: 定義公共的對象,類似於 C 語言中的結構體定義,在 Java 中是一個 JavaBean

容器類型:

list:對應 Java 的 ArrayList

set: 對應 Java 的 HashSet

map: 對應 Java 的 HashMap

異常類型:

        exception:對應 Java 的 Exception

服務類型:

        service:對應服務的類

枚舉類型:

       

3.2 數據傳輸協議

Thrift 可以讓用戶選擇客戶端與服務端之間傳輸通信協議的類別,在傳輸協議上總體划分為文本 (text) 和二進制 (binary) 傳輸協議,為節約帶寬,提高傳輸效率,一般情況下使用二進制類型的傳輸協議為多數,有時還會使用基於文本類型的協議,這需要根據項目 / 產品中的實際需求。常用協議有以下幾種:

TBinaryProtocol : 二進制格式.

TCompactProtocol : 壓縮格式

TJSONProtocol : JSON格式

TSimpleJSONProtocol : 提供JSON只寫協議,生成的文件很容易通過腳本語言解析

3.3 傳輸層

常用的傳輸層有以下幾種:

TSocket—使用阻塞式 I/O 進行傳輸,是最常見的模式

TFramedTransport—非阻塞方式,按塊的大小進行傳輸,類似於 Java 中的 NIO

若使用 TFramedTransport 傳輸層,其服務器必須修改為非阻塞的服務類型,客戶端只需替換 TTransport 部分

TNonblockingTransport —— 使用非阻塞方式,用於構建異步客戶端

3.4編碼基本步驟

服務端編碼步驟:

實現服務處理接口impl

創建TProcessor

創建TServerTransport

創建TProtocol

創建TServer

啟動Server

客戶端編碼步驟:

創建Transport

創建TProtocol

基於TTransport和TProtocol創建 Client

調用Client的相應方法

 

Tips: 客戶端和服務端的協議要一致

4.      實例演示

4.1 生成Thrift代碼

4.1.1.      創建thrift文件

/*userinfo.thrift*/

namespace java com.maociyuan.cnblogs.home.userinfo

 

/*結構體類型:*/

struct UserInfo

{

1:i32 userid,

2:string username,

3:string userpwd,

4:string sex,

5:string age,

}

/*服務類型:*/

service UserInfoService{

UserInfo lg_userinfo_getUserInfoById(1:i32 userid),    

string   lg_userinfo_getUserNameById(1:i32 userid),   

i32      lg_userinfo_getUserCount(),                     

bool     lg_userinfo_checkUserById(1:i32 userid),     

 }

 

4.1.2.      編譯thrift文件

在命令窗口,進入thrift文件目錄執行如下命令

thrift-0.8.0.exe -r -gen java ./userinfo.thrift

生成兩個java文件

UserInfo.java

UserInfoService.java

導入工程中

 

4.2 實現接口Iface

package com.maociyuan.cnblogs.home.userinfo;

import java.util.HashMap;

import java.util.Map;

import org.apache.thrift.TException;

 

public class UserInfoServiceImpl implements UserInfoService.Iface {

    private static Map<Integer, UserInfo> userMap = new HashMap<Integer, UserInfo>();

    static {

       userMap.put(1, new UserInfo(1,"mao","mao","男","2016"));

       userMap.put(2, new UserInfo(2,"ci","ci","女","07"));

       userMap.put(3, new UserInfo(3,"yuan","yuan","男","28"));

    }

    public UserInfo lg_userinfo_getUserInfoById(int userid) throws TException {

       // TODO Auto-generated method stub

       return userMap.get(userid);

    }

 

    public String lg_userinfo_getUserNameById(int userid) throws TException {

       // TODO Auto-generated method stub

       return userMap.get(userid).getUsername();

    }

 

    public int lg_userinfo_getUserCount() throws TException {

       // TODO Auto-generated method stub

       return userMap.size();

    }

 

    public boolean lg_userinfo_checkUserById(int userid) throws TException {

       // TODO Auto-generated method stub

       return userMap.containsKey(userid);

    }

}

 

4.3 服務端代碼

package com.maociyuan.cnblogs.home.service;

 

import org.apache.thrift.TProcessor;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.server.THsHaServer;

import org.apache.thrift.server.TNonblockingServer;

import org.apache.thrift.server.TServer;

import org.apache.thrift.server.TSimpleServer;

import org.apache.thrift.server.TThreadPoolServer;

import org.apache.thrift.transport.TFramedTransport;

import org.apache.thrift.transport.TNonblockingServerSocket;

import org.apache.thrift.transport.TServerSocket;

 

import com.maociyuan.cnblogs.home.userinfo.UserInfoService;

import com.maociyuan.cnblogs.home.userinfo.UserInfoServiceImpl;

 

public class UserInfoServiceDemo {

    public static final int SERVER_PORT = 8090;

    public static final int SERVER_PORT1 = 8091;

    public static final int SERVER_PORT2 = 8092;

    public static final int SERVER_PORT3 = 8093;

 

    // 簡單的單線程服務模型,一般用於測試

    public void startSimleServer() {

       try {

           System.out.println("UserInfoServiceDemo TSimpleServer start ....");

 

           TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());

           TServerSocket serverTransport = new TServerSocket(SERVER_PORT);

           TServer.Args tArgs = new TServer.Args(serverTransport);

           tArgs.processor(tprocessor);

 

           tArgs.protocolFactory(new TBinaryProtocol.Factory());

          

           TServer server = new TSimpleServer(tArgs);

           server.serve();

 

       } catch (Exception e) {

           System.out.println("Server start error!!!");

           e.printStackTrace();

       }

    }

   

    //線程池服務模型,使用標准的阻塞式IO,預先創建一組線程處理請求。

    public void startTThreadPoolServer() {

       try {

            System.out.println("UserInfoServiceDemo TThreadPoolServer start ....");

 

            TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());

            TServerSocket serverTransport = new TServerSocket(SERVER_PORT1);

            TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);

            ttpsArgs.processor(tprocessor);

            ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());

 

            // 線程池服務模型,使用標准的阻塞式IO,預先創建一組線程處理請求。

            TServer server = new TThreadPoolServer(ttpsArgs);

            server.serve();

 

       } catch (Exception e) {

           System.out.println("Server start error!!!");

           e.printStackTrace();

       }

    }

   

    // 線程池服務模型,使用標准的阻塞式IO,使用非阻塞式IO

    public void startTNonblockingServer() {

       try {

            System.out.println("UserInfoServiceDemo TNonblockingServer start ....");

 

            TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());

            TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT2);

            TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);

            tnbArgs.processor(tprocessor);

            

            // 使用非阻塞式IO,服務端和客戶端需要指定TFramedTransport數據傳輸的方式

            tnbArgs.transportFactory(new TFramedTransport.Factory());

            tnbArgs.protocolFactory(new TBinaryProtocol.Factory());

 

            TServer server = new TNonblockingServer(tnbArgs);

            server.serve();

 

       } catch (Exception e) {

           System.out.println("Server start error!!!");

           e.printStackTrace();

       }

    }

   

    //半同步半異步的服務端模型,需要指定為: TFramedTransport 數據傳輸的方式。

    public void startTHsHaServer() {

       try {

           System.out.println("HelloWorld THsHaServer start ....");

 

           TProcessor tprocessor = new UserInfoService.Processor<UserInfoService.Iface>(new UserInfoServiceImpl());

 

           TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(SERVER_PORT3);

           THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);

           thhsArgs.processor(tprocessor);

           thhsArgs.transportFactory(new TFramedTransport.Factory());

           thhsArgs.protocolFactory(new TBinaryProtocol.Factory());

 

           TServer server = new THsHaServer(thhsArgs);

           server.serve();

 

       } catch (Exception e) {

           System.out.println("Server start error!!!");

           e.printStackTrace();

       }

    }

 

    public static void main(String[] args) {

       UserInfoServiceDemo server = new UserInfoServiceDemo();

       //server.startSimleServer();

       //server.startTThreadPoolServer();

       //server.startTNonblockingServer();

       //server.startTHsHaServer();

    }

}

 

4.4 客戶端代碼

UserInfoClientDemo.java

package com.maociyuan.cnblogs.home.client;

 

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

 

import org.apache.thrift.TException;

import org.apache.thrift.async.AsyncMethodCallback;

import org.apache.thrift.async.TAsyncClientManager;

import org.apache.thrift.protocol.TBinaryProtocol;

import org.apache.thrift.protocol.TCompactProtocol;

import org.apache.thrift.protocol.TProtocol;

import org.apache.thrift.protocol.TProtocolFactory;

import org.apache.thrift.transport.TNonblockingSocket;

import org.apache.thrift.transport.TNonblockingTransport;

import org.apache.thrift.transport.TSocket;

import org.apache.thrift.transport.TTransport;

import org.apache.thrift.transport.TTransportException;

 

import com.maociyuan.cnblogs.home.userinfo.UserInfoService;

import com.maociyuan.cnblogs.home.userinfo.UserInfoService.AsyncClient.lg_userinfo_getUserNameById_call;

 

public class UserInfoClientDemo {

    public static final String SERVER_IP = "localhost";

    public static final int SERVER_PORT = 8090;

    public static final int SERVER_PORT1 = 8091;

    public static final int SERVER_PORT2 = 8092;

    public static final int SERVER_PORT3 = 8093;

    public static final int TIMEOUT = 30000;

 

    public void startClient(int userid) {

       TTransport transport = null;

       try {

           transport = new TSocket(SERVER_IP, SERVER_PORT, TIMEOUT);

           // 協議要和服務端一致

           TProtocol protocol = new TBinaryProtocol(transport);

           // TProtocol protocol = new TCompactProtocol(transport);

           // TProtocol protocol = new TJSONProtocol(transport);

           UserInfoService.Client client = new UserInfoService.Client(protocol);

           transport.open();

           String result = client.lg_userinfo_getUserNameById(userid);

           System.out.println("Thrify client result =: " + result);

       } catch (TTransportException e) {

           e.printStackTrace();

       } catch (TException e) {

          

                  //處理服務端返回值為null問題

                  if (e instanceof TApplicationException

                        && ((TApplicationException) e).getType() ==  

                                 TApplicationException.MISSING_RESULT) {

                System.out.println("The result of lg_userinfo_getUserNameById function is NULL");

            }

       } finally {

           if (null != transport) {

              transport.close();

           }

       }

    }

   

    public void startClientAsync(int userid,int port) {

       TNonblockingTransport transport = null;

       try {

           TAsyncClientManager clientManager = new TAsyncClientManager();

           transport = new TNonblockingSocket(SERVER_IP,port, TIMEOUT);

 

           TProtocolFactory tprotocol = new TCompactProtocol.Factory();

           UserInfoService.AsyncClient asyncClient = new UserInfoService.AsyncClient(

                  tprotocol, clientManager, transport);

           System.out.println("Client start .....");

 

           CountDownLatch latch = new CountDownLatch(1);

           AsynCallback callBack = new AsynCallback(latch);

           System.out.println("call method sayHello start ...");

           asyncClient.lg_userinfo_getUserNameById(userid, callBack);

           System.out.println("call method sayHello .... end");

           boolean wait = latch.await(30, TimeUnit.SECONDS);

           System.out.println("latch.await =:" + wait);

       } catch (Exception e) {

           e.printStackTrace();

       }

       System.out.println("startClient end.");

    }

   

    public class AsynCallback implements AsyncMethodCallback<lg_userinfo_getUserNameById_call>{

       private CountDownLatch latch;

 

       public AsynCallback(CountDownLatch latch) {

           this.latch = latch;

       }

 

       @Override

       public void onComplete(lg_userinfo_getUserNameById_call response) {

           System.out.println("onComplete");

           try {

              Thread.sleep(1000L * 1);

              System.out.println("AsynCall result =:" + response.getResult().toString());

           } catch (TException e) {

              e.printStackTrace();

           } catch (Exception e) {

              e.printStackTrace();

           } finally {

              latch.countDown();

           }

       }

 

       @Override

       public void onError(Exception exception) {

           System.out.println("onError :" + exception.getMessage());

           latch.countDown();

       }

    }

 

    public static void main(String[] args) {

       UserInfoClientDemo client = new UserInfoClientDemo();

       client.startClient(1);

       client.startClient(2);

       client.startClient(3);

       client.startClientAsync(1,SERVER_PORT2);

       client.startClientAsync(2,SERVER_PORT3);

       client.startClientAsync(3,SERVER_PORT2);

    }

}

5.      服務端工作模式

常見的服務端工作模式有5種:TSimpleServer、TNonblockingServer、THsHaServer、TThreadPoolServer、TThreadedSelectorServer

5.1  TSimpleServer模式

TSimpleServer單線程服務器端使用標准的阻塞式 I/O,只有一個工作線程,循環監聽新請求的到來並完成對請求的處理,它只是在簡單的演示時候使用,它的工作方式如圖

 

5.2  TThreadPoolServer模式

TThreadPoolServer模式--多線程服務器端使用標准的阻塞式 I/O,采用阻塞socket方式工作,主線程負責阻塞式監聽“監聽socket”中是否有新socket到來,業務處理交由一個線程池來處理,如下圖所示:

 

TThreadPoolServer模式優點:

線程池模式中,數據讀取和業務處理都交由線程池完成,主線程只負責監聽新連接,因此在並發量較大時新連接也能夠被及時接受。線程池模式比較適合服務器端能預知最多有多少個客戶端並發的情況,這時每個請求都能被業務線程池及時處理,性能也非常高。

TThreadPoolServer模式缺點:

線程池模式的處理能力受限於線程池的工作能力,當並發請求數大於線程池中的線程數時,新請求也只能排隊等待。

5.3  TNonblockingServer模式

TNonblockingServer工作模式,該模式也是單線程工作,但是該模式采用NIO的方式,所有的socket都被注冊到selector中,在一個線程中通過seletor循環監控所有的socket,每次selector結束時,處理所有的處於就緒狀態的socket,對於有數據到來的socket進行數據讀取操作,對於有數據發送的socket則進行數據發送,對於監聽socket則產生一個新業務socket並將其注冊到selector中,如下圖所示:

 

上圖中讀取數據之后的業務處理就是根據讀取到的調用請求,調用具體函數完成處理,只有完成函數處理才能進行后續的操作;

TNonblockingServer模式優點:

相比於TSimpleServer效率提升主要體現在IO多路復用上,TNonblockingServer采用非阻塞IO,同時監控多個socket的狀態變化;

TNonblockingServer模式缺點:

TNonblockingServer模式在業務處理上還是采用單線程順序來完成,在業務處理比較復雜、耗時的時候,例如某些接口函數需要讀取數據庫執行時間較長,此時該模式效率也不高,因為多個調用請求任務依然是順序一個接一個執行。

5.4  THsHaServer模式(半同步半異步)

 THsHaServer類是TNonblockingServer類的子類,在5.2節中的TNonblockingServer模式中,采用一個線程來完成對所有socket的監聽和業務處理,造成了效率的低下,THsHaServer模式的引入則是部分解決了這些問題。THsHaServer模式中,引入一個線程池來專門進行業務處理,如下圖所示;

 

THsHaServer的優點:

與TNonblockingServer模式相比,THsHaServer在完成數據讀取之后,將業務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環操作,效率大大提升;

THsHaServer的缺點:

由圖5.3可以看出,主線程需要完成對所有socket的監聽以及數據讀寫的工作,當並發請求數較大時,且發送數據量較多時,監聽socket上新連接請求不能被及時接受。

 

5.5  TThreadedSelectorServer模式

TThreadedSelectorServer模式是目前Thrift提供的最高級的模式,它內部有如果幾個部分構成:

(1)  一個AcceptThread線程對象,專門用於處理監聽socket上的新連接;

(2)  若干個SelectorThread對象專門用於處理業務socket的網絡I/O操作,所有網絡數據的讀寫均是有這些線程來完成;

(3)  一個負載均衡器SelectorThreadLoadBalancer對象,主要用於AcceptThread線程接收到一個新socket連接請求時,決定將這個新連接請求分配給哪個SelectorThread線程。

(4)  一個ExecutorService類型的工作線程池,在SelectorThread線程中,監聽到有業務socket中有調用請求過來,則將請求讀取之后,交個ExecutorService線程池中的線程完成此次調用的具體執行;

 

如上圖所示,TThreadedSelectorServer模式中有一個專門的線程AcceptThread用於處理新連接請求,因此能夠及時響應大量並發連接請求;另外它將網絡I/O操作分散到多個SelectorThread線程中來完成,因此能夠快速對網絡I/O進行讀寫操作,能夠很好地應對網絡I/O較多的情況;TThreadedSelectorServer對於大部分應用場景性能都不會差,因此,如果實在不知道選擇哪種工作模式,使用TThreadedSelectorServer就可以。

 

6.      實現原理

5.1 架構

Thrift 包含一個完整的堆棧結構用於構建客戶端和服務器端。下圖描繪了 Thrift 的整體架構

 

如圖所示,圖中黃色部分是用戶實現的業務邏輯,褐色部分是根據 Thrift 定義的服務接口描述文件生成的客戶端和服務器端代碼框架,紅色部分是根據 Thrift 文件生成代碼實現數據的讀寫操作。紅色部分以下是 Thrift 的傳輸體系、協議以及底層 I/O 通信,使用 Thrift 可以很方便的定義一個服務並且選擇不同的傳輸協議和傳輸層而不用重新生成代碼。

Thrift 服務器包含用於綁定協議和傳輸層的基礎架構,它提供阻塞、非阻塞、單線程和多線程的模式運行在服務器上,可以配合服務器 / 容器一起運行,可以和現有的 J2EE 服務器 /Web 容器無縫的結合。

服務端和客戶端具體的調用流程如下:

圖.服務端啟動、服務時序圖:

 

 

該圖所示是 HelloServiceServer 啟動的過程以及服務被客戶端調用時,服務器的響應過程。從圖中我們可以看到,程序調用了 TThreadPoolServer 的 serve 方法后,server 進入阻塞監聽狀態,其阻塞在 TServerSocket 的 accept 方法上。當接收到來自客戶端的消息后,服務器發起一個新線程處理這個消息請求,原線程再次進入阻塞狀態。在新線程中,服務器通過 TBinaryProtocol 協議讀取消息內容,調用 HelloServiceImpl 的 helloVoid 方法,並將結果寫入 helloVoid_result 中傳回客戶端。

 

圖.客戶端調用服務時序圖:

 

該圖所示是 HelloServiceClient 調用服務的過程以及接收到服務器端的返回值后處理結果的過程。從圖中我們可以看到,程序調用了 Hello.Client 的 helloVoid 方法,在 helloVoid 方法中,通過 send_helloVoid 方法發送對服務的調用請求,通過 recv_helloVoid 方法接收服務處理請求后返回的結果。

5.2 調用流程

Thrift框架的遠程過程調用的工作過程如下:

        (1) 通過IDL定義一個接口的thrift文件,然后通過thrift的多語言編譯功能,將接口定義的thrift文件翻譯成對應的語言版本的接口文件;

        (2) Thrift生成的特定語言的接口文件中包括客戶端部分和服務器部分;

        (3) 客戶端通過接口文件中的客戶端部分生成一個Client對象,這個客戶端對象中包含所有接口函數的存根實現,然后用戶代碼就可以通過這個Client對象來調用thrift文件中的那些接口函數了,但是,客戶端調用接口函數時實際上調用的是接口函數的本地存根實現,

        (4) 接口函數的存根實現將調用請求發送給thrift服務器端,然后thrift服務器根據調用的函數名和函數參數,調用實際的實現函數來完成具體的操作

        (5) Thrift服務器在完成處理之后,將函數的返回值發送給調用的Client對象;

(6) Thrift的Client對象將函數的返回值再交付給用戶的調用函數

 

5.3 源碼分析

源碼分析主要分析thrift生成的java接口文件,並以TestThriftService.java為例,以該文件為線索,逐漸分析文件中遇到的其他類和文件;在thrift生成的服務接口文件中,共包含以下幾部分:

        (1)異步客戶端類AsyncClient和異步接口AsyncIface,本節暫不涉及這些異步操作相關內容;

        (2)同步客戶端類Client和同步接口Iface,Client類繼承自TServiceClient,並實現了同步接口Iface;Iface就是根據thrift文件中所定義的接口函數所生成;Client類是在開發Thrift的客戶端程序時使用,Client類是Iface的客戶端存根實現, Iface在開發Thrift服務器的時候要使用,Thrift的服務器端程序要實現接口Iface。

        (3)Processor類,該類主要是開發Thrift服務器程序的時候使用,該類內部定義了一個map,它保存了所有函數名到函數對象的映射,一旦Thrift接到一個函數調用請求,就從該map中根據函數名字找到該函數的函數對象,然后執行它;

        (4)參數類,為每個接口函數定義一個參數類,例如:為接口getInt產生一個參數類:getInt_args,一般情況下,接口函數參數類的命名方式為:接口函數名_args;

        (5)返回值類,每個接口函數定義了一個返回值類,例如:為接口getInt產生一個返回值類:getInt_result,一般情況下,接口函數返回值類的命名方式為:接口函數名_result;

        參數類和返回值類中有對數據的讀寫操作,在參數類中,將按照協議類將調用的函數名和參數進行封裝,在返回值類中,將按照協議規定讀取數據。

        Thrift調用過程中,Thrift客戶端和服務器之間主要用到傳輸層類、協議層類和處理類三個主要的核心類,這三個類的相互協作共同完成rpc的整個調用過程。在調用過程中將按照以下順序進行協同工作:

        (1)     將客戶端程序調用的函數名和參數傳遞給協議層(TProtocol),協議層將函數名和參數按照協議格式進行封裝,然后封裝的結果交給下層的傳輸層。此處需要注意:要與Thrift服務器程序所使用的協議類型一樣,否則Thrift服務器程序便無法在其協議層進行數據解析;

        (2)     傳輸層(TTransport)將協議層傳遞過來的數據進行處理,例如傳輸層的實現類TFramedTransport就是將數據封裝成幀的形式,即“數據長度+數據內容”,然后將處理之后的數據通過網絡發送給Thrift服務器;此處也需要注意:要與Thrift服務器程序所采用的傳輸層的實現類一致,否則Thrift的傳輸層也無法將數據進行逆向的處理;

        (3)     Thrift服務器通過傳輸層(TTransport)接收網絡上傳輸過來的調用請求數據,然后將接收到的數據進行逆向的處理,例如傳輸層的實現類TFramedTransport就是將“數據長度+數據內容”形式的網絡數據,轉成只有數據內容的形式,然后再交付給Thrift服務器的協議類(TProtocol);

        (4)     Thrift服務端的協議類(TProtocol)將傳輸層處理之后的數據按照協議進行解封裝,並將解封裝之后的數據交個Processor類進行處理;

        (5)     Thrift服務端的Processor類根據協議層(TProtocol)解析的結果,按照函數名找到函數名所對應的函數對象;

        (6)     Thrift服務端使用傳過來的參數調用這個找到的函數對象;

        (7)     Thrift服務端將函數對象執行的結果交給協議層;

        (8)     Thrift服務器端的協議層將函數的執行結果進行協議封裝;

        (9)     Thrift服務器端的傳輸層將協議層封裝的結果進行處理,例如封裝成幀,然后發送給Thrift客戶端程序;

        (10)    Thrift客戶端程序的傳輸層將收到的網絡結果進行逆向處理,得到實際的協議數據;

        (11)    Thrift客戶端的協議層將數據按照協議格式進行解封裝,然后得到具體的函數執行結果,並將其交付給調用函數;

 

客戶端協議類和服務端協議類都是指具體實現了TProtocol接口的協議類,在實際開發過程中二者必須一樣,否則便不能進行通信;同樣,客戶端傳輸類和服務端傳輸類是指TTransport的子類,二者也需保持一致;

       在上述開發thrift客戶端和服務器端程序時需要用到三個類:傳輸類(TTransport)、協議接口(TProtocol)和處理類(Processor),其中TTransport是抽象類,在實際開發過程中可根據具體清空選擇不同的實現類;TProtocol是個協議接口,每種不同的協議都必須實現此接口才能被thrift所調用。例如TProtocol類的實現類就有TBinaryProtocol等;在Thrift生成代碼的內部,還需要將待傳輸的內容封裝成消息類TMessage。處理類(Processor)主要在開發Thrift服務器端程序的時候使用。

 

1、TMessage

       Thrift在客戶端和服務器端傳遞數據的時候(包括發送調用請求和返回執行結果),都是將數據按照TMessage進行組裝,然后發送;TMessage包括三部分:消息的名稱、消息的序列號和消息的類型,消息名稱為字符串類型,消息的序列號為32位的整形,消息的類型為byte類型,消息的類型:

public final classTType {

  public staticfinal byte STOP   =0;

  public staticfinal byte VOID   =1;

  public staticfinal byte BOOL   =2;

  public staticfinal byte BYTE   =3;

  public staticfinal byte DOUBLE = 4;

  public staticfinal byte I16    =6;

  public staticfinal byte I32    =8;

  public staticfinal byte I64    =10;

  public staticfinal byte STRING = 11;

  public staticfinal byte STRUCT = 12;

  public staticfinal byte MAP    =13;

  public staticfinal byte SET    =14;

  public staticfinal byte LIST   =15;

  public staticfinal byte ENUM   =16;

}

2.      傳輸類(TTransport)

       傳輸類或其各種實現類,都是對I/O層的一個封裝,可更直觀的理解為它封裝了一個socket,不同的實現類有不同的封裝方式,例如TFramedTransport類,它里面還封裝了一個讀寫buf,在寫入的時候,數據都先寫到這個buf里面,等到寫完調用該類的flush函數的時候,它會將寫buf的內容,封裝成幀再發送出去;

       TFramedTransport是對TTransport的繼承,由於tcp是基於字節流的方式進行傳輸,因此這種基於幀的方式傳輸就要求在無頭無尾的字節流中每次寫入和讀出一個幀,TFramedTransport是按照下面的方式來組織幀的:每個幀都是按照4字節的幀長加上幀的內容來組織,幀內容就是我們要收發的數據,如下:

  +---------------+---------------+

  |   4字節的幀長  |   幀的內容       |

  +---------------+---------------+

3.      協議接口(TProtocol)

       提供了一組操作協議接口,主要用於規定采用哪種協議進行數據的讀寫,它內部包含一個傳輸類(TTransport)成員對象,通過TTransport對象從輸入輸出流中讀寫數據;它規定了很多讀寫方式,例如:

readByte()

readDouble()

readString()

       每種實現類都根據自己所實現的協議來完成TProtocol接口函數的功能,例如實現了TProtocol接口的TBinaryProtocol類,對於readDouble()函數就是按照二進制的方式讀取出一個Double類型的數據。

       類TBinaryProtocol是TProtocol的一個實現類,TBinaryProtocol協議規定采用這種協議格式的進行消息傳輸時,需要為消息內容封裝一個首部,TBinaryProtocol協議的首部有兩種操作方式:一種是嚴格讀寫模式,一種值普通的讀寫模式;這兩種模式下消息首部的組織方式不一樣,在創建時也可以自己指定使用那種模式,但是要注意,如果要指定模式,Thrift的服務器端和客戶端都需要指定。

       嚴格讀寫模型下的消息首部的前16字節固定為版本號:0x8001,如圖1所示;

 

       圖1二進制協議中嚴格讀寫模式下的消息組織方式

       在嚴格讀寫模式下,首部中前32字節包括固定的16字節協議版本號0x8001,8字節的0x00,8字節的消息類型;然后是若干字節字符串格式的消息名稱,消息名稱的組織方式也是“長度+內容”的方式;再下來是32位的消息序列號;在序列號之后的才是消息內容。

       普通讀寫模式下,沒有版本信息,首部的前32字節就是消息的名稱,然后是消息的名字,接下來是32為的消息序列號,最后是消息的內容。

 

         圖2 二進制協議中普通讀寫模式下的消息組織方式

       通信過程中消息的首部在TBinaryProtocol類中進行通過readMessageBegin讀取,通過writeMessageBegin寫入;但是消息的內容讀取在返回值封裝類(例如:getStr_result)中進行;

(1)     TBinaryProtocol的讀取數據過程:

在Client中調用TBinaryProtocol讀取數據的過程如下:

readMessageBegin()

讀取數據

readMessageEnd()

readMessageBegin詳細過程如下:

      [1]首先從傳輸過來的網絡數據中讀取32位數據,然后根據首位是否為1來判斷當前讀到的消息是嚴格讀寫模式還是普通讀寫模式;如果是嚴格讀寫模式則這32位數據包括版本號和消息類型,否則這32位保存的是后面的消息名稱

      [2]讀取消息的名稱,如果是嚴格讀寫模式,則消息名稱為字符串格式,保存方式為“長度+內容”的方式,如果是普通讀寫模式,則消息名稱的長度直接根據[1]中讀取的長度來讀取消息名稱;

      [3]讀取消息類型,如果是嚴格讀寫模式,則消息類型已經由[1]中讀取出來了,在其32位數據中的低8位中保存着;如果是普通讀寫模式,則要繼續讀取一字節的消息類型;

      [4]讀取32為的消息序列號;

      讀取數據的過程是在函數返回值的封裝類中來完成,根據讀取的數值的類型來具體讀取數據的內容;在TBinaryProtocol協議中readMessageEnd函數為空,什么都沒有干。

(2)     TBinaryProtocol的寫入數據過程:

      在sendBase函數調用TBinaryProtocol將調用函數和參數發送到Thrift服務器的過程如下:

writeMessageBegin(TMessage m)

寫入數據到TTransport實現類的buf中

writeMessageEnd();

getTransport().flush();

writeMessageBegin函數需要一個參數TMessage作為消息的首部,寫入過程與讀取過程類似,首先判斷需要執行嚴格讀寫模式還是普通讀寫模式,然后分別按照讀寫模式的具體消息將消息首部寫入TBinaryProtocol的TTransport成員的buf中;

4.      Thrift客戶端存根代碼追蹤調試

      下面通過跟蹤附件中thrift客戶端代碼的test()函數,在該函數中調用了Thrift存根函數getStr,通過追蹤該函數的執行過程來查看整個Thrift的調用流程:

      (1)客戶端代碼先打開socket,然后調用存根對象的

m_transport.open();

String res = testClient.getStr("test1","test2");

      (2)在getStr的存根實現中,首先發送調用請求,然后等待Thrift服務器端返回的結果:

send_getStr(srcStr1, srcStr2);

return recv_getStr();

      (3)發送調用請求函數send_getStr中主要將參數存儲到參數對象中,然后把參數和函數名發送出去:

getStr_args args = new getStr_args();//創建一個該函數的參數對象

args.setSrcStr1(srcStr1);//將參數值設置帶參數對象中

args.setSrcStr2(srcStr2);

sendBase("getStr", args);//將函數名和參數對象發送出去

      (4)sendBase函數,存根類Client繼承自基類TServiceClient,sendBase函數即是在TServiceClient類中實現的,它的主要功能是調用協議類將調用的函數名和參數發送給Thrift服務器:

oprot_.writeMessageBegin(new TMessage(methodName,TMessageType.CALL, ++seqid_));//將函數名,消息類型,序號等信息存放到oprot_的TFramedTransport成員的buf中

args.write(oprot_);//將參數存放到oprot_的TFramedTransport成員的buf中

oprot_.writeMessageEnd();

oprot_.getTransport().flush();//將oprot_的TFramedTransport成員的buf中的存放的消息發送出去;

      這里的oprot_就是在TProtocol的子類,本例中使用的是TBinaryProtocol,在調用TBinaryProtocol的函數時需要傳入一個TMessage對象(在本節第2小節中有對TMessage的描述),這個TMessage對象的名字就是調用函數名,消息的類型為TMessageType.CALL,調用序號使用在客戶端存根類中(實際上是在基類TServiceClient)中保存的一個序列號,每次調用時均使用此序列號,使用完再將序號加1。

      在TBinaryProtocol中包含有一個TFramedTransport對象,而TFramedTransport對象中維護了一個緩存,上述代碼中,寫入函數名、參數的時候都是寫入到TFramedTransport中所維護的那個緩存中,在調用TFramedTransport的flush函數的時候,flush函數首先計算緩存中數據的長度,將長度和數據內容組裝成幀,然后發送出去,幀的格式按照“長度+字段”的方式組織,如:

  +---------------+---------------+

  |   4字節的幀長  |    幀的內容       |

  +---------------+---------------+

      (5)recv_getStr,在調用send_getStr將調用請求發送出去之后,存根函數getStr中將調用recv_getStr等待Thrift服務器端返回調用結果,recv_getStr的代碼為:

getStr_resultresult = new getStr_result();//為接收返回結果創建一個返回值對象

  receiveBase(result, "getStr");//等待Thrift服務器將結果返回

      (6)receiveBase,在該函數中,首先通過協議層讀取消息的首部,然后由針對getStr生成的返回值類getStr_result讀取返回結果的內容;最后由協議層對象結束本次消息讀取操作;如下所示:

iprot_.readMessageBegin();//通過協議層對象讀取消息的首部

……

result.read(iprot_);//通過返回值類對象讀取具體的返回值;

……

iprot_.readMessageEnd();//調用協議層對象結束本次消息讀取

在本節第4小節中有對readMessageBegin函數的描述;

5.           處理類(Processor)

      該類主要由Thrift服務器端程序使用,它是由thrift編譯器根據IDL編寫的thrift文件生成的具體語言的接口文件中所包含的類,例如2.5節中提到的TestThriftService.java文件,處理類(Processor)主要由thrift服務器端使用,它繼承自基類TBaseProcessor。

例如,2.5節中提到服務器端程序的如下代碼:

TProcessor tProcessor =

New TestThriftService.Processor<TestThriftService.Iface>(m_myService);

      這里的TestThriftService.Processor就是這里提到的Processor類,包括尖括號里面的接口TestThriftService.Iface也是由thrift編譯器自動生成。Processor類主要完成函數名到對應的函數對象的映射,它內部維護一個map,map的key就是接口函數的名字,而value就是接口函數所對應的函數對象,這樣服務器端從網絡中讀取到客戶端發來的函數名字的時候,就通過這個map找到該函數名對應的函數對象,然后再用客戶端發來的參數調用該函數對象;在Thrift框架中,每個接口函數都有一個函數對象與之對應,這里的函數對象繼承自虛基類ProcessFunction。

      ProcessFunction類,它采用類似策略模式的實現方法,該類有一個字符串的成員變量,用於存放該函數對象對應的函數名字,在ProcessFunction類中主要實現了process方法,此方法的功能是通過協議層從傳輸層中讀取並解析出調用的參數,然后再由具體的函數對象提供的getResult函數計算出結果;每個繼承自虛基類ProcessFunction的函數對象都必須實現這個getResult函數,此函數內部根據函數調用參數,調用服務器端的函數,並獲得執行結果;process在通過getResult函數獲取到執行結果之后,通過協議類對象將結果發送給Thrift客戶端程序。

      Thrift服務器端程序在使用Thrrift服務框架時,需要提供以下幾個條件:

      (1)定義一個接口函數實現類的對象,在開發Thrift服務程序時,最主要的功能就是開發接口的實現函數,這個接口函數的實現類implements接口Iface,並實現了接口中所有函數;

      (2)創建一個監聽socket,Thrift服務框架將從此端口監聽新的調用請求到來;

      (3)創建一個實現了TProtocol接口的協議類對象,在與Thrift客戶端程序通信時將使用此協議進行網絡數據的封裝和解封裝;

      (4)創建一個傳輸類的子類,用於和Thrift服務器之間進行數據傳輸

7.      參考資料

學習

Apache Thrift 官網:可下載 Thrift 工具和源碼。

Thrift Features and Non-features:Thrift 的功能特點和不足之處。

Apache Thrift 介紹:介紹 Thrift 架構、協議、傳輸層和服務端類型,並與其他構建服務的方法 ( 如:REST) 進行比較分析。

Thrift 的安裝部署:Thrift 的安裝部署說明

Thrift: Scalable Cross-Language Services Implementation:Thrift 官方文檔,詳細介紹 Thrift 的

Thrift IDL 官方的IDL示例文件

 

設計

Thrift API:關於 Apache Thrift 0.6.1 構建服務端和客戶端的 API 手冊

Thrift 實例:Thrift 的簡單應用實例

Fully async Thrift client in Java:關於 Thrift 異步客戶端的介紹

developerWorks Java 技術專區:這里有數百篇關於 Java 編程各個方面的文章。

 

參考引用博客

http://www.ibm.com/developerworks/cn/java/j-lo-apachethrift/

http://houjixin.blog.163.com/blog/static/35628410201501654039437/

http://www.micmiu.com/soa/rpc/thrift-sample/

http://wenku.baidu.com/link?url=y3ecnxyUQmeD487ipA0jgAm2Bl5whKyVGCRHSYbMmxEX4yJJzOSWfc52ms1oVLa5N_9BDHzjXUZsEXVrifF6a-xXvahnbYvjD3LpOjAhUM_

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM