Sharding-Jdbc 源碼分析
Apache Sharding-Sphere 系列目錄(https://www.cnblogs.com/binarylei/p/12217637.html)
在看 Sharding-Jdbc 源碼之前,強烈建議先閱讀一直官網的文章:
JDBC 調用過程如下:APP -> ORM -> JDBC -> PROXY -> MySQL。如果要完成數據的分庫分表,可以在這五層任意地方進行,Sharding-Jdbc 是在 JDBC 層進行分庫分表,Sharding-Proxy 是在 PROXY 進行分庫分表。
Sharding-Jdbc 是一個輕量級的分庫分表框架,使用時最關鍵的是配制分庫分表策略,其余的和使用普通的 MySQL 驅動一樣,幾乎不用改代碼。具體使用方法參考:Apache Sharding-Jdbc 使用示例
try(DataSource dataSource = ShardingDataSourceFactory.createDataSource(
createDataSourceMap(), shardingRuleConfig, new Properties()) {
connection Connection = dataSource.getConnection();
...
}
1. Sharding-Jdbc 包結構
sharding-jdbc
├── sharding-jdbc-core 重寫DataSource/Connection/Statement/ResultSet四大對象
└── sharding-jdbc-orchestration 配置中心
sharding-core
├── sharding-core-api 接口和配置類
├── sharding-core-common 通用分片策略實現...
├── sharding-core-entry SQL解析、路由、改寫,核心類BaseShardingEngine
├── sharding-core-route SQL路由,核心類StatementRoutingEngine
├── sharding-core-rewrite SQL改寫,核心類ShardingSQLRewriteEngine
├── sharding-core-execute SQL執行,核心類ShardingExecuteEngine
└── sharding-core-merge 結果合並,核心類MergeEngine
shardingsphere-sql-parser
├── shardingsphere-sql-parser-spi SQLParserEntry,用於初始化SQLParser
├── shardingsphere-sql-parser-engine SQL解析,核心類SQLParseEngine
├── shardingsphere-sql-parser-relation
└── shardingsphere-sql-parser-mysql MySQL解析器,核心類MySQLParserEntry和MySQLParser
shardingsphere-underlying 基礎接口和api
├── shardingsphere-rewrite SQLRewriteEngine接口
├── shardingsphere-execute QueryResult查詢結果
└── shardingsphere-merge MergeEngine接口
shardingsphere-spi SPI加載工具類
sharding-transaction
├── sharding-transaction-core 接口ShardingTransactionManager,SPI加載
├── sharding-transaction-2pc 實現類XAShardingTransactionManager
└── sharding-transaction-base 實現類SeataATShardingTransactionManager
2. JDBC 四大對象
所有的一切都從 ShardingDataSourceFactory 開始的,創建了一個 ShardingDataSource 的分片數據源。除了 ShardingDataSource(分片數據源),在 Sharding-Sphere 中還有 MasterSlaveDataSourceFactory(主從數據源)、EncryptDataSourceFactory(脫敏數據源)。
public static DataSource createDataSource(
final Map<String, DataSource> dataSourceMap,
final ShardingRuleConfiguration shardingRuleConfig,
final Properties props) throws SQLException {
return new ShardingDataSource(dataSourceMap,
new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}
說明: 本文主要以 ShardingDataSource 為切入點分析 Sharding-Sphere 是如何對 JDBC 四大對象 DataSource、Connection、Statement、ResultSet 進行封裝的。
2.1 DataSource


DataSource 和 Connection 都比較簡單,沒有處理過多的邏輯,只是 dataSourceMap, shardingRule 進行簡單的封裝。
ShardingDataSource 持有對數據源和分片規則,可以通過 getConnection 方法獲取 ShardingConnection 連接。
private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext,
TransactionTypeHolder.get());
}
ShardingDataSource 的功能非常簡單,就不多說了。
2.2 Connection
ShardingConnection 可以創建 Statement 和 PrepareStatement 兩種運行方式:
@Override
public Statement createStatement(final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {
return new ShardingStatement(this, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability)
throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType,
resultSetConcurrency, resultSetHoldability);
}
說明: ShardingConnection 主要是將創建 ShardingStatement 和 ShardingPreparedStatement 兩個對象,主要的執行邏輯都在 Statement 對象中。當然 ShardingConnection 還有兩個重要的功能,一個是獲取真正的數據庫連接,一個是事務提交功能,事務不在本文討論范圍中。
protected Connection createConnection(final String dataSourceName,
final DataSource dataSource) throws SQLException {
return isInShardingTransaction()
? shardingTransactionManager.getConnection(dataSourceName)
: dataSource.getConnection();
}
說明: 如果有事務,則需要通過事務管理器獲取連接,關於事務不在本文討論范圍中。
2.3 Statement
Statement 相對來說比較復雜,因為它都是 JDBC 的真正執行器,所有邏輯都封裝在 Statement 中。

說明: Statement 分為 ShardingStatement 和 ShardingPrepareStatement 兩種情況。本文以 ShardingStatement 為例分析 Sharding-Jdbc 執行過程。下一節會重點為分析 ShardingStatement 的執行流程。
2.4 ResultSet

說明: ShardingResultSet 只是對 MergedResult 的簡單封裝。
private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
return mergeResultSet.next();
}
3. Sharding-Jdbc 執行流程分析

總結: ShardingStatement 執行過程如下:
- SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改寫,位於 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接將路由的功能委托給 StatementRoutingEngine(或 PreparedQueryShardingEngine),本質是對 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封裝。
- StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 執行的操作,位於 sharding-jdbc-core 工程中。本質是對 ShardingExecuteEngine 的封裝。
- StatementRoutingEngine:SQL 路由引擎,位於 sharding-core-route 工程中。路由引擎包裝了 SQL 解析、路由、改寫三點。SQL 路由分兩步,先進行數據分片路由(ShardingRouter),再進行主從路由(ShardingMasterSlaveRouter)。
- SQLParseEngine:SQL 解析引擎,位於 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 兩種。
- ShardingSQLRewriteEngine:SQL 改寫引擎,位於 sharding-core-rewrite 工程中。
- ShardingExecuteEngine:執行引擎,位於 sharding-core-execute 工程中。StatementExecutor 對
- MergeEngine:結果合並引擎,位於 sharding-core-merge 工程中。
接下來一下會對 ShardingStatement 深入分析,之后會對 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一個引擎進行分析。
4. sharding-jdbc-core 任務執行分析
ShardingStatement 內部有三個核心的類,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改寫;一是 StatementExecutor 進行 SQL 執行;最后調用 MergeEngine 對結果進行合並處理。
4.1 ShardingStatement
4.1.2 初始化
private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType,
final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
resultSetHoldability, connection);
}
說明: ShardingStatement 內部執行 SQL 委托給了 statementExecutor。關於 ResultSet.CONCUR_READ_ONLY 等參考這里。
4.1.2 執行
(1)executeQuery 執行過程
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
ResultSet result;
try {
clearPrevious();
// 1. SQL 解析、路由、改寫,最終生成 SQLRouteResult
shard(sql);
// 2. 生成執行計划 SQLRouteResult -> StatementExecuteUnit
initStatementExecutor();
// 3. statementExecutor.executeQuery() 執行任務
MergeEngine mergeEngine = MergeEngineFactory.newInstance(
connection.getRuntimeContext().getDatabaseType(),
connection.getRuntimeContext().getRule(), sqlRouteResult,
connection.getRuntimeContext().getMetaData().getRelationMetas(),
statementExecutor.executeQuery());
// 4. 結果合並
result = getResultSet(mergeEngine);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}
(2)SQL 路由(包括 SQL 解析、路由、改寫)
private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
runtimeContext.getRule(), runtimeContext.getProps(),
runtimeContext.getMetaData(), runtimeContext.getParseEngine());
sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}
說明: SimpleQueryShardingEngine 進行 SQL 路由(包括 SQL 解析、路由、改寫),生成 SQLRouteResult。之后會有一節專門分析 SQL 路由過程。
當 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的執行任務就全部交給 StatementExecutor 完成。
4.2 StatementExecutor
StatementExecutor 內部封裝了 SQL 任務的執行過程,包括:SqlExecutePrepareTemplate 類生成執行計划 StatementExecuteUnit,以及 SQLExecuteTemplate 用於執行 StatementExecuteUnit。
4.2.1 類結構

4.2.2 重要屬性
AbstractStatementExecutor 類中重要的屬性:
// SQLExecutePrepareTemplate用於生成執行計划StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的執行計划StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
new LinkedList<>();
// SQLExecuteTemplate用於執行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查詢結果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
4.2.3 生成執行計划
// 執行前清理狀態
private void clearPrevious() throws SQLException {
statementExecutor.clear();
}
// 執行時初始化
private void initStatementExecutor() throws SQLException {
statementExecutor.init(sqlRouteResult);
replayMethodForStatements();
}
說明: StatementExecutor 是有狀態的,每次執行前都要調用 statementExecutor.clear() 清理上一次執行的狀態,並調用 statementExecutor.init() 重新初始化。下面我們看一下 init 主要做了些什么事。
statementExecutor.init() 初始化主要是生成執行計划 StatementExecuteUnit。
public void init(final SQLRouteResult routeResult) throws SQLException {
setSqlStatementContext(routeResult.getSqlStatementContext());
getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
cacheStatements();
}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
final Collection<RouteUnit> routeUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
routeUnits, new SQLExecutePrepareCallback() {
// 獲取連接
@Override
public List<Connection> getConnections(
final ConnectionMode connectionMode,
final String dataSourceName, final int connectionSize)
throws SQLException {
return StatementExecutor.super.getConnection().getConnections(
connectionMode, dataSourceName, connectionSize);
}
// 生成執行計划RouteUnit -> StatementExecuteUnit
@Override
public StatementExecuteUnit createStatementExecuteUnit(
final Connection connection, final RouteUnit routeUnit,
final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(
routeUnit, connection.createStatement(
getResultSetType(), getResultSetConcurrency(),
getResultSetHoldability()), connectionMode);
}
});
}
說明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一個工具類,專門用於生成執行計划,將 RouteUnit 轉化為 StatementExecuteUnit。同時還提供了另一個工具類 SQLExecuteTemplate 用於執行 StatementExecuteUnit,在任務執行時我們會看到這個類。
4.2.4 任務執行
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback =
new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(sql, statement, connectionMode);
}
};
// 執行StatementExecuteUnit
return executeCallback(executeCallback);
}
// sqlExecuteTemplate 執行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
final SQLExecuteCallback<T> executeCallback) throws SQLException {
// 執行所有的任務 StatementExecuteUnit
List<T> result = sqlExecuteTemplate.executeGroup(
(Collection) executeGroups, executeCallback);
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
return result;
}
說明: SqlExecuteTemplate 執行 StatementExecuteUnit 會回調 SQLExecuteCallback#executeSQL 方法,最終調用 getQueryResult 方法。
private QueryResult getQueryResult(final String sql, final Statement statement,
final ConnectionMode connectionMode) throws SQLException {
ResultSet resultSet = statement.executeQuery(sql);
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode
? new StreamQueryResult(resultSet)
: new MemoryQueryResult(resultSet);
}
說明: ConnectionMode 有兩種模式:內存限制(MEMORY_STRICTLY)和連接限制(CONNECTION_STRICTLY),本質是一種資源隔離,保護服務器資源不被消耗殆盡。
如果一個連接執行多個 StatementExecuteUnit 則為內存限制(MEMORY_STRICTLY),采用流式處理,即 StreamQueryResult ,反之則為連接限制(CONNECTION_STRICTLY),此時會將所有從 MySQL 服務器返回的數據都加載到內存中。特別是在 Sharding-Proxy 中特別有用,避免將代理服務器撐爆,見 Apache Sharding-Proxy 核心原理。
每天用心記錄一點點。內容也許不重要,但習慣很重要!