一、背景簡介
分布式系統中存在很多拆分的服務,在不斷迭代升級的過程中,會出現如下常見的棘手情況:
某個技術組件版本升級,依賴包升級導致部分語法或者API過期,或者組件修復緊急的漏洞,從而會導致分布式系統下各個服務被動的升級迭代,很容易引發意外的問題;不同的服務中對組件的依賴和版本各不相同,從而導致不兼容問題的出現,很難對版本做統一的管理和維護,一旦出現問題很容易手忙腳亂,引發蝴蝶效應;
所以在復雜的系統中,對於依賴的框架和組件進行統一管理和二次淺封裝,可以較大程度降低上述問題的處理成本與風險,同時可以更好的管理和控制技術棧。
二、框架淺封裝
1、淺封裝作用
為什么淺封裝,核心目的在於統一管理和協調組件的依賴與升級,並對常用方法做一層包裝,實際上很多組件使用到的功能點並不多,只是在業務中的使用點很多,這樣給組件本身的迭代升級帶來了一定的難度:
例如某個組件常用的API中存在巨大風險漏洞,或者替換掉過期的用法,需要對整個系統中涉及的地方做升級,這種操作的成本是非常高的;
如果是對這種常用的組件方法進行二次包裝,作為處理業務的工具方法,那么解決上面的問題就相對輕松許多,只要對封裝的工具方法升級,服務的依賴升級即可,降低時間成本和風險。
通過淺封裝的手段,可以實現兩個方面的解耦:
業務與技術
技術棧中常用的方法進行二次淺封裝,這樣可以較大程度的降低業務與技術的耦合,如此可以獨立的升級技術棧,擴展功能而不影響業務服務的迭代。
框架與組件
不同的框架與組件都需要一定程度的自定義配置,同時分模塊管理,在不同的服務中引入特定的依賴,也可以在基礎包中做統一依賴,以此實現技術棧的快速組合搭配。
這里說的淺封裝,是指包裝常規常用的語法,組件本身就是技術層面的深度封裝,所以也不可能完全隔開技術棧原生用法。
2、統一版本控制
例如微服務架構下,不同的研發組負責不同的業務模塊,然而受到開發人員的經驗和能力影響,很容易出現不同的服務組件選型不一致,或者相同的組件依賴版本不同,這樣很難對系統架構做標准的統一管理。
對於二次封裝的方式,可以嚴格的控制技術棧的迭代擴展,以及版本沖突的問題,通過對二次封裝層的統一升級,可以快速實現業務服務的升級,解決不同服務的依賴差異問題。
三、實踐案例
1、案例簡介
Java分布式系統中,微服務基礎組件(Nacos、Feign、Gateway、Seata)等,系統中間件(Quartz、Redis、Kafka、ElasticSearch,Logstash)等,對常用功能、配置、API等,進行二次淺封裝並統一集成管理,以滿足日常開發中基礎環境搭建與臨時工具的快速實現。
- butte-flyer 組件封裝的應用案例;
- butte-frame 常用技術組件二次封裝;
2、分層架構
整體划分五層:網關層、應用層、業務層、中間件層、基礎層,組合成一套分布式系統。

服務總覽
| 服務名 | 分層 | 端口 | 緩存庫 | 數據庫 | 描述 |
|---|---|---|---|---|---|
| flyer-gateway | 網關層 | 8010 | db1 | nacos | 路由控制 |
| flyer-facade | 應用層 | 8082 | db2 | facade | 門面服務 |
| flyer-admin | 應用層 | 8083 | db3 | admin | 后端管理 |
| flyer-account | 業務層 | 8084 | db4 | account | 賬戶管理 |
| flyer-quartz | 業務層 | 8085 | db5 | quartz | 定時任務 |
| kafka | 中間件 | 9092 | --- | ------ | 消息隊列 |
| elasticsearch | 中間件 | 9200 | --- | ------ | 搜索引擎 |
| redis | 中間件 | 6379 | --- | ------ | 緩存中心 |
| logstash | 中間件 | 5044 | --- | es6.8.6 | 日志采集 |
| nacos | 基礎層 | 8848 | --- | nacos | 注冊配置 |
| seata | 基礎層 | 8091 | --- | seata | 分布事務 |
| mysql | 基礎層 | 3306 | --- | ------ | 數據存儲 |
3、目錄結構
在butte-frame中對各個技術棧進行二次封裝管理,在butte-flyer中進行依賴引用。
butte-frame
├── frame-base 基礎代碼塊
├── frame-jdbc 數據庫組件
├── frame-core 服務基礎依賴
├── frame-gateway 路由網關
├── frame-nacos 注冊與配置中心
├── frame-seata 分布式事務
├── frame-feign 服務間調用
├── frame-security 安全管理
├── frame-search 搜索引擎
├── frame-redis 緩存管理
├── frame-kafka 消息中間件
├── frame-quartz 定時任務
├── frame-swagger 接口文檔
└── frame-sleuth 鏈路日志
butte-flyer
├── flyer-gateway 網關服務:路由控制
├── flyer-facade 門面服務:功能協作接口
├── flyer-account 賬戶服務:用戶賬戶
├── flyer-quartz 任務服務:定時任務
└── flyer-admin 管理服務:后端管理
4、技術棧組件
系統常用的技術棧:基礎框架、微服務組件、緩存、安全管理、數據庫、定時任務、工具依賴等。
| 名稱 | 版本 | 說明 |
|---|---|---|
| spring-cloud | 2.2.5.RELEASE | 微服務框架基礎 |
| spring-boot | 2.2.5.RELEASE | 服務基礎依賴 |
| gateway | 2.2.5.RELEASE | 路由網關 |
| nacos | 2.2.5.RELEASE | 注冊中心與配置管理 |
| seata | 2.2.5.RELEASE | 分布式事務管理 |
| feign | 2.2.5.RELEASE | 微服務間請求調用 |
| security | 2.2.5.RELEASE | 安全管理 |
| sleuth | 2.2.5.RELEASE | 請求軌跡鏈路 |
| security-jwt | 1.0.10.RELEASE | JWT加密組件 |
| hikari | 3.4.2 | 數據庫連接池,默認 |
| mybatis-plus | 3.4.2 | ORM持久層框架 |
| kafka | 2.0.1 | MQ消息隊列 |
| elasticsearch | 6.8.6 | 搜索引擎 |
| logstash | 5.2 | 日志采集 |
| redis | 2.2.5.RELEASE | 緩存管理與加鎖控制 |
| quartz | 2.3.2 | 定時任務管理 |
| swagger | 2.6.1 | 接口文檔 |
| apache-common | 2.7.0 | 基礎依賴包 |
| hutool | 5.3.1 | 基礎工具包 |
四、微服務組件
1、Nacos
Nacos在整個組件體系中,提供兩個核心能力,注冊發現:適配微服務注冊與發現標准,快速實現動態服務注冊發現、元數據管理等,提供微服務組件中最基礎的能力;配置中心:統一管理各個服務配置,集中在Nacos中存儲管理,隔離多環境的不同配置,並且可以規避線上配置放開的風險;

連接管理
spring:
cloud:
nacos:
# 配置讀取
config:
prefix: application
server-addr: 127.0.0.1:8848
file-extension: yml
refresh-enabled: true
# 注冊中心
discovery:
server-addr: 127.0.0.1:8848
配置管理
- bootstrap.yml :服務中文件,連接和讀取Nacos中配置信息;
- application.yml :公共基礎配置,這里配置mybatis組件;
- application-dev.yml :中間件連接配置,用作環境標識隔離;
- application-def.yml :各個服務的自定義配置,參數加載;

2、Gateway
Gateway網關核心能力,提供統一的API路由管理,作為微服務架構體系下請求唯一入口,還可以在網關層處理所有的非業務功能,例如:安全控制,流量監控限流,等等。
路由控制:各個服務的發現和路由;
@Component
public class RouteFactory implements RouteDefinitionRepository {
@Resource
private RouteService routeService ;
/**
* 加載全部路由
* @since 2021-11-14 18:08
*/
@Override
public Flux<RouteDefinition> getRouteDefinitions() {
return Flux.fromIterable(routeService.getRouteDefinitions());
}
/**
* 添加路由
* @since 2021-11-14 18:08
*/
@Override
public Mono<Void> save(Mono<RouteDefinition> routeMono) {
return routeMono.flatMap(routeDefinition -> {
routeService.saveRouter(routeDefinition);
return Mono.empty();
});
}
}
全局過濾:作為網關的基礎能力;
@Component
public class GatewayFilter implements GlobalFilter {
private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String uri = request.getURI().getPath() ;
String host = String.valueOf(request.getHeaders().getHost()) ;
logger.info("request host : {} , uri : {}",host,uri);
return chain.filter(exchange);
}
}
3、Feign
Feign組件是聲明式的WebService客戶端,使微服務之間的調用變得更簡單,Feign通過注解手段,將請求進行模板化和接口化管理,可以更加標准的管理各個服務間的通信交互。
響應解碼:定義Feign接口響應時解碼邏輯,校驗和控制統一的接口風格;
public class FeignDecode extends ResponseEntityDecoder {
public FeignDecode(Decoder decoder) {
super(decoder);
}
@Override
public Object decode(Response response, Type type) {
if (!type.getTypeName().startsWith(Rep.class.getName())) {
throw new RuntimeException("響應格式異常");
}
try {
return super.decode(response, type);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
4、Seata
Seata組件是開源的分布式事務解決方案,致力於提供高性能和簡單易用的分布式事務服務,實現AT、TCC、SAGA、XA事務模式,支持一站式的分布式解決方案。
事務配置:基於nacos管理Seata組件的參數定義;

服務注冊:在需要管理分布式事務的服務中連接和使用Seata服務;
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: butte-seata-group
config:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
group: DEFAULT_GROUP
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
application: seata-server
group: DEFAULT_GROUP
五、中間件集成
1、Kafka
Kafka是由Apache開源,具有分布式、分區的、多副本的、多訂閱者,基於Zookeeper協調的分布式消息處理平台,由Scala和Java語言編寫。還常用於搜集用戶在應用服務中產生的日志數據。
消息發送:封裝消息發送的基礎能力;
@Component
public class KafkaSendOperate {
@Resource
private KafkaTemplate<String, String> kafkaTemplate ;
public void send (SendMsgVO entry) {
kafkaTemplate.send(entry.getTopic(),entry.getKey(),entry.getMsgBody()) ;
}
}
消息消費:消費監聽時有兩種策略;
- 消息生產方自己消費,通過Feign接口去執行具體消費服務的邏輯,這樣有利於流程跟蹤排查;
- 消息消費方直接監聽,減少消息處理的流程節點,當然也可以打造統一的MQ總線服務(文尾);
public class KafkaListen {
private static final Logger logger = LoggerFactory.getLogger(KafkaListen.class);
/**
* Kafka消息監聽
* @since 2021-11-06 16:47
*/
@KafkaListener(topics = KafkaTopic.USER_TOPIC)
public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
try {
String key = String.valueOf(record.key());
String body = record.value();
switch (key){ }
} catch (Exception e){
e.printStackTrace();
} finally {
acknowledgment.acknowledge();
}
}
}
2、Redis
Redis是一款開源組件,基於內存的高性能的key-value數據結構存儲系統,它可以用作數據庫、緩存和消息中間件,支持多種類型的數據結構,如字符串、集合等。在實際應用中,通常用來做變動頻率低的熱點數據緩存和加鎖機制。
KV數據緩存:作為Redis最常用的功能,即緩存一個指定有效期的鍵和值,在使用時直接獲取;
@Component
public class RedisKvOperate {
@Resource
private StringRedisTemplate stringRedisTemplate ;
/**
* 創建緩存,必須帶緩存時長
* @param key 緩存Key
* @param value 緩存Value
* @param expire 單位秒
* @return boolean
* @since 2021-08-07 21:12
*/
public boolean set (String key, String value, long expire) {
try {
stringRedisTemplate.opsForValue().set(key,value,expire, TimeUnit.SECONDS);
} catch (Exception e){
e.printStackTrace();
return Boolean.FALSE ;
}
return Boolean.TRUE ;
}
}
Lock加鎖機制:基於spring-integration-redis中RedisLockRegistry,實現分布式鎖;
@Component
public class RedisLockOperate {
@Resource
protected RedisLockRegistry redisLockRegistry;
/**
* 嘗試一次加鎖,采用默認時間
* @param lockKey 加鎖Key
* @return java.lang.Boolean
* @since 2021-09-12 13:14
*/
@SneakyThrows
public <T> Boolean tryLock(T lockKey) {
return redisLockRegistry.obtain(lockKey).tryLock(time, TimeUnit.MILLISECONDS);
}
/**
* 釋放鎖
* @param lockKey 解鎖Key
* @since 2021-09-12 13:32
*/
public <T> void unlock(T lockKey) {
redisLockRegistry.obtain(lockKey).unlock();
}
}
3、ElasticSearch
ElasticSearch是一個基於Lucene的搜索服務器,它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口,Elasticsearch是用Java開發的,是當前流行的企業級搜索引擎。
索引管理:索引的創建和刪除,結構添加和查詢;
基於ElasticsearchRestTemplate的模板方法操作;
@Component
public class TemplateOperate {
@Resource
private ElasticsearchRestTemplate template ;
/**
* 創建索引和結構
* @param clazz 基於注解類實體
* @return java.lang.Boolean
* @since 2021-08-15 19:25
*/
public <T> Boolean createPut (Class<T> clazz){
boolean createIf = template.createIndex(clazz) ;
if (createIf){
return template.putMapping(clazz) ;
}
return Boolean.FALSE ;
}
}
基於RestHighLevelClient原生API操作;
@Component
public class IndexOperate {
@Resource
private RestHighLevelClient client ;
/**
* 判斷索引是否存在
* @return boolean
* @since 2021-08-07 18:57
*/
public boolean exists (IndexVO entry) {
GetIndexRequest getReq = new GetIndexRequest (entry.getIndexName()) ;
try {
return client.indices().exists(getReq, entry.getOptions());
} catch (Exception e) {
e.printStackTrace();
}
return Boolean.FALSE ;
}
}
數據管理:數據新增、主鍵查詢、修改、批量操作,業務性質的搜索封裝復雜度很高;
數據的增刪改方法;
@Component
public class DataOperate {
@Resource
private RestHighLevelClient client ;
/**
* 批量更新數據
* @param entry 對象主體
* @since 2021-08-07 18:16
*/
public void bulkUpdate (DataVO entry){
if (CollUtil.isEmpty(entry.getDataList())){
return ;
}
// 請求條件
BulkRequest bulkUpdate = new BulkRequest(entry.getIndexName(),entry.getType()) ;
bulkUpdate.setRefreshPolicy(entry.getRefresh()) ;
entry.getDataList().forEach(dataMap -> {
UpdateRequest updateReq = new UpdateRequest() ;
updateReq.id(String.valueOf(dataMap.get("id"))) ;
updateReq.doc(dataMap) ;
bulkUpdate.add(updateReq) ;
});
try {
// 執行請求
client.bulk(bulkUpdate, entry.getOptions());
} catch (IOException e) {
e.printStackTrace();
}
}
}
索引主鍵查詢,分組查詢方法;
@Component
public class QueryOperate {
@Resource
private RestHighLevelClient client ;
/**
* 指定字段分組查詢
* @since 2021-10-07 19:00
*/
public Map<String,Object> groupByField (QueryVO entry){
Map<String,Object> groupMap = new HashMap<>() ;
// 分組API
String groupName = entry.getGroupField()+"_group" ;
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.size(0) ;
TermsAggregationBuilder termAgg = AggregationBuilders.terms(groupName)
.field(entry.getGroupField()) ;
sourceBuilder.aggregation(termAgg);
// 查詢API
SearchRequest searchRequest = new SearchRequest(entry.getIndexName());
searchRequest.source(sourceBuilder) ;
try {
// 執行API
SearchResponse response = client.search(searchRequest, entry.getOptions());
// 響應結果
Terms groupTerm = response.getAggregations().get(groupName) ;
if (CollUtil.isNotEmpty(groupTerm.getBuckets())){
for (Terms.Bucket bucket:groupTerm.getBuckets()){
groupMap.put(bucket.getKeyAsString(),bucket.getDocCount()) ;
}
}
} catch (IOException e) {
e.printStackTrace();
}
return groupMap ;
}
}
4、Logstash
Logstash是一款開源的數據采集組件,具有實時管道功能。Logstash能夠動態的從多個來源采集數據,進行標准化轉換數據,並將數據傳輸到所選擇的存儲容器。

- Sleuth:管理服務鏈路,提供核心TraceId和SpanId生成;
- ElasticSearch:基於ES引擎做日志聚合存儲和查詢;
- Logstash:提供日志采集服務,和數據發送ES的能力;
logback.xml:服務連接Logstash地址,並加載核心配置;
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml" />
<springProperty scope="context" name="APP_NAME" source="spring.application.name" defaultValue="butte_app" />
<springProperty scope="context" name="DES_URI" source="logstash.destination.uri" />
<springProperty scope="context" name="DES_PORT" source="logstash.destination.port" />
<!-- 輸出到LogStash配置,需要啟動LogStash服務 -->
<appender name="LogStash"
class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>${DES_URI:- }:${DES_PORT:- }</destination>
<encoder
class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"severity": "%level",
"service": "${APP_NAME:-}",
"trace": "%X{X-B3-TraceId:-}",
"span": "%X{X-B3-SpanId:-}",
"exportable": "%X{X-Span-Export:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
</configuration>

5、Quartz
Quartz是一個完全由java編寫的開源作業調度框架,用來執行各個服務中的定時調度任務,在微服務體系架構下,通常開發一個獨立的Quartz服務,通過Feign接口去觸發各個服務的任務執行。
配置參數:定時任務基礎信息,數據庫表,線程池;
spring:
quartz:
job-store-type: jdbc
properties:
org:
quartz:
scheduler:
instanceName: ButteScheduler
instanceId: AUTO
jobStore:
class: org.quartz.impl.jdbcjobstore.JobStoreTX
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
tablePrefix: qrtz_
isClustered: true
clusterCheckinInterval: 15000
useProperties: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadPriority: 5
threadCount: 10
threadsInheritContextClassLoaderOfInitializingThread: true
6、Swagger
Swagger是常用的接口文檔管理組件,通過對API接口和對象的簡單注釋,快速生成接口描述信息,並且提供可視化界面可以快速對接口發送請求和調試,該組件在前后端聯調中,極大的提高效率。
配置基本的包掃描能力即可;
@Configuration
public class SwaggerConfig {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.butte"))
.paths(PathSelectors.any())
.build();
}
}
訪問:服務:端口/swagger-ui.html即可打開接口文檔;

六、數據庫配置
1、MySQL
微服務架構下,不同的服務對應不同的MySQL庫,基於業務模塊做庫的划分是當前常用的方式,可以對各自業務下的服務做迭代升級,同時可以避免單點故障導致雪崩效應。

2、HikariCP
HikariCP作為SpringBoot2版本推薦和默認采用的數據庫連接池,具有速度極快、輕量簡單的特點。
spring:
datasource:
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/${data.name.mysql}?${spring.datasource.db-param}
username: root
password: 123456
db-param: useUnicode=true&characterEncoding=UTF8&zeroDateTimeBehavior=convertToNull&useSSL=false
hikari:
minimumIdle: 5
maximumPoolSize: 10
idleTimeout: 300000
maxLifetime: 500000
connectionTimeout: 30000
連接池的配置根據業務的並發需求量,做適當的調優即可。
3、Mybatis
Mybatis持久層的框架組件,支持定制化SQL、存儲過程以及高級映射,MyBatis-Plus是一個MyBatis的增強工具,在MyBatis的基礎上只做增強不做改變,可以簡化開發、提高效率。
mybatis-plus:
mapper-locations: classpath*:/mapper/**/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
七、源代碼地址
應用倉庫:
https://gitee.com/cicadasmile/butte-flyer-parent
組件封裝:
https://gitee.com/cicadasmile/butte-frame-parent
