Spring Boot整合Thrift RPC


【轉載】 https://coder4.com/homs_online/spring-boot/sb-thrift.html

Spring Boot自動配置簡介

在介紹RPC之前,我們先來學習下Spring Boot的自動配置。

我們前面已經提到:Spring Boot來源於Spring,並且做了眾多改進,其中最有用的設計理念是約定優於配置,它通過自動配置功能(大多數開發者平時習慣設置的配置作為默認配置)為開發者快速、准確地構建出標准化的應用。

以集成MySQL數據庫為例,在Spring Boot出現之前,我們要

  1. 配置JDBC驅動依賴
  2. 配置XML文件中數據源
  3. 配置XML中的DataSource Bean
  4. 配置XML中的XXXTemplate Bean
  5. 配置XML中的XXXTransactionManager Bean

有了Spring Boot的自動配置后,自動配置幫我們生成了各種DataSource、XXXTemplate、XXXTransactionManager,我們所需要做的只有一條,就是激活它

  1. maven中依賴包含自動配置的包
  2. 配置JDBC驅動依賴
  3. yaml文件中定義數據源

自動配置進行智能檢測,只要滿足上述3個條件,其他的Bean都會被自動生成並注入到Spring環境中。我們需要使用時只需要@Autowired一下就可以了,是不是非常簡單!

由於篇幅所限,本書不會對自動配置的書寫做零起點教學,如果你想了解自動配置的原理,可以參考這篇文章spring boot實戰(第十三篇)自動配置原理分析

在本節的后續部分,我們會以Thrift RPC Server為例,看看自動配置是如何書寫的。

RPC簡介

遠程過程調用(remote procedure call或簡稱RPC),指的是運行於本地(客戶端)的程序像調用本地程序一樣,直接調用另一台計算機(服務器端)的程序,而程序員無需額外為遠程交互做額外的編程。

RPC極大地簡化了分布似乎系統中節點之間網絡通信的開發工作量,是微服務架構中的重要組件之一。

在本書中,我們選用Thrift作為RPC框架。由於篇幅所限,我們不會對Thrift RPC作出詳盡的介紹,如果你還不熟悉,可以參考官方的快速入門文檔

Spring Boot整合Thrift RPC服務端

簡要來說,啟動一個Thrift RPC的服務端需要如下步驟:

  1. 書寫DSL(.thrift文件),定義函數、數據結構等。
  2. 編譯並生成樁代碼。
  3. 編寫Handler(RPC的邏輯入口)。
  4. 基於上述Handler,構造Processor。
  5. 構造Server,Thrift提供了多種服務端供選擇,常用的有TThreadPoolServer(多線程服務器)和TNonblockingServer(非阻塞服務器)。
  6. 設置Server的Protocol,類似的,Thrift提供了多種傳輸協議,最常用的是TBinaryProtocol和TCompactProtocol。
  7. 設置Server的Transport(Factory),用這種方式指定底層的傳輸協議,常用的有TFramedTransport、TNonBlockingTransport,不同的Transport可以類似Java的IOStreawm方式,相互疊加,以產生更強大的效果。

上述對Thrift服務器的架構做了簡要介紹,如果想更深入了解,可以自行閱讀官方源碼

首先,我們來看一下thrift定義(根據上一節的介紹,文件放在lmsia-abc-common包中)

namespace java com.coder4.lmsia.abc service lmsiaAbcThrift { string sayHi() } 

調用thrift進行編譯后,我們也將對應的樁文件放置在lmsia-abc-client下,目錄結構可以參見上一節。

為了更方便的在Spring Boot中集成Thrift服務器,我將相應代碼抽取成了公用庫lmsia-thrift-server


├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── thrift
    │   │                   └── server
    │   │                       ├── configuration
    │   │                       │   └── ThriftServerConfiguration.java
    │   │                       └── ThriftServerRunnable.java
    │   └── resources
    │       └── META-INF
    │           └── spring.factories
    └── test
        └── java

簡單解析下項目結構: gradle相關: 與前節介紹的類似,只不過這里是單項目功能。 ThriftServerConfiguration: 自動配置,當滿足條件后會自動激活,激活后可自動啟動Thrift RPC服務。 ThriftServerRunnable: Thrift RPC服務器的構造邏輯、運行線程。 spring.factories: 當我們以類庫方式提供自動配置時,需要增加這個spring.factories,讓別的項目能"定位到"要檢查的自動配置。

首先,我們來看一下ThriftServerRunnable.java


package com.coder4.lmsia.thrift.server; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author coder4 */ public class ThriftServerRunnable implements Runnable { private static final int THRIFT_PORT = 3000; private static final int THRIFT_TIMEOUT = 5000; private static final int THRIFT_TCP_BACKLOG = 5000; private static final int THRIFT_CORE_THREADS = 128; private static final int THRIFT_MAX_THREADS = 256; private static final int THRIFT_SELECTOR_THREADS = 16; private static final TProtocolFactory THRIFT_PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); // 16MB private static final int THRIFT_MAX_FRAME_SIZE = 16 * 1024 * 1024; // 4MB private static final int THRIFT_MAX_READ_BUF_SIZE = 4 * 1024 * 1024; protected ExecutorService threadPool; protected TServer server; protected Thread thread; private TProcessor processor; private boolean isDestroy = false; public ThriftServerRunnable(TProcessor processor) { this.processor = processor; } public TServer build() throws TTransportException { TNonblockingServerSocket.NonblockingAbstractServerSocketArgs socketArgs = new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs(); socketArgs.port(THRIFT_PORT); socketArgs.clientTimeout(THRIFT_TIMEOUT); socketArgs.backlog(THRIFT_TCP_BACKLOG); TNonblockingServerTransport transport = new TNonblockingServerSocket(socketArgs); threadPool = new ThreadPoolExecutor(THRIFT_CORE_THREADS, THRIFT_MAX_THREADS, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); TTransportFactory transportFactory = new TFramedTransport.Factory(THRIFT_MAX_FRAME_SIZE); TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport) .selectorThreads(THRIFT_SELECTOR_THREADS) .executorService(threadPool) .transportFactory(transportFactory) .inputProtocolFactory(THRIFT_PROTOCOL_FACTORY) .outputProtocolFactory(THRIFT_PROTOCOL_FACTORY) .processor(processor); args.maxReadBufferBytes = THRIFT_MAX_READ_BUF_SIZE; return new TThreadedSelectorServer(args); } @Override public void run() { try { server = build(); server.serve(); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("Start Thrift RPC Server Exception"); } } public void stop() throws Exception { threadPool.shutdown(); server.stop(); } } 

我們來解釋一下:

  • build方法用於構造一個可供運行的Thrift RPC Server
    1. 構造非阻塞Socket,並設置監聽端口、超時
    2. 構造非阻塞Transport
    3. 構造線程池,在這里我們的服務器模型是非阻塞線程池RPC服務器。
    4. 構造底層傳輸協議即TFramedTransport
    5. 構造ThriftServer,並設置前面構造的非阻塞Transport、線程池、協議TBinaryProtocol
  • 整個ThriftServerRunnable類是一個線程Runnablerun,run函數中構造RPC服務,並啟動服務(servee)
  • stop服務提供停止服務的方法

下面我們來看一下自動配置ThriftServerConfiguration.java:

package com.coder4.lmsia.thrift.server.configuration; import com.coder4.lmsia.thrift.server.ThriftServerRunnable; import org.apache.thrift.TProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Configuration; import java.util.concurrent.TimeUnit; /** * @author coder4 */ @Configuration @ConditionalOnBean(value = {TProcessor.class}) public class ThriftServerConfiguration implements InitializingBean, DisposableBean { private Logger LOG = LoggerFactory.getLogger(ThriftServerConfiguration.class); private static final int GRACEFUL_SHOWDOWN_SEC = 3; @Autowired private TProcessor processor; private ThriftServerRunnable thriftServer; private Thread thread; @Override public void destroy() throws Exception { LOG.info("Wait for graceful shutdown on destroy(), {} seconds", GRACEFUL_SHOWDOWN_SEC); Thread.sleep(TimeUnit.SECONDS.toMillis(GRACEFUL_SHOWDOWN_SEC)); LOG.info("Shutdown rpc server."); thriftServer.stop(); thread.join(); } @Override public void afterPropertiesSet() throws Exception { thriftServer = new ThriftServerRunnable(processor); thread = new Thread(thriftServer); thread.start(); } } 

這是我們編寫的第一個自動配置,我們稍微詳細的解釋一下:

  • 啟動條件: 僅當服務提供了TProcessor才啟用,我們稍后會在lmsia-abc項目中看到,后者封裝了RPC的樁入口,提供了TProcessor。
  • InitializingBean: 自動配置實現了InitializingBean,為什么要實現這個接口呢?當這個自動配置被初始化時,所有Autowired的屬性被自動注入(即Processor),而前面ThriftServerRunnable中我么已經看到,只有拿到了TProcessor,才能啟動RPC服務。因此,我們使用了InitializingBean,它自帶了afterPropertiesSet這個回調,會在所有屬性被注入完成后,調用這個回調函數。
    • 在這里,我們調用了ThriftServerRunnable實現了Thrift RPC服務器的啟動。
  • DisposableBean: 除了InitializingBean,我們還實現了DisposableBean。看名字就可以知道,這是Spring為了服務關閉時清理資源而設計的接口。事實也是如此,當服務關閉時,會依次調用每個自動配置,如果實現了DisposableBean,則回調destroy函數。
    • 在這里,我們先讓線程休眠3秒,然后才關閉Thrift RPC服務,這主要是為了Graceful Shutdown而設計的("優雅關閉"),關於這一點,我們會在下一節會做詳細講解。

最后,我們的自動配置默認是無法被發現的,需要一個配置文件spring.factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.thrift.server.configuration.ThriftServerConfiguration

解讀完lmsia-thrift-server后,我們看看如何將它整合進lmsia-abc項目中。

  1. 在lmsia-abc-server子項目中的build.gradle中加入:

    compile 'com.github.liheyuan:lmsia-thrift-server:0.0.1'
    
  2. 提供一個TProcessor,如前文所述,這是啟用自動配置的必要條件,ThriftProcessorConfiguration: ```java package com.coder4.lmsia.abc.server.configuration;

import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift; import com.coder4.lmsia.abc.server.thrift.ThriftServerHandler; import org.apache.thrift.TProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/**

  • @author coder4 */ @Configuration @ConditionalOnProperty(name = "thriftServer.enabled", matchIfMissing = true) public class ThriftProcessorConfiguration {

    @Bean(name = "thriftProcessor") public TProcessor processor(ThriftServerHandler handler) {

     return new LmsiaAbcThrift.Processor(handler);
    

    }

}


我們簡單解釋下:
* 這也是一個自動配置,僅當配置文件中thriftServer.enabled=true時才啟用(不配置默認true)
* 提供的TProcessor,需要依賴ThriftServerHandler,這個就是Thrift生成的樁函數,項目結構分析中已經提到過,這是RPC服務器的邏輯入口。

怎么樣,使用了自動配置后,啟動一個Thrift 服務器是不是非常簡單?

## Spring Boot整合Thrift RPC客戶端

只有服務端是不行的,還需要有客戶端。

類似地,為了方便的生成客戶端,我們把代碼進行了整理和抽象,放到了[lmsia-thrift-client](https://github.com/liheyuan/lmsia-thrift-client)項目中。

首先看一下項目結構:
```shell
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── thrift
    │   │                   └── client
    │   │                       ├── ThriftClient.java
    │   │                       ├── AbstractThriftClient.java
    │   │                       ├── EasyThriftClient.java
    │   │                       ├── K8ServiceThriftClient.java
    │   │                       ├── K8ServiceKey.java
    │   │                       ├── builder
    │   │                       │   ├── EasyThriftClientBuilder.java
    │   │                       │   └── K8ServiceThriftClientBuilder.java
    │   │                       ├── func
    │   │                       │   ├── ThriftCallFunc.java
    │   │                       │   └── ThriftExecFunc.java
    │   │                       ├── pool
    │   │                       │   ├── TTransportPoolFactory.java
    │   │                       │   └── TTransportPool.java
    │   │                       └── utils
    │   │                           └── ThriftUrlStr.java
    │   └── resources
    └── test
        └── java
            └── LibraryTest.java

解釋下項目結構:

  • gradle相關的與之前類似,不再贅述
  • ThriftClient相關,定義了Thrift的客戶端
    1. ThriftClient 抽象了客戶端的接口
    2. AbstractThriftClient 實現了除連接外的Thrift Client操作
    3. EasyThriftClient 使用IP和端口直連的Thrift Client
    4. K8ServiceThriftClient 使用Kubernetes服務名字(根據微服務自動發現一節中的介紹,服務名字實際也是Host)和端口的Thrift Client,並內置了連接池。
  • func 函數編程工具類
  • builder 方便快速構造上述兩種Thrift Client
  • pool 客戶端連接池

本小節主要對IP、端口直連的客戶端即EasyThriftClient進行介紹。關於支持服務自動發現以及連接池功能的K8ServiceThriftClient,將在下一節進行介紹。

先看一下接口定義,ThriftClient:

package com.coder4.lmsia.thrift.client; import com.coder4.lmsia.thrift.client.func.ThriftCallFunc; import com.coder4.lmsia.thrift.client.func.ThriftExecFunc; import org.apache.thrift.TServiceClient; import java.util.concurrent.Future; /** * @author coder4 */ public interface ThriftClient<TCLIENT extends TServiceClient> { /** * sync call with return value * @param tcall thrift rpc client call * @param <TRET> return type * @return */ <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall); /** * sync call without return value * @param texec thrift rpc client */ void exec(ThriftExecFunc<TCLIENT> texec); /** * async call with return value * @param tcall thrift rpc client call * @param <TRET> * @return */ <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall); /** * asnyc call without return value * @param texec thrift rpc client call */ <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec); } 

這里需要解釋一下,上述實際分成了兩大類:

  • exec 無返回值的rpc調用
  • call 有返回值的調用

這里使用了Java 8的函數式編程進行抽象。如果不太熟悉的朋友,可以自行查閱相關資料。

在函數式編程的幫助下,我們可以將每一個rpc調用都分為同步和異步兩種,異步的調用會返回一個Future。

再來看一下AbstractThriftClient:

/** * @(#)AbstractThriftClient.java, Aug 01, 2017. * <p> * Copyright 2017 fenbi.com. All rights reserved. * FENBI.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.coder4.lmsia.thrift.client; import com.coder4.lmsia.thrift.client.func.ThriftCallFunc; import com.coder4.lmsia.thrift.client.func.ThriftExecFunc; import org.apache.thrift.TServiceClient; import org.apache.thrift.TServiceClientFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransport; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author coder4 */ public abstract class AbstractThriftClient<TCLIENT extends TServiceClient> implements ThriftClient<TCLIENT> { protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000; protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16; private Class<?> thriftClass; private static final TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory(); private TServiceClientFactory<TCLIENT> clientFactory; // For async call private ExecutorService threadPool; public void init() { try { clientFactory = getThriftClientFactoryClass().newInstance(); } catch (Exception e) { throw new RuntimeException(); } if (!check()) { throw new RuntimeException("Client config failed check!"); } threadPool = new ThreadPoolExecutor( 10, 100, 0, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>()); } protected boolean check() { if (thriftClass == null) { return false; } return true; } @Override public <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall) { return threadPool.submit(() -> this.call(tcall)); } @Override public <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec) { return threadPool.submit(() -> this.exec(texec)); } protected TCLIENT createClient(TTransport transport) throws Exception { // Step 1: get TProtocol TProtocol protocol = protocolFactory.getProtocol(transport); // Step 2: get client return clientFactory.getClient(protocol); } private Class<TServiceClientFactory<TCLIENT>> getThriftClientFactoryClass() { Class<TCLIENT> clientClazz = getThriftClientClass(); if (clientClazz == null) { return null; } for (Class<?> clazz : clientClazz.getDeclaredClasses()) { if (TServiceClientFactory.class.isAssignableFrom(clazz)) { return (Class<TServiceClientFactory<TCLIENT>>) clazz; } } return null; } private Class<TCLIENT> getThriftClientClass() { for (Class<?> clazz : thriftClass.getDeclaredClasses()) { if (TServiceClient.class.isAssignableFrom(clazz)) { return (Class<TCLIENT>) clazz; } } return null; } public void setThriftClass(Class<?> thriftClass) { this.thriftClass = thriftClass; } } 

上述抽象的Thrift客戶端實現了如下功能:

  1. 客戶端線程池,這里主要是為異步調用准備的,與之前構造的服務端的線程池是完全不同的。
    • asyncCall和asyncExec使用了線程池來完成異步調用
  2. thriftClass 存儲了Thrift的樁代碼了類,不同業務生成的ThriftClass不一樣,所以這里存儲了class。
  3. createClient提供了共用函數,傳入一個transport,即可構造生成一個Thrift Client,特別注意的是,這里設定的通信協議為TBinaryProtocol,必須與服務端保持一致,否則無法成功通信。

由於call和exec與連接實現較為相關,因此並未在這一層中實現,最后我們來看一下EasyThriftClient:

package com.coder4.lmsia.thrift.client; import com.coder4.lmsia.thrift.client.func.ThriftCallFunc; import com.coder4.lmsia.thrift.client.func.ThriftExecFunc; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; /** * @author coder4 */ public class EasyThriftClient<TCLIENT extends TServiceClient> extends AbstractThriftClient<TCLIENT> { private static final int EASY_THRIFT_BUFFER_SIZE = 1024 * 16; protected String thriftServerHost; protected int thriftServerPort; @Override protected boolean check() { if (thriftServerHost == null || thriftServerHost.isEmpty()) { return false; } if (thriftServerPort <= 0) { return false; } return super.check(); } private TTransport borrowTransport() throws Exception { TSocket socket = new TSocket(thriftServerHost, thriftServerPort, THRIFT_CLIENT_DEFAULT_TIMEOUT); TTransport transport = new TFramedTransport( socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE); transport.open(); return transport; } private void returnTransport(TTransport transport) { if (transport != null && transport.isOpen()) { transport.close(); } } private void returnBrokenTransport(TTransport transport) { if (transport != null && transport.isOpen()) { transport.close(); } } @Override public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) { // Step 1: get TTransport TTransport tpt = null; try { tpt = borrowTransport(); } catch (Exception e) { throw new RuntimeException(e); } // Step 2: get client & call try { TCLIENT tcli = createClient(tpt); TRET ret = tcall.call(tcli); returnTransport(tpt); return ret; } catch (Exception e) { returnBrokenTransport(tpt); throw new RuntimeException(e); } } @Override public void exec(ThriftExecFunc<TCLIENT> texec) { // Step 1: get TTransport TTransport tpt = null; try { tpt = borrowTransport(); } catch (Exception e) { throw new RuntimeException(e); } // Step 2: get client & exec try { TCLIENT tcli = createClient(tpt); texec.exec(tcli); returnTransport(tpt); } catch (Exception e) { returnBrokenTransport(tpt); throw new RuntimeException(e); } } public String getThriftServerHost() { return thriftServerHost; } public void setThriftServerHost(String thriftServerHost) { this.thriftServerHost = thriftServerHost; } public int getThriftServerPort() { return thriftServerPort; } public void setThriftServerPort(int thriftServerPort) { this.thriftServerPort = thriftServerPort; } 

簡單解釋下上述代碼

  1. 需要外部傳入RPC服務器的主機名和端口 thriftServerHost和thriftServerPort
  2. borrowTransport完成Transport(Thrift中類似Socket的抽象) 的構造,注意這里要使用TFramedTransport,與之前服務端的構造保持一致。
  3. returnTransport關閉Transport
  4. returnBrokenTransport關閉出異常的Transport
  5. call和exec 在拿到Transport后,使用函數式編程的方式,完成rpc調用,如果有異常則關閉連接。

最后我們來看一下對應的Builder,EasyThriftClientBuilder:

package com.coder4.lmsia.thrift.client.builder; import com.coder4.lmsia.thrift.client.EasyThriftClient; import org.apache.thrift.TServiceClient; /** * @author coder4 */ public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> { private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>(); protected EasyThriftClient<TCLIENT> build() { client.init(); return client; } protected EasyThriftClientBuilder<TCLIENT> setHost(String host) { client.setThriftServerHost(host); return this; } protected EasyThriftClientBuilder<TCLIENT> setPort(int port) { client.setThriftServerPort(port); return this; } protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) { client.setThriftClass(thriftClass); return this; } } 

Builder的代碼比較簡單,就是以鏈式調用的方式,通過主機和端口,方便地構造一個EasyThriftClient。

看了EasyThriftClient后下面我們來看一下如何集成到項目中。

Gradle子項目划分與微服務的代碼結構一節中,我們已經提到,將每個微服務的RPC客戶端放在xx-client子工程中,現在我們再來回顧下lmsia-abc-client的目錄結構。

├── build.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── abc
    │   │                   └── client
    │   │                       ├── configuration
    │   │                       │   └── LmsiaAbcThriftClientConfiguration.java
    │   │                       ├── LmsiaAbcEasyThriftClientBuilder.java
    │   │                       └── LmsiaK8ServiceThriftClientBuilder.java
    │   └── resources
    │       └── META-INF
    │           └── spring.factories
    └── test

我們簡單介紹一下:

  1. LmsiaAbcThriftClientConfiguration: 客戶端自動配置,當激活時,自動生成lmsia-abc對應的RPC服務的客戶端。引用者直接@Autowired一下,就可以使用了。
  2. LmsiaAbcEasyThriftClientBuilder: EasyThriftClient構造器,主要是自動配置需要。
  3. spring.factories: 與服務端的自動配置類似,需要在這個文件中指定自動配置的類路徑,才能讓Spring Boot自動掃描到自動配置。
  4. 其他K8ServiceThriftClient相關的部分,我們將在下一小節進行介紹。

LmsiaAbcEasyThriftClientBuilder文件:

package com.coder4.lmsia.abc.client; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client; import com.coder4.lmsia.thrift.client.ThriftClient; import com.coder4.lmsia.thrift.client.builder.EasyThriftClientBuilder; /** * @author coder4 */ public class LmsiaAbcEasyThriftClientBuilder extends EasyThriftClientBuilder<Client> { public LmsiaAbcEasyThriftClientBuilder(String host, int port) { setThriftClass(LmsiaAbcThrift.class); setHost(host); setPort(port); } public static ThriftClient<Client> buildClient(String host, int port) { return new LmsiaAbcEasyThriftClientBuilder(host, port).build(); } } 

上述Builder完成了實際的參數填充,主要有:

  1. ThriftClient的樁代碼類設置(LmsiaAbcThrift.class)
  2. 設置主機名和端口

LmsiaAbcClientConfiguration文件:

package com.coder4.lmsia.abc.client.configuration; import com.coder4.lmsia.abc.client.LmsiaAbcEasyThriftClientBuilder; import com.coder4.lmsia.abc.client.LmsiaK8ServiceClientBuilder; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client; import com.coder4.lmsia.thrift.client.K8ServiceKey; import com.coder4.lmsia.thrift.client.ThriftClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; @Configuration public class LmsiaAbcThriftClientConfiguration { private Logger LOG = LoggerFactory.getLogger(getClass()); @Bean(name = "lmsiaAbcThriftClient") @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient") @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"}) public ThriftClient<Client> easyThriftClient( @Value("${lmsiaAbcThriftServer.host}") String host, @Value("${lmsiaAbcThriftServer.port}") int port ) { LOG.info("######## LmsiaAbcClientConfiguration ########"); LOG.info("easyClient host = {}, port = {}", host, port); return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port); } } 

如上所示,滿足兩個條件時,會自動構造LmsiaAbcEasyThriftClient:

  1. 還沒有生成其他的LmsiaAbcEasyThriftClient(ConditionalOnMissingBean)
  2. 配置中指定了lmsiaAbcThriftServer.host和lmsiaAbcThriftServer.port

根據我們前面的介紹,大家應該能理解,雖然有自動配置,但上述配置是一種很糟糕的方式。試想一下,如果我們的服務依賴了5個其他RPC服務,那么豈不是要分別配置5組IP和端口?此外,這種方式也無法支持節點的負載均衡。

如何解決這個問題呢?我們將在K8ServiceThriftClient中解決。

本小節的最后,我們看一下spring.factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.abc.client.configuration.LmsiaAbcThriftClientConfiguration

和之前lmsia-abc-server子工程中的文件類似,這里設置了自動配置的詳細類路徑,方便Spring Boot的自動掃描。

K8ServiceThriftClient

在對EasyThriftClient的介紹中,我們發現了一個問題,需要單獨配置IP和端口,不支持服務自動發現。

此外,在這個客戶端的實現中,默認每次都要建立新的連接。而對於后端服務而言,RPC的服務端和客戶端多數都是在內網環境中,連接情況比較穩定,可以通過連接池的方式減少連接握手開銷,從而提升RPC服務的性能。如果你對連接池的原理還不太熟悉,可以參考百科連接池

為此,我們本將介紹K8ServiceThriftClient,它很好的解決了上述問題。

首先,我們使用commons-pool2來構建了TTransport層的連接池。

TTransportPoolFactory:

package com.coder4.lmsia.thrift.client.pool; import com.coder4.lmsia.thrift.client.K8ServiceKey; import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; /** * @author coder4 */ public class TTransportPoolFactory extends BaseKeyedPooledObjectFactory<K8ServiceKey, TTransport> { protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000; protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16; @Override public TTransport create(K8ServiceKey key) throws Exception { if (key != null) { String host = key.getK8ServiceHost(); int port = key.getK8ServicePort(); TSocket socket = new TSocket(host, port, THRIFT_CLIENT_DEFAULT_TIMEOUT); TTransport transport = new TFramedTransport( socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE); transport.open(); return transport; } else { return null; } } @Override public PooledObject<TTransport> wrap(TTransport transport) { return new DefaultPooledObject<>(transport); } @Override public void destroyObject(K8ServiceKey key, PooledObject<TTransport> obj) throws Exception { obj.getObject().close(); } @Override public boolean validateObject(K8ServiceKey key, PooledObject<TTransport> obj) { return obj.getObject().isOpen(); } } 

上述代碼主要完成以下功能:

  1. 連接超時配置(5秒)
  2. create, 生成新連接(TTransport),這里與之前的EasyThriftClient非常類似,不再贅述
  3. 驗證連接是否有效,通過TTransport的isOpen判斷。

TTransportPool:

package com.coder4.lmsia.thrift.client.pool; import com.coder4.lmsia.thrift.client.K8ServiceKey; import org.apache.commons.pool2.impl.GenericKeyedObjectPool; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author coder4 */ public class TTransportPool extends GenericKeyedObjectPool<K8ServiceKey, TTransport> { private Logger LOG = LoggerFactory.getLogger(getClass()); private static int MAX_CONN = 1024; private static int MIN_IDLE_CONN = 8; private static int MAX_IDLE_CONN = 32; public TTransportPool(TTransportPoolFactory factory) { super(factory); setTimeBetweenEvictionRunsMillis(45 * 1000); setNumTestsPerEvictionRun(5); setMaxWaitMillis(30 * 1000); setMaxTotal(MAX_CONN); setMaxTotalPerKey(MAX_CONN); setMinIdlePerKey(MIN_IDLE_CONN); setMaxTotalPerKey(MAX_IDLE_CONN); setTestOnCreate(true); setTestOnBorrow(true); setTestWhileIdle(true); } @Override public TTransportPoolFactory getFactory() { return (TTransportPoolFactory) super.getFactory(); } public void returnBrokenObject(K8ServiceKey key, TTransport transport) { try { invalidateObject(key, transport); } catch (Exception e) { LOG.warn("return broken key " + key); e.printStackTrace(); } } } 

上述代碼主要是完成連接池的配置,比較直觀:

  1. 設置最大連接數1024
  2. 設置最大空閑數32,最小空閑數8,每間隔45秒嘗試更改維護連接池中的連接數量。
  3. 當每次"創建"、從池子中"借用"、"空閑"時,檢查連接是否有效。

下面我們來看一下如何在K8ServiceThriftClient中使用:

package com.coder4.lmsia.thrift.client; import com.coder4.lmsia.thrift.client.func.ThriftCallFunc; import com.coder4.lmsia.thrift.client.func.ThriftExecFunc; import com.coder4.lmsia.thrift.client.pool.TTransportPool; import com.coder4.lmsia.thrift.client.pool.TTransportPoolFactory; import org.apache.thrift.TServiceClient; import org.apache.thrift.transport.TTransport; public class K8ServiceThriftClient<TCLIENT extends TServiceClient> extends AbstractThriftClient<TCLIENT> { private K8ServiceKey k8ServiceKey; private TTransportPool connPool; @Override public void init() { super.init(); // check if (k8ServiceKey == null) { throw new RuntimeException("invalid k8ServiceName or k8Serviceport"); } // init pool connPool = new TTransportPool(new TTransportPoolFactory()); } @Override public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) { // Step 1: get TTransport TTransport tpt = null; K8ServiceKey key = getConnBorrowKey(); try { tpt = connPool.borrowObject(key); } catch (Exception e) { throw new RuntimeException(e); } // Step 2: get client & call try { TCLIENT tcli = createClient(tpt); TRET ret = tcall.call(tcli); returnTransport(key, tpt); return ret; } catch (Exception e) { returnBrokenTransport(key, tpt); throw new RuntimeException(e); } } @Override public void exec(ThriftExecFunc<TCLIENT> texec) { // Step 1: get TTransport TTransport tpt = null; K8ServiceKey key = getConnBorrowKey(); try { // borrow transport tpt = connPool.borrowObject(key); } catch (Exception e) { throw new RuntimeException(e); } // Step 2: get client & exec try { TCLIENT tcli = createClient(tpt); texec.exec(tcli); returnTransport(key, tpt); } catch (Exception e) { returnBrokenTransport(key, tpt); throw new RuntimeException(e); } } private K8ServiceKey getConnBorrowKey() { return k8ServiceKey; } private void returnTransport(K8ServiceKey key, TTransport transport) { connPool.returnObject(key, transport); } private void returnBrokenTransport(K8ServiceKey key, TTransport transport) { connPool.returnBrokenObject(key, transport); } public K8ServiceKey getK8ServiceKey() { return k8ServiceKey; } public void setK8ServiceKey(K8ServiceKey k8ServiceKey) { this.k8ServiceKey = k8ServiceKey; } } 

上述大部分代碼和EasyThriftClient非常接近,有差異的部分主要是與連接的"借用"、"歸還"相關的:

  1. 在call和exec中,借用連接
    • getConnBorrowKey先構造一個key,包含了主機名和端口。這里的主機名是微服務的自動發現中提到的Kubernetes服務,如果你對相關原理不太熟悉,可以自行回顧對應章節。
    • 從connPool中借用一個連接(TTransport)
    • 剩余發起rpc調用的步驟就和EasyThriftClient相同了,不再贅述。
  2. 當rpc調用結束后
    • 正常結束,調用connPool.returnObject將TTransport歸還到連接池中。
    • 非正常結束,調用connPool.returnBrokenTransport,讓連接池銷毀這個連接,以防后續借用到這個可能出錯的TTransport。

類似的,我們也配套了對應的Builder:

package com.coder4.lmsia.thrift.client.builder; import com.coder4.lmsia.thrift.client.EasyThriftClient; import org.apache.thrift.TServiceClient; /** * @author coder4 */ public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> { private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>(); protected EasyThriftClient<TCLIENT> build() { client.init(); return client; } protected EasyThriftClientBuilder<TCLIENT> setHost(String host) { client.setThriftServerHost(host); return this; } protected EasyThriftClientBuilder<TCLIENT> setPort(int port) { client.setThriftServerPort(port); return this; } protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) { client.setThriftClass(thriftClass); return this; } } 

上述Builder主要是設置所需的兩個參數,Host和Port,看起來和EasyThriftClient並沒有什么不同?

別着急,我們繼續看一下lmsia-abc-client中的集成:

package com.coder4.lmsia.abc.client; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift; import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client; import com.coder4.lmsia.thrift.client.K8ServiceKey; import com.coder4.lmsia.thrift.client.ThriftClient; import com.coder4.lmsia.thrift.client.builder.K8ServiceThriftClientBuilder; /** * @author coder4 */ public class LmsiaK8ServiceThriftClientBuilder extends K8ServiceThriftClientBuilder<Client> { public LmsiaK8ServiceThriftClientBuilder(K8ServiceKey k8ServiceKey) { setThriftClass(LmsiaAbcThrift.class); setK8ServiceKey(k8ServiceKey); } public static ThriftClient<Client> buildClient(K8ServiceKey k8ServiceKey) { return new LmsiaK8ServiceThriftClientBuilder(k8ServiceKey).build(); } } 

在集成的時候,我們需要傳入一個key,可以手動制定,也可以自動配置

我們看一下完整的自動配置代碼,LmsiaAbcThriftClientConfiguration:

public class LmsiaAbcThriftClientConfiguration { @Bean(name = "lmsiaAbcThriftClient") @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient") @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"}) public ThriftClient<Client> easyThriftClient( @Value("${lmsiaAbcThriftServer.host}") String host, @Value("${lmsiaAbcThriftServer.port}") int port ) { LOG.info("######## LmsiaAbcThriftClientConfiguration ########"); LOG.info("easyThriftClient host = {}, port = {}", host, port); return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port); } @Bean(name = "lmsiaAbcThriftClient") @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient") public ThriftClient<LmsiaAbcThrift.Client> k8ServiceThriftClient() { LOG.info("######## LmsiaAbcThriftClientConfiguration ########"); K8ServiceKey k8ServiceKey = new K8ServiceKey(K8_SERVICE_NAME, K8_SERVICE_PORT); LOG.info("k8ServiceThriftClient key:" + k8ServiceKey); return LmsiaK8ServiceThriftClientBuilder.buildClient(k8ServiceKey); } //... } 

對比easyThriftClient和k8ServiceThriftClient不難發現,K8ServiceThriftClient的參數,是通過常量直接寫死的。也就是我們在微服務的自動發現與負載均衡中提到的,約定好服務的命名規則。

看下常量定義:

public class LmsiaAbcConstant { // ...... public static final String PROJECT_NAME = "lmsia-abc"; public static final String K8_SERVICE_NAME = PROJECT_NAME + "-server"; public static final int K8_SERVICE_PORT = 3000; // ...... } 

這樣以來,一旦確定了項目名,那么Kubernetes中的服務名字也確定了。因此,k8ServiceThriftClient自動配置會被自動激活,即只要引用了lmsia-abc-client這個包,就會自動配置好一個RPC客戶端,是不是非常方便?

我們來看一下具體的使用例子:

import com.coder4.lmsia.thrift.client.ThriftClient; public class LmsiaAbctProxy { @Autowired private ThriftClient<Client> client; public String hello() { return client.call(cli -> cli.sayHi()); } 

至此,我們已經完成了在Spring Boo中集成Thrift RPC的服務端、客戶端的工作。

  • 服務端,我們通過ThriftServerConfiguration、ThriftProcessorConfiguration自動配置了Thrift RPC服務端。
  • 客戶端,通過Kubernetes的服務功能,自動配置了帶服務發現功能的Thrift RPC客戶端K8ServiceThriftClient。該客戶端同時內置了連接池,用於節省連接開銷。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM