[Reprinted from] https://coder4.com/homs_online/spring-boot/sb-thrift.html
## A Brief Introduction to Spring Boot Auto-Configuration
Before introducing RPC, let's first learn about Spring Boot's auto-configuration.
As mentioned earlier, Spring Boot grew out of Spring and made numerous improvements. Its most useful design philosophy is convention over configuration: through auto-configuration (taking the settings most developers habitually use as defaults), it lets developers build standardized applications quickly and accurately.
Take integrating a MySQL database as an example. Before Spring Boot, we had to:
- Declare the JDBC driver dependency
- Configure the data source in an XML file
- Configure the DataSource bean in XML
- Configure the XXXTemplate bean in XML
- Configure the XXXTransactionManager bean in XML
With Spring Boot's auto-configuration, the various DataSource, XXXTemplate, and XXXTransactionManager beans are generated for us. All we need to do is activate it:
- Include the package containing the auto-configuration as a Maven dependency
- Declare the JDBC driver dependency
- Define the data source in the YAML file
Auto-configuration performs smart detection: as long as the three conditions above are met, all the other beans are generated automatically and injected into the Spring context. When we need one, a single @Autowired is enough. Simple, isn't it?
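As a concrete illustration, the YAML side of the MySQL example could look like the following (the `spring.datasource` keys are Spring Boot's standard property names; the URL and credentials are placeholders):

```yaml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/demo
    username: demo
    password: demo
```

With the auto-configuration package and the JDBC driver on the classpath, these few lines replace all five of the XML steps above.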
Due to space constraints, this book does not teach writing auto-configuration from scratch. If you want to understand the principles behind auto-configuration, see the article "spring boot實戰(第十三篇)自動配置原理分析" (Spring Boot in Action, Part 13: An Analysis of Auto-Configuration Internals).
In the rest of this section, we will use a Thrift RPC server as an example to see how an auto-configuration is written.
## A Brief Introduction to RPC
Remote procedure call (RPC) lets a program running locally (the client) directly invoke a program on another computer (the server) as if it were a local call, without the programmer writing extra code for the remote interaction.
RPC greatly reduces the work of developing network communication between nodes in a distributed system, and is one of the key building blocks of a microservice architecture.
In this book we use Thrift as the RPC framework. Due to space constraints we will not cover Thrift RPC in depth; if you are not familiar with it yet, see the official quick-start documentation.
## Integrating a Thrift RPC Server with Spring Boot
In brief, starting a Thrift RPC server takes the following steps:
- Write the DSL (a .thrift file) defining the functions, data structures, and so on.
- Compile it to generate the stub code.
- Write the handler (the entry point for the RPC logic).
- Build a Processor from the handler.
- Build the Server. Thrift offers several server implementations; the common ones are TThreadPoolServer (multi-threaded) and TNonblockingServer (non-blocking).
- Set the server's Protocol. Similarly, Thrift offers several wire protocols; the most common are TBinaryProtocol and TCompactProtocol.
- Set the server's Transport (factory), which specifies the underlying transport. Common choices are TFramedTransport and TNonBlockingTransport; different transports can be stacked on top of each other, much like Java IO streams, for more powerful behavior.
The above is a brief overview of the Thrift server architecture; for a deeper understanding, you can read the official source code.
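The point about transports stacking like Java IO streams can be illustrated with plain `java.io` decorators. The sketch below (the `FramedStreamDemo` class is my own stand-in, not Thrift code) reproduces the essence of TFramedTransport-style framing: a 4-byte length prefix written ahead of the payload by wrapping one stream in another:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class FramedStreamDemo {

    // Mimic framed-transport behavior: prefix each message with its 4-byte length.
    // Note the IO-stream stacking: DataOutputStream over BufferedOutputStream
    // over ByteArrayOutputStream, just like transports stack in Thrift.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(buf));
        out.writeInt(payload.length);  // frame header: payload length
        out.write(payload);            // frame body
        out.flush();
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame("hello".getBytes("UTF-8"));
        System.out.println(ByteBuffer.wrap(framed, 0, 4).getInt()); // 5
        System.out.println(framed.length);                          // 9
    }
}
```

The receiver can read the 4-byte header first and then knows exactly how many bytes make up the message, which is what lets a non-blocking server assemble complete requests.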
First, let's look at the Thrift definition (as described in the previous section, the file lives in the lmsia-abc-common package):
```thrift
namespace java com.coder4.lmsia.abc

service lmsiaAbcThrift {
    string sayHi()
}
```
After compiling with thrift, we place the generated stub files under lmsia-abc-client; see the previous section for the directory layout.
To make it easier to integrate a Thrift server into Spring Boot, I extracted the relevant code into a shared library, lmsia-thrift-server:
```shell
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── thrift
    │   │                   └── server
    │   │                       ├── configuration
    │   │                       │   └── ThriftServerConfiguration.java
    │   │                       └── ThriftServerRunnable.java
    │   └── resources
    │       └── META-INF
    │           └── spring.factories
    └── test
        └── java
```
A quick walk through the project structure:
- The gradle files are similar to those introduced earlier, except this is a single-project build.
- ThriftServerConfiguration: the auto-configuration; it activates automatically when its conditions are met, and on activation starts the Thrift RPC server.
- ThriftServerRunnable: the construction logic and run thread of the Thrift RPC server.
- spring.factories: when auto-configuration is shipped as a library, this file is needed so that other projects can "locate" the auto-configuration classes to examine.
First, let's look at ThriftServerRunnable.java:
```java
package com.coder4.lmsia.thrift.server;

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author coder4
 */
public class ThriftServerRunnable implements Runnable {

    private static final int THRIFT_PORT = 3000;
    private static final int THRIFT_TIMEOUT = 5000;
    private static final int THRIFT_TCP_BACKLOG = 5000;
    private static final int THRIFT_CORE_THREADS = 128;
    private static final int THRIFT_MAX_THREADS = 256;
    private static final int THRIFT_SELECTOR_THREADS = 16;

    private static final TProtocolFactory THRIFT_PROTOCOL_FACTORY = new TBinaryProtocol.Factory();

    // 16MB
    private static final int THRIFT_MAX_FRAME_SIZE = 16 * 1024 * 1024;
    // 4MB
    private static final int THRIFT_MAX_READ_BUF_SIZE = 4 * 1024 * 1024;

    protected ExecutorService threadPool;
    protected TServer server;
    protected Thread thread;

    private TProcessor processor;
    private boolean isDestroy = false;

    public ThriftServerRunnable(TProcessor processor) {
        this.processor = processor;
    }

    public TServer build() throws TTransportException {
        TNonblockingServerSocket.NonblockingAbstractServerSocketArgs socketArgs =
                new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs();
        socketArgs.port(THRIFT_PORT);
        socketArgs.clientTimeout(THRIFT_TIMEOUT);
        socketArgs.backlog(THRIFT_TCP_BACKLOG);
        TNonblockingServerTransport transport = new TNonblockingServerSocket(socketArgs);

        threadPool = new ThreadPoolExecutor(THRIFT_CORE_THREADS, THRIFT_MAX_THREADS,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>());

        TTransportFactory transportFactory = new TFramedTransport.Factory(THRIFT_MAX_FRAME_SIZE);

        TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
                .selectorThreads(THRIFT_SELECTOR_THREADS)
                .executorService(threadPool)
                .transportFactory(transportFactory)
                .inputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
                .outputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
                .processor(processor);
        args.maxReadBufferBytes = THRIFT_MAX_READ_BUF_SIZE;

        return new TThreadedSelectorServer(args);
    }

    @Override
    public void run() {
        try {
            server = build();
            server.serve();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Start Thrift RPC Server Exception");
        }
    }

    public void stop() throws Exception {
        threadPool.shutdown();
        server.stop();
    }
}
```
Let's walk through it:
- The build method constructs a ready-to-run Thrift RPC server:
  - Construct the non-blocking socket and set its listen port and timeout.
  - Construct the non-blocking Transport.
  - Construct the thread pool; the server model here is a non-blocking, thread-pooled RPC server.
  - Construct the underlying framing transport, TFramedTransport.
  - Construct the Thrift server, wiring in the non-blocking Transport, the thread pool, and the TBinaryProtocol protocol built above.
- The ThriftServerRunnable class as a whole is a Runnable; its run method builds the RPC server and starts serving (serve).
- stop provides a way to shut the service down.
Next, let's look at the auto-configuration, ThriftServerConfiguration.java:
```java
package com.coder4.lmsia.thrift.server.configuration;

import com.coder4.lmsia.thrift.server.ThriftServerRunnable;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * @author coder4
 */
@Configuration
@ConditionalOnBean(value = {TProcessor.class})
public class ThriftServerConfiguration implements InitializingBean, DisposableBean {

    private Logger LOG = LoggerFactory.getLogger(ThriftServerConfiguration.class);

    private static final int GRACEFUL_SHOWDOWN_SEC = 3;

    @Autowired
    private TProcessor processor;

    private ThriftServerRunnable thriftServer;

    private Thread thread;

    @Override
    public void destroy() throws Exception {
        LOG.info("Wait for graceful shutdown on destroy(), {} seconds", GRACEFUL_SHOWDOWN_SEC);
        Thread.sleep(TimeUnit.SECONDS.toMillis(GRACEFUL_SHOWDOWN_SEC));
        LOG.info("Shutdown rpc server.");
        thriftServer.stop();
        thread.join();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        thriftServer = new ThriftServerRunnable(processor);
        thread = new Thread(thriftServer);
        thread.start();
    }
}
```
This is the first auto-configuration we have written, so let's explain it in a bit more detail:
- Activation condition: it is enabled only when the service provides a TProcessor. As we will see shortly, the lmsia-abc project wraps the RPC stub entry point and provides that TProcessor.
- InitializingBean: the auto-configuration implements InitializingBean. Why? When this configuration is initialized, all @Autowired properties are injected (here, the processor), and as we saw in ThriftServerRunnable, the RPC server can only start once it holds the TProcessor. InitializingBean provides the afterPropertiesSet callback, which is invoked after all properties have been injected.
- Here, afterPropertiesSet uses ThriftServerRunnable to start the Thrift RPC server.
- DisposableBean: besides InitializingBean, we also implement DisposableBean. As the name suggests, this is Spring's interface for releasing resources on shutdown. When the service shuts down, each configuration is visited in turn, and if it implements DisposableBean, its destroy callback is invoked.
- In destroy we first let the thread sleep for 3 seconds and only then stop the Thrift RPC server. This is designed for graceful shutdown, which we will cover in detail in the next section.
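The interplay of the two callbacks can be sketched without Spring at all. In the mini-example below, LifecycleDemo and its nested types are hypothetical stand-ins (not Spring classes); only the order in which a container would invoke afterPropertiesSet and destroy is mirrored:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleDemo {

    static List<String> events = new ArrayList<>();

    // stand-ins for Spring's InitializingBean / DisposableBean
    interface Initializing { void afterPropertiesSet() throws Exception; }

    interface Disposable { void destroy() throws Exception; }

    // stands in for ThriftServerConfiguration
    static class RpcServerBean implements Initializing, Disposable {
        @Override
        public void afterPropertiesSet() { events.add("start rpc server"); }

        @Override
        public void destroy() { events.add("stop rpc server"); }
    }

    public static void main(String[] args) {
        RpcServerBean bean = new RpcServerBean();
        // 1. the container finishes injecting @Autowired properties ...
        bean.afterPropertiesSet();   // 2. ... then fires the init callback
        // ... the bean serves traffic ...
        bean.destroy();              // 3. on shutdown, the destroy callback fires
        System.out.println(events);  // [start rpc server, stop rpc server]
    }
}
```

The key property being demonstrated: by the time afterPropertiesSet runs, every injected field is guaranteed to be populated, which is exactly why the configuration can safely hand the processor to ThriftServerRunnable there.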
Finally, our auto-configuration cannot be discovered by default; it needs a spring.factories file:
```
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.thrift.server.configuration.ThriftServerConfiguration
```
Having read through lmsia-thrift-server, let's see how to integrate it into the lmsia-abc project.
First, add the following to build.gradle in the lmsia-abc-server subproject:

```groovy
compile 'com.github.liheyuan:lmsia-thrift-server:0.0.1'
```
Second, provide a TProcessor. As mentioned above, this is the prerequisite for activating the auto-configuration. ThriftProcessorConfiguration:

```java
package com.coder4.lmsia.abc.server.configuration;

import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
import com.coder4.lmsia.abc.server.thrift.ThriftServerHandler;
import org.apache.thrift.TProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author coder4
 */
@Configuration
@ConditionalOnProperty(name = "thriftServer.enabled", matchIfMissing = true)
public class ThriftProcessorConfiguration {

    @Bean(name = "thriftProcessor")
    public TProcessor processor(ThriftServerHandler handler) {
        return new LmsiaAbcThrift.Processor(handler);
    }
}
```
A brief explanation:
- This is also an auto-configuration; it is enabled only when thriftServer.enabled=true in the configuration file (defaulting to true when the property is absent).
- The TProcessor it provides depends on ThriftServerHandler, the handler for the Thrift-generated stub interface. As mentioned in the project-structure analysis, this is the logic entry point of the RPC server.
See how simple starting a Thrift server becomes with auto-configuration?
## Integrating a Thrift RPC Client with Spring Boot
A server alone is not enough; we also need a client.
Similarly, to make it easy to create clients, we cleaned up and abstracted the code into the [lmsia-thrift-client](https://github.com/liheyuan/lmsia-thrift-client) project.
First, the project structure:
```shell
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── README.md
├── settings.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── thrift
    │   │                   └── client
    │   │                       ├── ThriftClient.java
    │   │                       ├── AbstractThriftClient.java
    │   │                       ├── EasyThriftClient.java
    │   │                       ├── K8ServiceThriftClient.java
    │   │                       ├── K8ServiceKey.java
    │   │                       ├── builder
    │   │                       │   ├── EasyThriftClientBuilder.java
    │   │                       │   └── K8ServiceThriftClientBuilder.java
    │   │                       ├── func
    │   │                       │   ├── ThriftCallFunc.java
    │   │                       │   └── ThriftExecFunc.java
    │   │                       ├── pool
    │   │                       │   ├── TTransportPoolFactory.java
    │   │                       │   └── TTransportPool.java
    │   │                       └── utils
    │   │                           └── ThriftUrlStr.java
    │   └── resources
    └── test
        └── java
            └── LibraryTest.java
```
A walk through the project structure:
- The gradle files are similar to before and need no further explanation.
- The ThriftClient family defines the Thrift client:
  - ThriftClient abstracts the client interface.
  - AbstractThriftClient implements all client operations except connection handling.
  - EasyThriftClient is a client that connects directly to an IP and port.
  - K8ServiceThriftClient is a client that connects via a Kubernetes service name (which, per the service-discovery section, is also a host name) and port, with a built-in connection pool.
- func: functional-programming helper interfaces.
- builder: helpers for quickly constructing the two clients above.
- pool: the client connection pool.
This subsection focuses on the direct IP-and-port client, EasyThriftClient. The K8ServiceThriftClient, which supports service discovery and connection pooling, is covered in a later subsection.
First, the interface definition, ThriftClient:
```java
package com.coder4.lmsia.thrift.client;

import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
import org.apache.thrift.TServiceClient;

import java.util.concurrent.Future;

/**
 * @author coder4
 */
public interface ThriftClient<TCLIENT extends TServiceClient> {

    /**
     * sync call with return value
     *
     * @param tcall thrift rpc client call
     * @param <TRET> return type
     * @return
     */
    <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall);

    /**
     * sync call without return value
     *
     * @param texec thrift rpc client
     */
    void exec(ThriftExecFunc<TCLIENT> texec);

    /**
     * async call with return value
     *
     * @param tcall thrift rpc client call
     * @param <TRET>
     * @return
     */
    <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall);

    /**
     * async call without return value
     *
     * @param texec thrift rpc client call
     */
    <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec);
}
```
A note here: the methods above fall into two groups:
- exec: RPC calls without a return value
- call: RPC calls with a return value
Java 8 functional programming is used for the abstraction; if you are not familiar with it yet, there is plenty of material to consult.
With the help of functional programming, every RPC call comes in both a synchronous and an asynchronous flavor; the asynchronous one returns a Future.
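The call/exec split plus the sync/async split can be sketched in isolation. In the toy example below, FuncCallDemo and its functional interfaces are my own stand-ins for ThriftCallFunc/ThriftExecFunc, and a plain String plays the role of the generated client; the asynchronous variant simply submits the synchronous one to an ExecutorService and returns the Future:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FuncCallDemo {

    // stand-ins for ThriftCallFunc / ThriftExecFunc
    interface CallFunc<C, R> { R call(C client) throws Exception; }
    interface ExecFunc<C> { void exec(C client) throws Exception; }

    // daemon threads so the JVM can exit without an explicit shutdown
    static ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // sync call with a return value
    static <R> R call(String client, CallFunc<String, R> f) throws Exception {
        return f.call(client);
    }

    // async variant: submit the sync call, hand back a Future
    static <R> Future<R> asyncCall(String client, CallFunc<String, R> f) {
        return pool.submit(() -> f.call(client));
    }

    // sync call without a return value
    static void exec(String client, ExecFunc<String> f) throws Exception {
        f.exec(client);
    }

    public static void main(String[] args) throws Exception {
        String client = "stub-client"; // plays the role of the generated TCLIENT
        System.out.println(call(client, c -> c.length()));                 // 11
        System.out.println(asyncCall(client, c -> c.toUpperCase()).get()); // STUB-CLIENT
    }
}
```

The caller only ever writes a lambda against the client type; whether it runs on the calling thread or on the pool is decided by which method it is handed to. This is exactly the shape AbstractThriftClient implements below.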
Next, AbstractThriftClient:
```java
/**
 * @(#)AbstractThriftClient.java, Aug 01, 2017.
 * <p>
 * Copyright 2017 fenbi.com. All rights reserved.
 * FENBI.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */
package com.coder4.lmsia.thrift.client;

import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TTransport;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author coder4
 */
public abstract class AbstractThriftClient<TCLIENT extends TServiceClient>
        implements ThriftClient<TCLIENT> {

    protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
    protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;

    private Class<?> thriftClass;

    private static final TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();

    private TServiceClientFactory<TCLIENT> clientFactory;

    // For async call
    private ExecutorService threadPool;

    public void init() {
        try {
            clientFactory = getThriftClientFactoryClass().newInstance();
        } catch (Exception e) {
            throw new RuntimeException();
        }

        if (!check()) {
            throw new RuntimeException("Client config failed check!");
        }

        threadPool = new ThreadPoolExecutor(10, 100, 0, TimeUnit.MICROSECONDS,
                new LinkedBlockingDeque<>());
    }

    protected boolean check() {
        if (thriftClass == null) {
            return false;
        }
        return true;
    }

    @Override
    public <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall) {
        return threadPool.submit(() -> this.call(tcall));
    }

    @Override
    public <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec) {
        return threadPool.submit(() -> this.exec(texec));
    }

    protected TCLIENT createClient(TTransport transport) throws Exception {
        // Step 1: get TProtocol
        TProtocol protocol = protocolFactory.getProtocol(transport);
        // Step 2: get client
        return clientFactory.getClient(protocol);
    }

    private Class<TServiceClientFactory<TCLIENT>> getThriftClientFactoryClass() {
        Class<TCLIENT> clientClazz = getThriftClientClass();
        if (clientClazz == null) {
            return null;
        }
        for (Class<?> clazz : clientClazz.getDeclaredClasses()) {
            if (TServiceClientFactory.class.isAssignableFrom(clazz)) {
                return (Class<TServiceClientFactory<TCLIENT>>) clazz;
            }
        }
        return null;
    }

    private Class<TCLIENT> getThriftClientClass() {
        for (Class<?> clazz : thriftClass.getDeclaredClasses()) {
            if (TServiceClient.class.isAssignableFrom(clazz)) {
                return (Class<TCLIENT>) clazz;
            }
        }
        return null;
    }

    public void setThriftClass(Class<?> thriftClass) {
        this.thriftClass = thriftClass;
    }
}
```
This abstract Thrift client implements the following:
- A client thread pool, used for asynchronous calls; this is entirely separate from the server-side thread pool we built earlier.
- asyncCall and asyncExec use the thread pool to complete calls asynchronously.
- thriftClass stores the Thrift-generated stub class; each service generates a different stub class, so it is kept as a Class reference.
- createClient is a shared helper: given a transport, it constructs a Thrift client. Note in particular that the protocol is set to TBinaryProtocol, which must match the server; otherwise, the two cannot communicate.
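The reflection trick used by getThriftClientClass deserves a closer look. The sketch below (ReflectionLookupDemo and FakeThrift are invented stand-ins for the Thrift-generated classes) shows how scanning getDeclaredClasses with isAssignableFrom picks out the nested Client class:

```java
public class ReflectionLookupDemo {

    interface ServiceClient {}  // stands in for TServiceClient

    // mimics a generated stub class like LmsiaAbcThrift with its nested classes
    static class FakeThrift {
        public static class Client implements ServiceClient {}
        public static class Processor {}
    }

    // mirrors AbstractThriftClient.getThriftClientClass: walk the nested
    // classes of the stub and return the one implementing the client interface
    static Class<?> findClientClass(Class<?> thriftClass) {
        for (Class<?> clazz : thriftClass.getDeclaredClasses()) {
            if (ServiceClient.class.isAssignableFrom(clazz)) {
                return clazz;  // the generated Client
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(findClientClass(FakeThrift.class).getSimpleName()); // Client
    }
}
```

Because only the Client nested class extends the client base type, the scan finds it regardless of the order in which getDeclaredClasses returns the nested classes. This is what lets one generic base class work with every generated stub.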
Since call and exec depend closely on how connections are managed, they are not implemented at this level. Finally, let's look at EasyThriftClient:
```java
package com.coder4.lmsia.thrift.client;

import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * @author coder4
 */
public class EasyThriftClient<TCLIENT extends TServiceClient>
        extends AbstractThriftClient<TCLIENT> {

    private static final int EASY_THRIFT_BUFFER_SIZE = 1024 * 16;

    protected String thriftServerHost;

    protected int thriftServerPort;

    @Override
    protected boolean check() {
        if (thriftServerHost == null || thriftServerHost.isEmpty()) {
            return false;
        }
        if (thriftServerPort <= 0) {
            return false;
        }
        return super.check();
    }

    private TTransport borrowTransport() throws Exception {
        TSocket socket = new TSocket(thriftServerHost, thriftServerPort,
                THRIFT_CLIENT_DEFAULT_TIMEOUT);
        TTransport transport = new TFramedTransport(
                socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
        transport.open();
        return transport;
    }

    private void returnTransport(TTransport transport) {
        if (transport != null && transport.isOpen()) {
            transport.close();
        }
    }

    private void returnBrokenTransport(TTransport transport) {
        if (transport != null && transport.isOpen()) {
            transport.close();
        }
    }

    @Override
    public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
        // Step 1: get TTransport
        TTransport tpt = null;
        try {
            tpt = borrowTransport();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // Step 2: get client & call
        try {
            TCLIENT tcli = createClient(tpt);
            TRET ret = tcall.call(tcli);
            returnTransport(tpt);
            return ret;
        } catch (Exception e) {
            returnBrokenTransport(tpt);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void exec(ThriftExecFunc<TCLIENT> texec) {
        // Step 1: get TTransport
        TTransport tpt = null;
        try {
            tpt = borrowTransport();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // Step 2: get client & exec
        try {
            TCLIENT tcli = createClient(tpt);
            texec.exec(tcli);
            returnTransport(tpt);
        } catch (Exception e) {
            returnBrokenTransport(tpt);
            throw new RuntimeException(e);
        }
    }

    public String getThriftServerHost() {
        return thriftServerHost;
    }

    public void setThriftServerHost(String thriftServerHost) {
        this.thriftServerHost = thriftServerHost;
    }

    public int getThriftServerPort() {
        return thriftServerPort;
    }

    public void setThriftServerPort(int thriftServerPort) {
        this.thriftServerPort = thriftServerPort;
    }
}
```
A quick explanation of the code above:
- The RPC server's host and port are supplied externally via thriftServerHost and thriftServerPort.
- borrowTransport constructs the Transport (Thrift's socket-like abstraction). Note that it must use TFramedTransport, matching the server-side construction.
- returnTransport closes the Transport.
- returnBrokenTransport closes a Transport that hit an exception.
- call and exec, once they hold a Transport, complete the RPC call in functional style, closing the connection on any exception.
Finally, let's look at the matching builder, EasyThriftClientBuilder:
```java
package com.coder4.lmsia.thrift.client.builder;

import com.coder4.lmsia.thrift.client.EasyThriftClient;
import org.apache.thrift.TServiceClient;

/**
 * @author coder4
 */
public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {

    private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();

    protected EasyThriftClient<TCLIENT> build() {
        client.init();
        return client;
    }

    protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
        client.setThriftServerHost(host);
        return this;
    }

    protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
        client.setThriftServerPort(port);
        return this;
    }

    protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
        client.setThriftClass(thriftClass);
        return this;
    }
}
```
The builder code is straightforward: via chained calls, it conveniently constructs an EasyThriftClient from a host and port.
Having seen EasyThriftClient, let's look at how to integrate it into a project.
In the section on Gradle subproject layout and microservice code structure, we mentioned that each microservice's RPC client lives in an xx-client subproject. Let's revisit the lmsia-abc-client directory structure:
```shell
├── build.gradle
└── src
    ├── main
    │   ├── java
    │   │   └── com
    │   │       └── coder4
    │   │           └── lmsia
    │   │               └── abc
    │   │                   └── client
    │   │                       ├── configuration
    │   │                       │   └── LmsiaAbcThriftClientConfiguration.java
    │   │                       ├── LmsiaAbcEasyThriftClientBuilder.java
    │   │                       └── LmsiaK8ServiceThriftClientBuilder.java
    │   └── resources
    │       └── META-INF
    │           └── spring.factories
    └── test
```
A brief overview:
- LmsiaAbcThriftClientConfiguration: the client auto-configuration. When activated, it automatically creates a client for the lmsia-abc RPC service; consumers just @Autowired it and use it.
- LmsiaAbcEasyThriftClientBuilder: the EasyThriftClient builder, used mainly by the auto-configuration.
- spring.factories: as with the server-side auto-configuration, this file must list the auto-configuration's class path so Spring Boot can scan it automatically.
- The K8ServiceThriftClient-related parts are covered in the next subsection.
The LmsiaAbcEasyThriftClientBuilder file:
```java
package com.coder4.lmsia.abc.client;

import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
import com.coder4.lmsia.thrift.client.ThriftClient;
import com.coder4.lmsia.thrift.client.builder.EasyThriftClientBuilder;

/**
 * @author coder4
 */
public class LmsiaAbcEasyThriftClientBuilder extends EasyThriftClientBuilder<Client> {

    public LmsiaAbcEasyThriftClientBuilder(String host, int port) {
        setThriftClass(LmsiaAbcThrift.class);
        setHost(host);
        setPort(port);
    }

    public static ThriftClient<Client> buildClient(String host, int port) {
        return new LmsiaAbcEasyThriftClientBuilder(host, port).build();
    }
}
```
This builder fills in the actual parameters, mainly:
- Setting the client's stub class (LmsiaAbcThrift.class)
- Setting the host name and port
The LmsiaAbcThriftClientConfiguration file:
```java
package com.coder4.lmsia.abc.client.configuration;

import com.coder4.lmsia.abc.client.LmsiaAbcEasyThriftClientBuilder;
import com.coder4.lmsia.abc.client.LmsiaK8ServiceClientBuilder;
import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
import com.coder4.lmsia.thrift.client.K8ServiceKey;
import com.coder4.lmsia.thrift.client.ThriftClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Configuration
public class LmsiaAbcThriftClientConfiguration {

    private Logger LOG = LoggerFactory.getLogger(getClass());

    @Bean(name = "lmsiaAbcThriftClient")
    @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
    @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
    public ThriftClient<Client> easyThriftClient(
            @Value("${lmsiaAbcThriftServer.host}") String host,
            @Value("${lmsiaAbcThriftServer.port}") int port) {
        LOG.info("######## LmsiaAbcClientConfiguration ########");
        LOG.info("easyClient host = {}, port = {}", host, port);
        return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
    }
}
```
As shown above, an LmsiaAbcEasyThriftClient is constructed automatically when two conditions are met:
- No other lmsiaAbcThriftClient bean has been created yet (ConditionalOnMissingBean)
- The configuration specifies lmsiaAbcThriftServer.host and lmsiaAbcThriftServer.port
From what we covered earlier, you should see that even with auto-configuration, this setup is still a poor approach. Imagine our service depending on five other RPC services: would we really configure five separate sets of IPs and ports? Moreover, this approach cannot load-balance across nodes.
How do we solve this? With K8ServiceThriftClient.
To close this subsection, here is spring.factories:
```
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.abc.client.configuration.LmsiaAbcThriftClientConfiguration
```
As in the lmsia-abc-server subproject, this file records the auto-configuration's full class path so that Spring Boot can scan it automatically.
## K8ServiceThriftClient
While introducing EasyThriftClient we found a problem: IPs and ports must be configured individually, with no support for service discovery.
Furthermore, that client establishes a new connection for every call. For backend services, RPC servers and clients mostly sit on a stable internal network, so a connection pool can cut the handshake overhead and improve RPC performance. If you are unfamiliar with how connection pools work, see an encyclopedia entry on connection pooling.
In this section we introduce K8ServiceThriftClient, which solves both problems nicely.
First, we use commons-pool2 to build a connection pool at the TTransport level.
TTransportPoolFactory:
```java
package com.coder4.lmsia.thrift.client.pool;

import com.coder4.lmsia.thrift.client.K8ServiceKey;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * @author coder4
 */
public class TTransportPoolFactory
        extends BaseKeyedPooledObjectFactory<K8ServiceKey, TTransport> {

    protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
    protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;

    @Override
    public TTransport create(K8ServiceKey key) throws Exception {
        if (key != null) {
            String host = key.getK8ServiceHost();
            int port = key.getK8ServicePort();
            TSocket socket = new TSocket(host, port, THRIFT_CLIENT_DEFAULT_TIMEOUT);
            TTransport transport = new TFramedTransport(
                    socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
            transport.open();
            return transport;
        } else {
            return null;
        }
    }

    @Override
    public PooledObject<TTransport> wrap(TTransport transport) {
        return new DefaultPooledObject<>(transport);
    }

    @Override
    public void destroyObject(K8ServiceKey key, PooledObject<TTransport> obj) throws Exception {
        obj.getObject().close();
    }

    @Override
    public boolean validateObject(K8ServiceKey key, PooledObject<TTransport> obj) {
        return obj.getObject().isOpen();
    }
}
```
This code mainly does the following:
- Configures the connection timeout (5 seconds)
- create builds a new connection (TTransport), very much like EasyThriftClient did, so we won't repeat the details
- Validates whether a connection is alive via TTransport's isOpen
TTransportPool:
```java
package com.coder4.lmsia.thrift.client.pool;

import com.coder4.lmsia.thrift.client.K8ServiceKey;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author coder4
 */
public class TTransportPool extends GenericKeyedObjectPool<K8ServiceKey, TTransport> {

    private Logger LOG = LoggerFactory.getLogger(getClass());

    private static int MAX_CONN = 1024;
    private static int MIN_IDLE_CONN = 8;
    private static int MAX_IDLE_CONN = 32;

    public TTransportPool(TTransportPoolFactory factory) {
        super(factory);
        setTimeBetweenEvictionRunsMillis(45 * 1000);
        setNumTestsPerEvictionRun(5);
        setMaxWaitMillis(30 * 1000);
        setMaxTotal(MAX_CONN);
        setMaxTotalPerKey(MAX_CONN);
        setMinIdlePerKey(MIN_IDLE_CONN);
        setMaxIdlePerKey(MAX_IDLE_CONN);
        setTestOnCreate(true);
        setTestOnBorrow(true);
        setTestWhileIdle(true);
    }

    @Override
    public TTransportPoolFactory getFactory() {
        return (TTransportPoolFactory) super.getFactory();
    }

    public void returnBrokenObject(K8ServiceKey key, TTransport transport) {
        try {
            invalidateObject(key, transport);
        } catch (Exception e) {
            LOG.warn("return broken key " + key);
            e.printStackTrace();
        }
    }
}
```
The code above is mainly pool configuration, and is fairly direct:
- A maximum of 1024 connections
- A maximum of 32 idle and a minimum of 8 idle connections, with an eviction run every 45 seconds adjusting the number of connections kept in the pool
- Connections are validated on every "create", on "borrow" from the pool, and while "idle"
Now let's see how the pool is used in K8ServiceThriftClient:
```java
package com.coder4.lmsia.thrift.client;

import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
import com.coder4.lmsia.thrift.client.pool.TTransportPool;
import com.coder4.lmsia.thrift.client.pool.TTransportPoolFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransport;

public class K8ServiceThriftClient<TCLIENT extends TServiceClient>
        extends AbstractThriftClient<TCLIENT> {

    private K8ServiceKey k8ServiceKey;

    private TTransportPool connPool;

    @Override
    public void init() {
        super.init();
        // check
        if (k8ServiceKey == null) {
            throw new RuntimeException("invalid k8ServiceName or k8Serviceport");
        }
        // init pool
        connPool = new TTransportPool(new TTransportPoolFactory());
    }

    @Override
    public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
        // Step 1: get TTransport
        TTransport tpt = null;
        K8ServiceKey key = getConnBorrowKey();
        try {
            tpt = connPool.borrowObject(key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // Step 2: get client & call
        try {
            TCLIENT tcli = createClient(tpt);
            TRET ret = tcall.call(tcli);
            returnTransport(key, tpt);
            return ret;
        } catch (Exception e) {
            returnBrokenTransport(key, tpt);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void exec(ThriftExecFunc<TCLIENT> texec) {
        // Step 1: get TTransport
        TTransport tpt = null;
        K8ServiceKey key = getConnBorrowKey();
        try {
            // borrow transport
            tpt = connPool.borrowObject(key);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // Step 2: get client & exec
        try {
            TCLIENT tcli = createClient(tpt);
            texec.exec(tcli);
            returnTransport(key, tpt);
        } catch (Exception e) {
            returnBrokenTransport(key, tpt);
            throw new RuntimeException(e);
        }
    }

    private K8ServiceKey getConnBorrowKey() {
        return k8ServiceKey;
    }

    private void returnTransport(K8ServiceKey key, TTransport transport) {
        connPool.returnObject(key, transport);
    }

    private void returnBrokenTransport(K8ServiceKey key, TTransport transport) {
        connPool.returnBrokenObject(key, transport);
    }

    public K8ServiceKey getK8ServiceKey() {
        return k8ServiceKey;
    }

    public void setK8ServiceKey(K8ServiceKey k8ServiceKey) {
        this.k8ServiceKey = k8ServiceKey;
    }
}
```
Most of this code mirrors EasyThriftClient; the differences concern "borrowing" and "returning" connections:
- In call and exec, a connection is borrowed:
  - getConnBorrowKey first builds a key containing the host name and port. The host name here is the Kubernetes service mentioned in the chapter on microservice auto-discovery; revisit that chapter if the mechanism is unfamiliar.
  - A connection (TTransport) is borrowed from connPool.
  - The remaining steps of issuing the RPC call are the same as in EasyThriftClient, so we won't repeat them.
- When the RPC call finishes:
  - On success, connPool.returnObject hands the TTransport back to the pool.
  - On failure, connPool.returnBrokenObject tells the pool to destroy the connection, so the possibly broken TTransport is never borrowed again.
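The borrow / return / invalidate discipline above can be boiled down to a toy keyed pool. TinyKeyedPool below is a deliberately simplified sketch, not commons-pool2: no eviction, no validation, no blocking, and no thread safety, just the lifecycle a pooled connection goes through:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TinyKeyedPool<K, V> {

    private final Map<K, Deque<V>> idle = new HashMap<>();

    // borrow: reuse an idle object for this key, or create a fresh one
    public V borrow(K key, Supplier<V> create) {
        Deque<V> q = idle.get(key);
        return (q != null && !q.isEmpty()) ? q.poll() : create.get();
    }

    // a healthy object goes back to the idle queue for reuse
    public void giveBack(K key, V obj) {
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).offer(obj);
    }

    // a broken object is simply dropped, never pooled again
    public void invalidate(K key, V obj) {
        // intentionally empty: the object is abandoned for garbage collection
    }

    public int idleCount(K key) {
        Deque<V> q = idle.get(key);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        TinyKeyedPool<String, StringBuilder> pool = new TinyKeyedPool<>();
        StringBuilder conn = pool.borrow("lmsia-abc-server:3000", StringBuilder::new);
        pool.giveBack("lmsia-abc-server:3000", conn);                // successful RPC: return
        System.out.println(pool.idleCount("lmsia-abc-server:3000")); // 1
        StringBuilder again = pool.borrow("lmsia-abc-server:3000", StringBuilder::new);
        pool.invalidate("lmsia-abc-server:3000", again);             // failed RPC: discard
        System.out.println(pool.idleCount("lmsia-abc-server:3000")); // 0
    }
}
```

The key design point mirrored from the real client: the success path and the failure path must each end in exactly one of giveBack or invalidate, otherwise the pool either leaks connections or recycles broken ones.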
Similarly, there is a matching builder:
```java
package com.coder4.lmsia.thrift.client.builder;

import com.coder4.lmsia.thrift.client.EasyThriftClient;
import org.apache.thrift.TServiceClient;

/**
 * @author coder4
 */
public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {

    private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();

    protected EasyThriftClient<TCLIENT> build() {
        client.init();
        return client;
    }

    protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
        client.setThriftServerHost(host);
        return this;
    }

    protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
        client.setThriftServerPort(port);
        return this;
    }

    protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
        client.setThriftClass(thriftClass);
        return this;
    }
}
```
This builder mainly sets the two required parameters, host and port. It doesn't look any different from EasyThriftClient's builder, does it?
Bear with me; let's look at the integration in lmsia-abc-client:
```java
package com.coder4.lmsia.abc.client;

import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
import com.coder4.lmsia.thrift.client.K8ServiceKey;
import com.coder4.lmsia.thrift.client.ThriftClient;
import com.coder4.lmsia.thrift.client.builder.K8ServiceThriftClientBuilder;

/**
 * @author coder4
 */
public class LmsiaK8ServiceThriftClientBuilder extends K8ServiceThriftClientBuilder<Client> {

    public LmsiaK8ServiceThriftClientBuilder(K8ServiceKey k8ServiceKey) {
        setThriftClass(LmsiaAbcThrift.class);
        setK8ServiceKey(k8ServiceKey);
    }

    public static ThriftClient<Client> buildClient(K8ServiceKey k8ServiceKey) {
        return new LmsiaK8ServiceThriftClientBuilder(k8ServiceKey).build();
    }
}
```
At integration time, we need to pass in a key, which can be specified manually or supplied by the auto-configuration.
Here is the full auto-configuration code, LmsiaAbcThriftClientConfiguration:
```java
public class LmsiaAbcThriftClientConfiguration {

    @Bean(name = "lmsiaAbcThriftClient")
    @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
    @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
    public ThriftClient<Client> easyThriftClient(
            @Value("${lmsiaAbcThriftServer.host}") String host,
            @Value("${lmsiaAbcThriftServer.port}") int port) {
        LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
        LOG.info("easyThriftClient host = {}, port = {}", host, port);
        return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
    }

    @Bean(name = "lmsiaAbcThriftClient")
    @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
    public ThriftClient<LmsiaAbcThrift.Client> k8ServiceThriftClient() {
        LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
        K8ServiceKey k8ServiceKey = new K8ServiceKey(K8_SERVICE_NAME, K8_SERVICE_PORT);
        LOG.info("k8ServiceThriftClient key:" + k8ServiceKey);
        return LmsiaK8ServiceThriftClientBuilder.buildClient(k8ServiceKey);
    }

    // ...
}
```
Comparing easyThriftClient and k8ServiceThriftClient, it's easy to see that K8ServiceThriftClient's parameters are hard-coded constants. This is the agreed-upon service naming convention mentioned in the chapter on microservice discovery and load balancing.
Here are the constant definitions:
```java
public class LmsiaAbcConstant {

    // ......
    public static final String PROJECT_NAME = "lmsia-abc";
    public static final String K8_SERVICE_NAME = PROJECT_NAME + "-server";
    public static final int K8_SERVICE_PORT = 3000;
    // ......
}
```
This way, once the project name is fixed, the Kubernetes service name is fixed too. The k8ServiceThriftClient auto-configuration therefore activates automatically: merely depending on the lmsia-abc-client package gives you a fully configured RPC client. Convenient, isn't it?
Let's look at a concrete usage example:
```java
import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
import com.coder4.lmsia.thrift.client.ThriftClient;
import org.springframework.beans.factory.annotation.Autowired;

public class LmsiaAbctProxy {

    @Autowired
    private ThriftClient<Client> client;

    public String hello() {
        return client.call(cli -> cli.sayHi());
    }
}
```
With that, we have integrated both the server and client sides of Thrift RPC into Spring Boot.
- On the server side, the ThriftServerConfiguration and ThriftProcessorConfiguration auto-configurations set up the Thrift RPC server.
- On the client side, using Kubernetes services, the auto-configuration provides K8ServiceThriftClient, a Thrift RPC client with service discovery and a built-in connection pool that saves connection overhead.