sharding-jdbc-core 源碼分析


Sharding-Jdbc 源碼分析

Apache Sharding-Sphere 系列目錄(https://www.cnblogs.com/binarylei/p/12217637.html)

在看 Sharding-Jdbc 源碼之前,強烈建議先閱讀一直官網的文章:

  1. Apache Sharding-Jdbc 數據分片

JDBC 調用過程如下:APP -> ORM -> JDBC -> PROXY -> MySQL。如果要完成數據的分庫分表,可以在這五層任意地方進行,Sharding-Jdbc 是在 JDBC 層進行分庫分表,Sharding-Proxy 是在 PROXY 進行分庫分表。

Sharding-Jdbc 是一個輕量級的分庫分表框架,使用時最關鍵的是配制分庫分表策略,其余的和使用普通的 MySQL 驅動一樣,幾乎不用改代碼。具體使用方法參考:Apache Sharding-Jdbc 使用示例

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
    createDataSourceMap(), shardingRuleConfig, new Properties()) {
    connection Connection = dataSource.getConnection();
    ...
}

1. Sharding-Jdbc 包結構

sharding-jdbc  
    ├── sharding-jdbc-core      重寫DataSource/Connection/Statement/ResultSet四大對象
    └── sharding-jdbc-orchestration        配置中心
sharding-core
    ├── sharding-core-api       接口和配置類	
    ├── sharding-core-common    通用分片策略實現...
    ├── sharding-core-entry     SQL解析、路由、改寫,核心類BaseShardingEngine
    ├── sharding-core-route     SQL路由,核心類StatementRoutingEngine
    ├── sharding-core-rewrite   SQL改寫,核心類ShardingSQLRewriteEngine
    ├── sharding-core-execute   SQL執行,核心類ShardingExecuteEngine
    └── sharding-core-merge     結果合並,核心類MergeEngine
shardingsphere-sql-parser 
    ├── shardingsphere-sql-parser-spi       SQLParserEntry,用於初始化SQLParser
    ├── shardingsphere-sql-parser-engine    SQL解析,核心類SQLParseEngine
    ├── shardingsphere-sql-parser-relation
    └── shardingsphere-sql-parser-mysql     MySQL解析器,核心類MySQLParserEntry和MySQLParser
shardingsphere-underlying           基礎接口和api
    ├── shardingsphere-rewrite      SQLRewriteEngine接口
    ├── shardingsphere-execute      QueryResult查詢結果
    └── shardingsphere-merge        MergeEngine接口
shardingsphere-spi                  SPI加載工具類
sharding-transaction
    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加載		
    ├── sharding-transaction-2pc    實現類XAShardingTransactionManager
    └── sharding-transaction-base   實現類SeataATShardingTransactionManager

2. JDBC 四大對象

所有的一切都從 ShardingDataSourceFactory 開始的,創建了一個 ShardingDataSource 的分片數據源。除了 ShardingDataSource(分片數據源),在 Sharding-Sphere 中還有 MasterSlaveDataSourceFactory(主從數據源)、EncryptDataSourceFactory(脫敏數據源)。

public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Properties props) throws SQLException {
    return new ShardingDataSource(dataSourceMap,
               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}

說明: 本文主要以 ShardingDataSource 為切入點分析 Sharding-Sphere 是如何對 JDBC 四大對象 DataSource、Connection、Statement、ResultSet 進行封裝的。

2.1 DataSource

DataSource、Connection

DataSource 和 Connection 都比較簡單,沒有處理過多的邏輯,只是 dataSourceMap, shardingRule 進行簡單的封裝。

ShardingDataSource 持有對數據源和分片規則,可以通過 getConnection 方法獲取 ShardingConnection 連接。

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext,
            TransactionTypeHolder.get());
}

ShardingDataSource 的功能非常簡單,就不多說了。

2.2 Connection

ShardingConnection 可以創建 Statement 和 PrepareStatement 兩種運行方式:

@Override
public Statement createStatement(final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability)
        throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

說明: ShardingConnection 主要是將創建 ShardingStatement 和 ShardingPreparedStatement 兩個對象,主要的執行邏輯都在 Statement 對象中。當然 ShardingConnection 還有兩個重要的功能,一個是獲取真正的數據庫連接,一個是事務提交功能,事務不在本文討論范圍中。

protected Connection createConnection(final String dataSourceName,
        final DataSource dataSource) throws SQLException {
    return isInShardingTransaction()
            ? shardingTransactionManager.getConnection(dataSourceName)
            : dataSource.getConnection();
}

說明: 如果有事務,則需要通過事務管理器獲取連接,關於事務不在本文討論范圍中。

2.3 Statement

Statement 相對來說比較復雜,因為它都是 JDBC 的真正執行器,所有邏輯都封裝在 Statement 中。

Statement

說明: Statement 分為 ShardingStatement 和 ShardingPrepareStatement 兩種情況。本文以 ShardingStatement 為例分析 Sharding-Jdbc 執行過程。下一節會重點為分析 ShardingStatement 的執行流程。

2.4 ResultSet

ResultSet

說明: ShardingResultSet 只是對 MergedResult 的簡單封裝。

private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
    return mergeResultSet.next();
}

3. Sharding-Jdbc 執行流程分析

ShardingStatement 執行時序圖

總結: ShardingStatement 執行過程如下:

  1. SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改寫,位於 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接將路由的功能委托給 StatementRoutingEngine(或 PreparedQueryShardingEngine),本質是對 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封裝。
  2. StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 執行的操作,位於 sharding-jdbc-core 工程中。本質是對 ShardingExecuteEngine 的封裝。
  3. StatementRoutingEngine:SQL 路由引擎,位於 sharding-core-route 工程中。路由引擎包裝了 SQL 解析、路由、改寫三點。SQL 路由分兩步,先進行數據分片路由(ShardingRouter),再進行主從路由(ShardingMasterSlaveRouter)。
  4. SQLParseEngine:SQL 解析引擎,位於 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 兩種。
  5. ShardingSQLRewriteEngine:SQL 改寫引擎,位於 sharding-core-rewrite 工程中。
  6. ShardingExecuteEngine:執行引擎,位於 sharding-core-execute 工程中。StatementExecutor 對
  7. MergeEngine:結果合並引擎,位於 sharding-core-merge 工程中。

接下來一下會對 ShardingStatement 深入分析,之后會對 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一個引擎進行分析。

4. sharding-jdbc-core 任務執行分析

ShardingStatement 內部有三個核心的類,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改寫;一是 StatementExecutor 進行 SQL 執行;最后調用 MergeEngine 對結果進行合並處理。

4.1 ShardingStatement

4.1.2 初始化

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;

public ShardingStatement(final ShardingConnection connection) {
    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    this.connection = connection;
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
            resultSetHoldability, connection);
}

說明: ShardingStatement 內部執行 SQL 委托給了 statementExecutor。關於 ResultSet.CONCUR_READ_ONLY 等參考這里

4.1.2 執行

(1)executeQuery 執行過程

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        // 1. SQL 解析、路由、改寫,最終生成 SQLRouteResult
        shard(sql);
        // 2. 生成執行計划 SQLRouteResult -> StatementExecuteUnit
        initStatementExecutor();
        // 3. statementExecutor.executeQuery() 執行任務
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                connection.getRuntimeContext().getDatabaseType(),
                connection.getRuntimeContext().getRule(), sqlRouteResult,
                connection.getRuntimeContext().getMetaData().getRelationMetas(),
                statementExecutor.executeQuery());
        // 4. 結果合並
        result = getResultSet(mergeEngine);
    } finally {
        currentResultSet = null;
    }
    currentResultSet = result;
    return result;
}

(2)SQL 路由(包括 SQL 解析、路由、改寫)

private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
            runtimeContext.getRule(), runtimeContext.getProps(),
            runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

說明: SimpleQueryShardingEngine 進行 SQL 路由(包括 SQL 解析、路由、改寫),生成 SQLRouteResult。之后會有一節專門分析 SQL 路由過程。

當 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的執行任務就全部交給 StatementExecutor 完成。

4.2 StatementExecutor

StatementExecutor 內部封裝了 SQL 任務的執行過程,包括:SqlExecutePrepareTemplate 類生成執行計划 StatementExecuteUnit,以及 SQLExecuteTemplate 用於執行 StatementExecuteUnit。

4.2.1 類結構

StatementExecutor 類圖

4.2.2 重要屬性

AbstractStatementExecutor 類中重要的屬性:

// SQLExecutePrepareTemplate用於生成執行計划StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的執行計划StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
            new LinkedList<>();

// SQLExecuteTemplate用於執行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查詢結果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

4.2.3 生成執行計划

// 執行前清理狀態
private void clearPrevious() throws SQLException {
    statementExecutor.clear();
}
// 執行時初始化
private void initStatementExecutor() throws SQLException {
    statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();
}

說明: StatementExecutor 是有狀態的,每次執行前都要調用 statementExecutor.clear() 清理上一次執行的狀態,並調用 statementExecutor.init() 重新初始化。下面我們看一下 init 主要做了些什么事。

statementExecutor.init() 初始化主要是生成執行計划 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}

private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
        final Collection<RouteUnit> routeUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
            routeUnits, new SQLExecutePrepareCallback() {
                // 獲取連接
                @Override
                public List<Connection> getConnections(
                        final ConnectionMode connectionMode,
                        final String dataSourceName, final int connectionSize)
                        throws SQLException {
                    return StatementExecutor.super.getConnection().getConnections(
                            connectionMode, dataSourceName, connectionSize);
                }

                // 生成執行計划RouteUnit -> StatementExecuteUnit
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(
                        final Connection connection, final RouteUnit routeUnit,
                        final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(
                            routeUnit, connection.createStatement(
                            getResultSetType(), getResultSetConcurrency(),
                            getResultSetHoldability()), connectionMode);
                }
            });
}

說明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一個工具類,專門用於生成執行計划,將 RouteUnit 轉化為 StatementExecuteUnit。同時還提供了另一個工具類 SQLExecuteTemplate 用於執行 StatementExecuteUnit,在任務執行時我們會看到這個類。

4.2.4 任務執行

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback = 
        new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement,
                final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // 執行StatementExecuteUnit
    return executeCallback(executeCallback);
}

// sqlExecuteTemplate 執行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
        final SQLExecuteCallback<T> executeCallback) throws SQLException {
    // 執行所有的任務 StatementExecuteUnit
    List<T> result = sqlExecuteTemplate.executeGroup(
            (Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

說明: SqlExecuteTemplate 執行 StatementExecuteUnit 會回調 SQLExecuteCallback#executeSQL 方法,最終調用 getQueryResult 方法。

private QueryResult getQueryResult(final String sql, final Statement statement,
        final ConnectionMode connectionMode) throws SQLException {
    ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    return ConnectionMode.MEMORY_STRICTLY == connectionMode
            ? new StreamQueryResult(resultSet)
            : new MemoryQueryResult(resultSet);
}

說明: ConnectionMode 有兩種模式:內存限制(MEMORY_STRICTLY)和連接限制(CONNECTION_STRICTLY),本質是一種資源隔離,保護服務器資源不被消耗殆盡。

如果一個連接執行多個 StatementExecuteUnit 則為內存限制(MEMORY_STRICTLY),采用流式處理,即 StreamQueryResult ,反之則為連接限制(CONNECTION_STRICTLY),此時會將所有從 MySQL 服務器返回的數據都加載到內存中。特別是在 Sharding-Proxy 中特別有用,避免將代理服務器撐爆,見 Apache Sharding-Proxy 核心原理


每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM