前言
sharding-jdbc主要功能是分片,我們實現不同分片算法來進行分庫分表,另外一個擴展點就是主鍵生成, 本文主要記錄下sharding-jdbc執行流程和分片路由具體實現以及主鍵生成,在工作中方便排查問題。
主要記錄三個問題:
1.sharding-jdbc執行流程
2.自定義分片算法是如何被sharding-jdbc框架調用的
3.主鍵是在何處何時生成
4.擴展機制spi
1. sharding-jdbc處理流程
操作數據庫套路是:數據源獲取數據庫連接,數據庫連接生成Statement,然后執行Statement,獲取sql執行結果。
那么對於sharding來說
入口獲取數據庫連接就是ShardingDataSource.getConnection()
接着生成PreparedStatement:ShardingConnection.prepareStatement(String)
,生成ShardingPreparedStatement
對於增刪改查就是ShardingPreparedStatement的execute()、executeUpdate()、executeQuery()、、
以execute()
為例:
//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.execute()
@Override
public boolean execute() throws SQLException {
try {
clearPrevious();//本地緩存清空
shard();//路由,路由結果保存到this.routeResult。核心功能
initPreparedStatementExecutor();//初始化執行器
return preparedStatementExecutor.execute();//真實sql執行jdbc操作
} finally {
clearBatch();
}
}
分析核心路由功能shard()
//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.shard()
private void shard() {
routeResult = shardingEngine.shard(sql, getParameters());
}
//org.apache.shardingsphere.core.BaseShardingEngine.shard(String, List<Object>)
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
SQLRouteResult result = route(sql, clonedParameters);//路由核心實現
result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));//非hint,重寫sql
if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
SQLLogger.logSQL(sql, showSimple, result.getSqlStatement(), result.getRouteUnits());//打印真實sql
}
return result;
}
//org.apache.shardingsphere.core.PreparedQueryShardingEngine.route(String, List<Object>)
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
return routingEngine.route(parameters);
}
//org.apache.shardingsphere.core.route.PreparedStatementRoutingEngine.route(List<Object>)
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlStatement) {
sqlStatement = shardingRouter.parse(logicSQL, true);//代碼@1
}
return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));//代碼@2
}
代碼@1
//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.parse(String, boolean)
//解析sql
@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
parsingHook.start(logicSQL);//sharding-jdbc為開發預留的鈎子,我們可以實現鈎子接口在解析sql前后做一些擴展
try {
SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable(), parsingResultCache).parse(useCache);//代碼@1.1,解析sql的核心
parsingHook.finishSuccess(result, shardingMetaData.getTable());
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
parsingHook.finishFailure(ex);
throw ex;
}
}
代碼@1處解析sql比較復雜,只需要知道是解析sql,解析結果SQLStatement,這個是也不是我們的關注點,知道有個hook接口可以在sql解析前后進行擴展即可,比如通過該Hook可以用作計算sql執行時長。
知道增刪改查對對應的SQLStatement如下:
對於insert來說SQLStatement是InsertStatement。DML
對於update delete語句來說SQLStatement是DMLStatement。DML
對於select語句來說SQLStatement是SelectStatement。 DQL
SQLStatement是個邏輯sql。
類關系圖如下:
代碼@2
masterSlaveRouter是讀寫分離路由,不使用的情況下,可以忽略。
分片的路由核心實現在shardingRouter.route(logicSQL, parameters, sqlStatement)
,下面分析這個
//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List<Object>, SQLStatement)
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
? GeneratedKey.getGenerateKey(shardingRule, parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();//代碼@2.1
SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());//代碼@2.2
OptimizeResult optimizeResult = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();//代碼@2.3
if (generatedKey.isPresent()) {
setGeneratedKeys(result, generatedKey.get());//代碼@2.4
}
boolean needMerge = false;
if (sqlStatement instanceof SelectStatement) {
needMerge = isNeedMergeShardingValues((SelectStatement) sqlStatement);//代碼@2.5
}
if (needMerge) {
checkSubqueryShardingValues(sqlStatement, optimizeResult.getShardingConditions());
mergeShardingValues(optimizeResult.getShardingConditions());//代碼@2.6
}
RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, optimizeResult).route();//代碼@2.7
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit() && !routingResult.isSingleRouting()) {
result.setLimit(getProcessedLimit(parameters, (SelectStatement) sqlStatement));//代碼@2.8
}
if (needMerge) {
Preconditions.checkState(1 == routingResult.getTableUnits().getTableUnits().size(), "Must have one sharding with subquery.");//代碼@2.9
}
result.setRoutingResult(routingResult);//代碼@2.10
result.setOptimizeResult(optimizeResult);
return result;//代碼@2.11
}
從上面路由核心代碼可以看出,ShardingRouter是解析和路由的核心接口,其實現類為ParsingSQLRouter,它使用四個引擎對sql進行解析、解析和重寫,這四個引擎為:
-
SQLParsingEngine
解析sql,返回SQLStatement作為解析的結果。 -
OptimizeEngine
對SQLStatement進行優化,返回ShardingConditions對象。 -
RoutingEngine
根據庫表分片配置以及ShardingConditions找到目標庫表,返回RoutingResult對象。 -
SQLRewriteEngine
根據路由結果重寫sql。
代碼@2.1處:如果是insert,則生成分布式主鍵,GeneratedKey封裝了分片鍵和分布式主鍵值。比如,insert語句,這里使用Snowflake算法生成分布式主鍵。
代碼@2.2處:創建sql路由結果對象SQLRouteResult,封裝SQLStatement和分布式主鍵對象GeneratedKey。此時SQLRouteResult只是包含了sql語句和主鍵值,並沒有生成實際待執行sql。
代碼@2.3處:使用OptimizeEngine對SQLStatement進行優化,返回OptimizeResult對象。該對象在重寫sql時候用到,作用就是對SQLStatement進行優化,返回ShardingConditions對象。
代碼@2.4處:保存生成的分布式主鍵。
代碼@2.5處:select語句是否需要合並結果
代碼@2.6處:需要合並查詢結果,則合並
代碼@2.7處:使用不同的RoutingEngine生成路由結果RoutingResult。比如標准分片是StandardRoutingEngine、復合分片是ComplexRoutingEngine、廣播是DatabaseBroadcastRoutingEngine、不分片是DefaultDatabaseRoutingEngine等。 這里是核心代碼,總體功能就是路由,找到實際的數據源和真實表
代碼@2.8處:select語句設置limit。既然分庫分表了,通常也就不使用分頁了。
代碼@2.9處:預檢,需要合並結果,需要分片鍵在查詢結果上。
代碼@2.10處:把路由結果RoutingResult、優化結果OptimizeResult保存到SQLRouteResult。
代碼@2.11處:返回sql路由結果對象SQLRouteResult,該對象封裝了路由結果,知道要到哪個真實庫去執行哪個真實表。
核心代碼@2.7處分析
RoutingEngineFactory.newInstance()根據不同的分片規則采用對應的RoutingEngine生成路由結果RoutingResult,以標准分片路由為例
//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.route()
@Override
public RoutingResult route() {
return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));//1.getTableRule根據邏輯表獲取TableRule,2.getDataNodes根據TableRule和分片算法獲取真實的數據源和真實表Collection<DataNode>
}
//shardingRule.getTableRule(logicTableName)根據邏輯表從分片規則ShardingRule獲取表規則TableRule,TableRule信息封裝的較多,有邏輯表、全部數據源等
//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.getDataNodes(TableRule)
//獲取真實節點,真實數據源和真實表。DataNode封裝了真實數據源和真實表
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
if (shardingRule.isRoutingByHint(tableRule)) {//hint路由
return routeByHint(tableRule);
}
if (isRoutingByShardingConditions(tableRule)) {//條件路由,即非hint路由
return routeByShardingConditions(tableRule);
}
return routeByMixedConditions(tableRule);
}
private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
return optimizeResult.getShardingConditions().getShardingConditions().isEmpty() ? route(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList())
: routeByShardingConditionsWithCondition(tableRule);
}
private Collection<DataNode> route(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);//獲取真實數據源
Collection<DataNode> result = new LinkedList<>();
for (String each : routedDataSources) {
result.addAll(routeTables(tableRule, each, tableShardingValues));//獲取真實表
}
return result;
}
//根據分片鍵獲取數據源
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
if (databaseShardingValues.isEmpty()) {
return availableTargetDatabases;
}
Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));//這里通過分片策略調用自定義的分片算法
Preconditions.checkState(!result.isEmpty(), "no database route info");
return result;
}
//根據分片鍵獲取DataNode,即數據源+真實表
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
: shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));//這里通過分片策略調用自定義的分片算法
Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
Collection<DataNode> result = new LinkedList<>();
for (String each : routedTables) {
result.add(new DataNode(routedDataSource, each));
}
return result;
}
可以看到route()方法是入口,此方法首先通過ShardingRule獲取到邏輯表所對應的TableRule對象,在sharding-jdbc啟動階段,TableRule保存了邏輯表對應的實際的庫表關系集合,接着根據庫和表的ShardingStrategy的類型走了三個不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪個方法最終都會執行到含有三個參數的route()方法,此方法先調用routeDataSources()方法路由數據源(庫),接着調用routeTables()方法路由表,路由庫表的方法也很簡單:
從TableRule中獲取可用的庫表集合。
從TableRule中獲取庫表的分片策略ShardingStrategy對象。
執行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的庫表。
路由的結果以RoutingResult的形式返回,接着調用SQLRewriteEngine重寫sql,因為此時sql中的表還只是邏輯表名,並不是具體的哪個表,接着生成SQLUnit,並最終以SQLRouteResult形式返回路由結果。
重點是個SQLRouteResult,關系較復雜,類圖封裝關系如下
使用xmind畫出的處理流程
思維導圖地址:https://gitee.com/yulewo123/mdpicture/blob/master/document/sharding-jdbc%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B.xmind
總結:
sharding-jdbc的處理流程核心就是路由,即根據分片鍵以及算法從從TableRule.actualDataNodes獲取真實庫表對象DataNode。那么TableRule是怎么來的呢?是ShardingRule根據邏輯表獲取,而ShardingRule是核心,在sharding-jdbc啟動時候就創建完成。
路由獲取后,就可以重寫sql,然后通過jdbc執行sql到真實的數據源執行真實sql。
關鍵debug點記錄如下,工作中遇到問題,方便快速回顧debug定為問題
org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List