sharding-jdbc處理流程源碼分析


前言

sharding-jdbc主要功能是分片,我們實現不同分片算法來進行分庫分表,另外一個擴展點就是主鍵生成, 本文主要記錄下sharding-jdbc執行流程和分片路由具體實現以及主鍵生成,在工作中方便排查問題。

主要記錄三個問題:

1.sharding-jdbc執行流程

2.自定義分片算法是如何被sharding-jdbc框架調用的

3.主鍵是在何處何時生成

4.擴展機制spi

1. sharding-jdbc處理流程

操作數據庫套路是:數據源獲取數據庫連接,數據庫連接生成Statement,然后執行Statement,獲取sql執行結果。

那么對於sharding來說

入口獲取數據庫連接就是ShardingDataSource.getConnection()

接着生成PreparedStatement:ShardingConnection.prepareStatement(String),生成ShardingPreparedStatement

對於增刪改查就是ShardingPreparedStatement的execute()、executeUpdate()、executeQuery()、、

execute()為例:

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.execute()
@Override
public boolean execute() throws SQLException {
    try {
        clearPrevious();//本地緩存清空
        shard();//路由,路由結果保存到this.routeResult。核心功能
        initPreparedStatementExecutor();//初始化執行器
        return preparedStatementExecutor.execute();//真實sql執行jdbc操作
    } finally {
        clearBatch();
    }
}

分析核心路由功能shard()

//org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement.shard()
private void shard() {
        routeResult = shardingEngine.shard(sql, getParameters());
    }

//org.apache.shardingsphere.core.BaseShardingEngine.shard(String, List<Object>)
public SQLRouteResult shard(final String sql, final List<Object> parameters) {
    List<Object> clonedParameters = cloneParameters(parameters);
    SQLRouteResult result = route(sql, clonedParameters);//路由核心實現
    result.getRouteUnits().addAll(HintManager.isDatabaseShardingOnly() ? convert(sql, clonedParameters, result) : rewriteAndConvert(sql, clonedParameters, result));//非hint,重寫sql
    if (shardingProperties.getValue(ShardingPropertiesConstant.SQL_SHOW)) {
        boolean showSimple = shardingProperties.getValue(ShardingPropertiesConstant.SQL_SIMPLE);
        SQLLogger.logSQL(sql, showSimple, result.getSqlStatement(), result.getRouteUnits());//打印真實sql
    }
    return result;
}

//org.apache.shardingsphere.core.PreparedQueryShardingEngine.route(String, List<Object>)
@Override
protected SQLRouteResult route(final String sql, final List<Object> parameters) {
    return routingEngine.route(parameters);
}
//org.apache.shardingsphere.core.route.PreparedStatementRoutingEngine.route(List<Object>)
public SQLRouteResult route(final List<Object> parameters) {
    if (null == sqlStatement) {
        sqlStatement = shardingRouter.parse(logicSQL, true);//代碼@1
    }
    return masterSlaveRouter.route(shardingRouter.route(logicSQL, parameters, sqlStatement));//代碼@2
}

代碼@1

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.parse(String, boolean) 
//解析sql
@Override
public SQLStatement parse(final String logicSQL, final boolean useCache) {
    parsingHook.start(logicSQL);//sharding-jdbc為開發預留的鈎子,我們可以實現鈎子接口在解析sql前后做一些擴展
    try {
        SQLStatement result = new SQLParsingEngine(databaseType, logicSQL, shardingRule, shardingMetaData.getTable(), parsingResultCache).parse(useCache);//代碼@1.1,解析sql的核心
        parsingHook.finishSuccess(result, shardingMetaData.getTable());
        return result;
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        parsingHook.finishFailure(ex);
        throw ex;
    }
}

代碼@1處解析sql比較復雜,只需要知道是解析sql,解析結果SQLStatement,這個是也不是我們的關注點,知道有個hook接口可以在sql解析前后進行擴展即可,比如通過該Hook可以用作計算sql執行時長。

知道增刪改查對對應的SQLStatement如下:

對於insert來說SQLStatement是InsertStatement。DML

對於update delete語句來說SQLStatement是DMLStatement。DML

對於select語句來說SQLStatement是SelectStatement。 DQL

SQLStatement是個邏輯sql。

類關系圖如下:

image-20210420232603003

代碼@2

masterSlaveRouter是讀寫分離路由,不使用的情況下,可以忽略。

分片的路由核心實現在shardingRouter.route(logicSQL, parameters, sqlStatement),下面分析這個

//org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List<Object>, SQLStatement)
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement
                ? GeneratedKey.getGenerateKey(shardingRule, parameters, (InsertStatement) sqlStatement) : Optional.<GeneratedKey>absent();//代碼@2.1
        SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());//代碼@2.2
        OptimizeResult optimizeResult = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();//代碼@2.3
        if (generatedKey.isPresent()) {
            setGeneratedKeys(result, generatedKey.get());//代碼@2.4
        }
        boolean needMerge = false;
        if (sqlStatement instanceof SelectStatement) {
            needMerge = isNeedMergeShardingValues((SelectStatement) sqlStatement);//代碼@2.5
        }
        if (needMerge) {
            checkSubqueryShardingValues(sqlStatement, optimizeResult.getShardingConditions());
            mergeShardingValues(optimizeResult.getShardingConditions());//代碼@2.6
        }
        RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, optimizeResult).route();//代碼@2.7
        if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit() && !routingResult.isSingleRouting()) {
            result.setLimit(getProcessedLimit(parameters, (SelectStatement) sqlStatement));//代碼@2.8
        }
        if (needMerge) {
            Preconditions.checkState(1 == routingResult.getTableUnits().getTableUnits().size(), "Must have one sharding with subquery.");//代碼@2.9
        }
        result.setRoutingResult(routingResult);//代碼@2.10
        result.setOptimizeResult(optimizeResult);
        return result;//代碼@2.11
    }

從上面路由核心代碼可以看出,ShardingRouter是解析和路由的核心接口,其實現類為ParsingSQLRouter,它使用四個引擎對sql進行解析、解析和重寫,這四個引擎為:

  • SQLParsingEngine
    解析sql,返回SQLStatement作為解析的結果。

  • OptimizeEngine
    對SQLStatement進行優化,返回ShardingConditions對象。

  • RoutingEngine
    根據庫表分片配置以及ShardingConditions找到目標庫表,返回RoutingResult對象。

  • SQLRewriteEngine
    根據路由結果重寫sql。

代碼@2.1處:如果是insert,則生成分布式主鍵,GeneratedKey封裝了分片鍵和分布式主鍵值。比如,insert語句,這里使用Snowflake算法生成分布式主鍵。

代碼@2.2處:創建sql路由結果對象SQLRouteResult,封裝SQLStatement和分布式主鍵對象GeneratedKey。此時SQLRouteResult只是包含了sql語句和主鍵值,並沒有生成實際待執行sql。

代碼@2.3處:使用OptimizeEngine對SQLStatement進行優化,返回OptimizeResult對象。該對象在重寫sql時候用到,作用就是對SQLStatement進行優化,返回ShardingConditions對象。

代碼@2.4處:保存生成的分布式主鍵。

代碼@2.5處:select語句是否需要合並結果

代碼@2.6處:需要合並查詢結果,則合並

代碼@2.7處:使用不同的RoutingEngine生成路由結果RoutingResult。比如標准分片是StandardRoutingEngine、復合分片是ComplexRoutingEngine、廣播是DatabaseBroadcastRoutingEngine、不分片是DefaultDatabaseRoutingEngine等。 這里是核心代碼,總體功能就是路由,找到實際的數據源和真實表

image-20210414221409197

代碼@2.8處:select語句設置limit。既然分庫分表了,通常也就不使用分頁了。

代碼@2.9處:預檢,需要合並結果,需要分片鍵在查詢結果上。

代碼@2.10處:把路由結果RoutingResult、優化結果OptimizeResult保存到SQLRouteResult。

代碼@2.11處:返回sql路由結果對象SQLRouteResult,該對象封裝了路由結果,知道要到哪個真實庫去執行哪個真實表。

核心代碼@2.7處分析

RoutingEngineFactory.newInstance()根據不同的分片規則采用對應的RoutingEngine生成路由結果RoutingResult,以標准分片路由為例

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.route()
@Override
public RoutingResult route() {
    return generateRoutingResult(getDataNodes(shardingRule.getTableRule(logicTableName)));//1.getTableRule根據邏輯表獲取TableRule,2.getDataNodes根據TableRule和分片算法獲取真實的數據源和真實表Collection<DataNode>
}
//shardingRule.getTableRule(logicTableName)根據邏輯表從分片規則ShardingRule獲取表規則TableRule,TableRule信息封裝的較多,有邏輯表、全部數據源等

//org.apache.shardingsphere.core.route.type.standard.StandardRoutingEngine.getDataNodes(TableRule)
//獲取真實節點,真實數據源和真實表。DataNode封裝了真實數據源和真實表
private Collection<DataNode> getDataNodes(final TableRule tableRule) {
    if (shardingRule.isRoutingByHint(tableRule)) {//hint路由
        return routeByHint(tableRule);
    }
    if (isRoutingByShardingConditions(tableRule)) {//條件路由,即非hint路由
        return routeByShardingConditions(tableRule);
    }
    return routeByMixedConditions(tableRule);
}

private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
    return optimizeResult.getShardingConditions().getShardingConditions().isEmpty() ? route(tableRule, Collections.<RouteValue>emptyList(), Collections.<RouteValue>emptyList())
        : routeByShardingConditionsWithCondition(tableRule);
}

private Collection<DataNode> route(final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues) {
    Collection<String> routedDataSources = routeDataSources(tableRule, databaseShardingValues);//獲取真實數據源
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedDataSources) {
        result.addAll(routeTables(tableRule, each, tableShardingValues));//獲取真實表
    }
    return result;
}

//根據分片鍵獲取數據源
private Collection<String> routeDataSources(final TableRule tableRule, final List<RouteValue> databaseShardingValues) {
    Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();
    if (databaseShardingValues.isEmpty()) {
        return availableTargetDatabases;
    }
    Collection<String> result = new LinkedHashSet<>(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues));//這里通過分片策略調用自定義的分片算法
    Preconditions.checkState(!result.isEmpty(), "no database route info");
    return result;
}

//根據分片鍵獲取DataNode,即數據源+真實表
private Collection<DataNode> routeTables(final TableRule tableRule, final String routedDataSource, final List<RouteValue> tableShardingValues) {
    Collection<String> availableTargetTables = tableRule.getActualTableNames(routedDataSource);
    Collection<String> routedTables = new LinkedHashSet<>(tableShardingValues.isEmpty() ? availableTargetTables
                                                          : shardingRule.getTableShardingStrategy(tableRule).doSharding(availableTargetTables, tableShardingValues));//這里通過分片策略調用自定義的分片算法
    Preconditions.checkState(!routedTables.isEmpty(), "no table route info");
    Collection<DataNode> result = new LinkedList<>();
    for (String each : routedTables) {
        result.add(new DataNode(routedDataSource, each));
    }
    return result;
}

可以看到route()方法是入口,此方法首先通過ShardingRule獲取到邏輯表所對應的TableRule對象,在sharding-jdbc啟動階段,TableRule保存了邏輯表對應的實際的庫表關系集合,接着根據庫和表的ShardingStrategy的類型走了三個不同的方法:routeByHint()、routeByShardingConditions()、routeByMixedConditions(),不管走哪個方法最終都會執行到含有三個參數的route()方法,此方法先調用routeDataSources()方法路由數據源(庫),接着調用routeTables()方法路由表,路由庫表的方法也很簡單:

從TableRule中獲取可用的庫表集合。
從TableRule中獲取庫表的分片策略ShardingStrategy對象。
執行ShardingStrategy持有的分片算法ShardingAlgorithm的doSharding()方法返回路由到的庫表。
路由的結果以RoutingResult的形式返回,接着調用SQLRewriteEngine重寫sql,因為此時sql中的表還只是邏輯表名,並不是具體的哪個表,接着生成SQLUnit,並最終以SQLRouteResult形式返回路由結果。

重點是個SQLRouteResult,關系較復雜,類圖封裝關系如下

image-20210421004127246

使用xmind畫出的處理流程

sharding-jdbc處理流程

思維導圖地址:https://gitee.com/yulewo123/mdpicture/blob/master/document/sharding-jdbc%E6%89%A7%E8%A1%8C%E6%B5%81%E7%A8%8B.xmind

總結:

sharding-jdbc的處理流程核心就是路由,即根據分片鍵以及算法從從TableRule.actualDataNodes獲取真實庫表對象DataNode。那么TableRule是怎么來的呢?是ShardingRule根據邏輯表獲取,而ShardingRule是核心,在sharding-jdbc啟動時候就創建完成。

路由獲取后,就可以重寫sql,然后通過jdbc執行sql到真實的數據源執行真實sql。

關鍵debug點記錄如下,工作中遇到問題,方便快速回顧debug定為問題

org.apache.shardingsphere.core.route.router.sharding.ParsingSQLRouter.route(String, List, SQLStatement) 路由核心

2.sharding-jdbc的擴展點

sharding-jdbc設計采用jdk的spi進行擴展,所有擴展注冊都會調用org.apache.shardingsphere.core.spi.NewInstanceServiceLoader.register(Class<T>),因此跟蹤這個方法調用如下

image-20210424234705670

前面三個是在啟動過程進行注冊,后面五個是在首次運行過程中進行注冊。

列舉出sharding的spi擴展點

啟動過程

org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm 讀寫分離擴展,可以擴展使用什么算法來選擇對應的數據源。

org.apache.shardingsphere.spi.encrypt.ShardingEncryptor 加密方式擴展,擴展加密方式

interface org.apache.shardingsphere.spi.keygen.ShardingKeyGenerator 分布式主鍵生成,可以通過實行這個接口增加分布式主鍵生成算法

首次運行

interface org.apache.shardingsphere.core.execute.hook.RootInvokeHook 根調用鈎子,具體就是在創建shardingConnection時候就啟用,關閉時候完成,可以用於統計一個sharding從開始執行到執行結束耗時,可用於監控

interface org.apache.shardingsphere.core.parse.hook.ParsingHook sql解析鈎子,比如可以用於統計sql解析耗時等

interface org.apache.shardingsphere.core.parse.spi.ShardingParseEngine 解析引擎,sharding-jdbc已經針對每個數據庫類型增加了對應的解析引擎

interface org.apache.shardingsphere.core.rewrite.hook.RewriteHook 重寫sql鈎子

interface org.apache.shardingsphere.core.execute.hook.SQLExecutionHook sql執行鈎子,可以計算sql執行時長,用於監控

以上這些spi接口,斷點打在這些spi接口的子類hook上即可,觀測到調用。

比如添加一個自定義打印sql執行耗時:

public class CustomShardingSQLExecutionHook implements SQLExecutionHook {

	@Override
	public void start(RouteUnit routeUnit, DataSourceMetaData dataSourceMetaData, boolean isTrunkThread,
			Map<String, Object> shardingExecuteDataMap) {
		// TODO Auto-generated method stub
		System.err.println("start");//控制台紅色打印
	}

	@Override
	public void finishSuccess() {
		// TODO Auto-generated method stub
		System.err.println("finishSuccess");

	}

	@Override
	public void finishFailure(Exception cause) {
		// TODO Auto-generated method stub
		System.err.println("finishFailure");
	}
}
//同時在resources下創建META-INF/services/org.apache.shardingsphere.core.execute.hook.SQLExecutionHook,內容如下
//com.zyj.sharding.hook.CustomShardingSQLExecutionHook
//這樣自定義的sql執行鈎子就生效了,就是jdk的spi寫法。

參考 https://www.jianshu.com/p/4cb5b2b68f8e


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM