Thrift - 快速入門


Getting Started

如果有homebrew的話,直接執行以下命令即可,brew會處理相關依賴(https://thrift.apache.org/docs/install/)。

brew install thrift

 

或者可以從源碼安裝。
下載tar包 https://thrift.apache.org/download
參考 https://thrift.apache.org/docs/BuildingFromSource

 

先寫一個例子,目錄結構如下:

├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   └── resources
│   └── test
│       └── java
└── thrift
    ├── Common.thrift
    └── ShopService.thrift

 

pom.xml中添加以下依賴:

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.10.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.18</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.25</version>
</dependency>

 

thrift目錄下創建兩個thrift文件:

Common.thrift

namespace java me.kavlez.thrift.service

service BaseService {
    string echoServiceName()
}

 

ShopService.thrift

include "Common.thrift"

namespace java me.kavlez.thrift.service

struct Shop {
    1: required i32 id,
    2: required string name
}

struct Item {
    1: required i32 id,
    2: required string name = "unknown",
    3: required string detail,
    4: required Shop shop
}

service ShopService extends Common.BaseService {

  Shop queryShopInfo(1: i32 id),
  bool isValidShop(1: Shop shop),
  set<Item> queryItems(1: i32 shopId),
}

 

Thrift提供了多個語言的生成器實現,按照thrift文件生成java類,生成代碼命令的用法如下:

thrift -r --gen <language> <Thrift filename>

 

其中-r即recursive,如果在文件中通過include關鍵字引用了其他文件,-r選項可以一並生成被引用的文件。

例如上面ShopService.thrift中的:

include Common.thrift

 

默認情況下,代碼會在gen-<language>目錄下生成,生成目錄可以通過--out指定。

生成后再拷貝有點麻煩,直接生成到代碼目錄下,在工程目錄下執行以下命令:

thrift -r --gen java --out src/main/java thrift/ShopService.thrift

 

執行后src/main/java/目錄下生成me/kavlez/thrift/service/目錄,以及4個java文件。

 

在service目錄下創建impl,提供接口實現:

package me.kavlez.thrift.service.impl;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.Shop;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Created by Kavlez.Kim@gmail.com
 */
@Slf4j
public class ShopServiceImpl implements ShopService.Iface {
    @Override
    public Shop queryShopInfo(int id) throws TException {
        return new Shop(id, "DMC_".concat(String.valueOf(id)));
    }

    @Override
    public boolean isValidShop(Shop shop) throws TException {
        return shop != null;
    }

    @Override
    public Set<Item> queryItems(int shopId) throws TException {

        if (shopId < 1) {
            return Collections.emptySet();
        }

        Set<Item> items = new HashSet<>();
        Shop shop = new Shop(1101, "DMC");
        for (int i = 0; i < 8; i++) {
            Item item = new Item(shopId + i, "sample_".concat(String.valueOf(shopId + i))
                    , "this is sample_".concat(String.valueOf(i))
                    , shop);
            items.add(item);
        }
        return items;
    }

    @Override
    public String echoServiceName() throws TException {
        return "alo! this is shop service!";
    }
}

 

除了業務實現,我們需要額外做兩件事情——構建Server和Client。

構建Server,也就是為Server指定Transparent、Protocol、Processor:

package me.kavlez.thrift.server;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;

/**
 * Created by Kavlez.Kim@gmail.com
 */
@Slf4j
public class SimpleServerHolder {

    public static TServer buildServer() {
        TServerSocket serverSocket = null;
        try {
            serverSocket = new TServerSocket(8081);
        } catch (TTransportException e) {
            e.printStackTrace();
        }
        TProcessor tprocessor = new ShopService.Processor<ShopService.Iface>(new ShopServiceImpl());

        TServer.Args tArgs = new TServer.Args(serverSocket);
        tArgs.protocolFactory(new TCompactProtocol.Factory());
        tArgs.processor(tprocessor);

        TServer server = new TSimpleServer(tArgs);
        return server;
    }

    public static void main(String[] args) {
        TServer server = SimpleServerHolder.buildServer();
        log.info("server ready...");
        server.serve();
    }
}

 

相應地,構建Client:

package me.kavlez.thrift.client;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.util.Set;

/**
 * Created by Kavlez.Kim@gmail.com
 */
@Slf4j
public class SimpleClientHolder {

    private TTransport transport;

    public ShopService.Client buildClient(String serverAddr, int serverPort, int timeout) throws TException {

        this.transport = new TSocket(serverAddr, serverPort, timeout);
        TProtocol protocol = new TCompactProtocol(transport);
        transport.open();

        ShopService.Client client = new ShopService.Client(protocol);
        return client;
    }

    public static void main(String[] args) {
        SimpleClientHolder simpleClientHolder = new SimpleClientHolder();
        ShopService.Client client = null;
        try {
            client = simpleClientHolder.buildClient("localhost", 8081, 1000);
            Set<Item> items = client.queryItems(666);
            log.info("return items = {}", String.valueOf(items));
        } catch (TException e) {
            e.printStackTrace();
        }

        if (null != simpleClientHolder.transport) {
            simpleClientHolder.transport.close();
        }
    }
}

依次運行Server和Client,輸出正常。

 

IDL (Interface Description Language)

提供服務的第一步是用IDL編寫Thrift文件,IDL幾乎可以描述接口所需的所有元素,接口定義中包括以下內容:

 

namespace

每個thrift文件都在自己的命名空間中,多個thrift文件可以用同一個命名空間作為標識,並指定要使用的語言的generator。

例如:

namespace java me.kavlez.thrift.service
namespace php tutorial

 

基本類型

類型 說明
bool 布爾類型
i8 (byte) 8-bit 有符號整型,對應java的byte
i16 16-bit 有符號整型,對應java的short
i32 32-bit 有符號整型,對應java的int
i64 64-bit 有符號整型,對應java的long
double 64-bit 浮點類型,對應java的double
string 字符串
binary Blob (byte array)

 

結構體

用於定義一個對象類型。

字段默認為optional,可以聲明required。
字段可以設置默認值。
結構體之間可以互相引用。
0.9.2開始可以引用自身。

struct Shop {
  1: required i32 id,
  2: required string name
}

struct Item {
  1: required i32 id,
  2: required string name = "unknown",
  3: required string detail,
  4: required Shop shop
}

 

枚舉

值是可選項,枚舉不能嵌套;基本上就是K、V的形式,不能描述太復雜的枚舉類。

enum Numberz {
  ONE = 1,
  TWO,
  THREE,
  FIVE = 5,
  SIX,
  EIGHT = 8
}

 

常量

可以自定義常量,像Map、List這樣的復雜結構可以用json表示。

const i32 INT_CONST = 1234;    // a
const map<string,string> MAP_CONST = {"hello": "world", "goodnight": "moon"}
const list<string> LIST_CONST = ["a","b","c"]

 

容器類型

不支持異構容器,容器的元素類型必須一致。
元素類型可以是service以外的任何類型。

類型 說明
map<t1,t2> Map from one type to another
list<t1> Ordered list of one type
set<t1> Set of unique elements of one type

 

自定義異常

語法上和struct相似,生成后的代碼,不同語言各有各的實現方式。

exception IllegalShopException {
  1: i32 errorCode,
  2: string message,
  3: Shop shop
}

 

service

一個函數集合,語法和java定義接口的語法類似,下面是一些例子。

service ThriftTest {
    
  /**
   * 無返回,空參數列表
   */
  void testVoid(),
    
  /**
   * 聲明返回類型、參數
   */
  string testString(1: string thing),
  
  /**
   * 返回結構體
   */
  Shop queryShopInfo(1: i32 id),
    
  /**
   * 結構體作為參數
   */
  bool isValidShop(1: Shop shop),
    
  /**
   * ...
   */
  set<Item> queryItems(1: i32 shopId),
    
  /**
   * 拋出異常
   */
  bool changeShopStatus(1: i32 shopId) throws(1: IllegalShopException err),
  
  /**
   * 多異常
   */
  bool changeItemStatus(1: i32 itemId) throws(1: IllegalShopException shopErr,2:IllegalItemException itemErr),
    
  /**
   * oneway表示該方法在客戶端發起請求后不會等待響應,返回類型必須為void
   */
  oneway void sendMessage(1:i32 shopId,2:string message)
}

 

thrift working stack

用Thrift構建服務和客戶端,架構如下:

+-------------------+       +-------------------+
|      Server       |       |      Client       |
|                   |       |                   |
| +---------------+ |       | +---------------+ |
| |               | |       | |               | |
| |   your code   | |       | |   your code   | |
| +---------------+ |       | +---------------+ |
| |   Service     | |       | |   Service     | |
| |   processor   | |       | |   Client      | |
| +---------------+ |       | +---------------+ |
| |               | |       | |               | |
| |   Protocol    | |       | |   Protocol    | |
| +---------------+ |       | +---------------+ |
| |               | |       | |               | |
| |   Transport   |<--------->|   Transport   | |
| +---------------+ |       | +---------------+ |
+-------------------+       +-------------------+

 

生成的接口類中大致包括三樣,分別是Iface、Client、Processor。
另外還有Server、Transport、Protocol。

Transport

在RPC框架的語境下談傳輸層很容易只想到網絡通信,但Transport表述的並不只是網絡通信。

不如說Transport是多種IO的抽象,其不僅限於網絡IO。

比如,基礎的TIOStreamTransport,以及其兩個子類,TSocket和TZlibTransport。

TSocket在上面的例子中作為TBinaryProtocol依賴的transport類型,與Server的TServerSocket進行通信。 

但后者是封裝了InflaterInputStream和DeflaterOutputStream,其InputStream並不要求是SocketInputStream。

從開發角度來講,如果將一個TMemoryBuffer對象傳入Protocol,並以此創建某個service對應的Client,再調用相應接口。

整個過程在代碼上並沒有什么限制,只是運行時拋出org.apache.thrift.TApplicationException。

 

 

Protocol

protocol依賴transport,決定雙方以什么協議通信,同時也是通信內容的載體。

org.apache.thrift.protocol.TProtocol中的方法聲明里,一系列readXX和writeXX,在具體實現中通常都是通過transport來完成。

以TJSONProtocol為例,其實現的TProtocol的所有write方法都是以幾個私有的write方法組織起來。

比如,writeI32和writeI64都是通過私有方法writeJSONInteger,而writeJSONInteger則是由實例化時傳入的trasnport進行write。

 

 

Processor

構建自己的server時需要在tArgs提供一個Processor,比如本文中的ShopService.Processor。
(p.s. 如果需要提供多個Processor,比如再加一個ItemService,則使用TMultiplexedProcessor即可。)

Server通過Processor執行業務邏輯代碼,文件中描述的每個函數作為ProcessFunction子類進行實例化,放入Processor的processMap中。

Server收到請求,從輸入的protocol中讀取方法名,根據方法名從processMap中拿到對應的ProcessFunction;
通過ProcessFunction的process方法執行業務邏輯,過程大體分為3步:

  • 從protocol讀入請求參數,構建參數對象;
  • 傳入參數,本地執行業務方法。假設方法名為"getItems",調用結果則為getItems_success;
  • 將結果寫入protocol,調用protocol.writeXX;

Client

像本文中,指定Transport和Protocol,構建ShopService.Client,客戶端通過Client對象像調用本地方法一樣調用queryItems;
在ShopService中,Client類同樣實現了ShopService.Iface中的方法,以queryItems為例,其實現如下:

public Shop queryShopInfo(int id) throws org.apache.thrift.TException {
  send_queryShopInfo(id);
  return recv_queryShopInfo();
}

 

在send_queryShopInfo,構建該函數對應的xx_args對象,將其寫入oprot,並通過oprot.tranport進行flush;

相應地,recv_queryShopInfo就是從iport中讀取函數的返回值,構建該函數對應的queryShopInfo_result對象。

 

Server

將Transport、Protocol和Processor集合在一起就是一個完整的Server,父類TServer提供了唯一的抽象方法——serve()。

以TSimpleServer為例,serve中通過java.lang.ServerSocket的accept獲取client Socket並轉為client Transport,以此獲取相應的Processor、創建相應的inputTransport、outputTransport和iProt、oProt。

(p.s. 默認的TProcessorFactory沒有子類,其getProcessor(Transport)和並沒有通過transport來獲取processor。可以用來擴展,比如用一個server提供多版本服務之類的。)

剩下的工作由Processor進行處理,從iPort讀入請求信息並構造TMessage,找到相應的ProcessFunction並執行其process方法,這個在上面說過。

Thrift為TServer提供了3種實現:

  • TSimpleServer: 單線程ServerSocket實現,僅用於測試;

  • TThreadPoolServer: 封裝了ThreadPoolExecutor,用內部類WorkerProcess表示單個請求,通過每個WorkerProcess對象的transport獲取相應的Processor和Protocol,調用業務代碼並返回;

  • AbstractNonblockingServer: 非阻塞server抽象類,其serve()方法即整個過程的skeleton,serve()中調用的方法交給其子類提供具體實現。

    public void serve() {
        // start any IO threads
        if (!startThreads()) {
          return;
        }
    
        // start listening, or exit
        if (!startListening()) {
          return;
        }
    
        setServing(true);
    
        // this will block while we serve
        waitForShutdown();
    
        setServing(false);
    
        // do a little cleanup
        stopListening();
    }

 

AbstractNonblockingServer的3個子類,分別為:

  • TNonblockingServer: 實現父類的startThreads(),啟動selector線程(也就是SelectAcceptThread,父類聲明了protected final Selector selector),開始輪詢SelectedKeys,檢查狀態並進行相應處理:

    if (key.isAcceptable()) {
        handleAccept();
    } else if (key.isReadable()) {
    handleRead(key);
    } else if (key.isWritable()) {
    handleWrite(key);
    } else {
    LOGGER.warn("Unexpected state in select! " + key.interestOps());
    }

    另外,使用TNonblockingServer時transport必須為TFramedTransport,以此保證能正確讀取單次方法調用。

  • THsHaServer: "HsHa",即"Half-Sync/Half-Async",是TNonblockingServer的子類。

    工作流程和TNonblockingServer相似,主要區別在與handleRead()
    handleRead中完成讀取后,另外一項重要的工作就是requestInvoke(buffer),也就是執行processor.process(iProt,oProt)。

    不過,TNonblockingServer是單線程執行,而THsHaServer則是通過線程池。
    將FrameBuffer裝進Invocation(其run方法即frameBuffer.invoke()),提交給線程池處理。

    線程池參數的默認值如下:

    corePoolSize = 5;
    maximumPoolSize = Integer.MAX_VALUE;
    keepAliveTime = 60;
    workQueue = new LinkedBlockingQueue<Runnable>();
  • TThreadedSelectorServer: 進一步加強HsHaServer,用一個AcceptThread接收所有連接請求,並擔任負載均衡的角色。

    負載均衡的工作由構造器參數中的SelectorThreadLoadBalancer進行,該類只提供了一種實現——對已注冊的selector線程列表進行round robin。
    AcceptThread處理連接時,通過SelectorThreadLoadBalancer選出selector線程,將接收到的socketChannel放入selector線程的隊列中。

    雖然TThreadedSelectorServer的requestInvoke也是使用線程池進行,但線程池的默認配置和THsHaServer不同,默認時為corePoolSize為5的FixedThreadPool。
    如果corePoolSize小為0,則由caller線程執行。

 

最后,把之前的例子修改一下,看看效果。

AbstractTServerHolder.java

package me.kavlez.thrift.server;

import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;

public abstract class AbstractTServerHolder {
    private TServer tServer;

    public abstract TServer build() throws TTransportException;
}

ThreadedSelectorServerHolder.java

package me.kavlez.thrift.server;

import me.kavlez.thrift.service.ShopService;
import me.kavlez.thrift.service.impl.ShopServiceImpl;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;

public class ThreadedSelectorServerHolder extends AbstractTServerHolder {
    @Override
    public TServer build() throws TTransportException {
        TNonblockingServerTransport transport = new TNonblockingServerSocket(8090);
        TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport);

        ShopService.Processor<ShopService.Iface> shopServiceProcessor
                = new ShopService.Processor<ShopService.Iface>(new ShopServiceImpl());
        args.processor(shopServiceProcessor)
                .protocolFactory(new TBinaryProtocol.Factory())
                .transportFactory(new TFramedTransport.Factory());

        TServer server = new TThreadedSelectorServer(args);
        return server;

    }
}

Launcher.java

package me.kavlez.thrift;

import lombok.extern.slf4j.Slf4j;
import me.kavlez.thrift.client.AbstractShopServiceClientHolder;
import me.kavlez.thrift.client.NonBlockingClientHolder;
import me.kavlez.thrift.client.ShopServiceClientAgent;
import me.kavlez.thrift.server.AbstractTServerHolder;
import me.kavlez.thrift.server.ThreadedSelectorServerHolder;
import me.kavlez.thrift.service.Item;
import me.kavlez.thrift.service.ShopService;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;

import java.io.FileNotFoundException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class Launcher {

    static class TServerClientHolderPair {
        private AbstractTServerHolder tServerHolder;
        private Class<? extends AbstractShopServiceClientHolder> clientHolderClass;

        public TServerClientHolderPair(AbstractTServerHolder tServerHolder, Class<? extends AbstractShopServiceClientHolder> clientHolderClass) {
            this.tServerHolder = tServerHolder;
            this.clientHolderClass = clientHolderClass;
        }
    }

    public static void main(String[] args) throws InterruptedException, TTransportException, FileNotFoundException {

        final AbstractTServerHolder serverHolder = new ThreadedSelectorServerHolder();
        final TServer tServer = serverHolder.build();

        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<?> serverFuture = executorService.submit(new Runnable() {
            @Override
            public void run() {
                tServer.serve();
            }
        });

        Thread.sleep(100);

        int times = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(times);

        class ShopServiceClientTask implements Runnable {

            @Override
            public void run() {
                AbstractShopServiceClientHolder clientHolder = null;
                clientHolder = new NonBlockingClientHolder();

                try {
                    ShopService.Iface shopService = new ShopServiceClientAgent(clientHolder.build());
                    for (int i = 0; i < 1000; i++) {
                        Set<Item> items = shopService.queryItems(666);
                        log.info("return items = {}", String.valueOf(items));
                    }

                } catch (TException e) {
                    log.info("thread name={} get TException", Thread.currentThread().getName(), e);
                } finally {
                    clientHolder.close();
                    countDownLatch.countDown();
                }
            }
        }

        long start = System.currentTimeMillis();

        for (int i = 0; i < times; i++) {
            executorService.submit(new ShopServiceClientTask());
        }

        countDownLatch.await();
        log.info("used {} ms ", System.currentTimeMillis() - start);
        tServer.setShouldStop(true);
        tServer.stop();
        executorService.shutdown();

    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM