1、前言
鑒於最近shardingSphere非常火爆,分析分析下其中的實現
2、常見的sharding的策略
| 實現層面 | 應用框架 | 優劣分析 |
| 數據訪問層(DAO層) | 適合在該層進行自主研發 | 不受ORM框架的制約、實現起來較為簡單、易於根據系統特點進行靈活的定制、無需SQL解析和路由規則匹配,性能上表現會稍好一些;劣勢在於:有一定的技術門檻,工作量比依靠框架實現要大(反過來看,框架會有學習成本)、不通用,只能在特定系統里工作。當然,在DAO層同樣可以通過XML配置或是注解將sharding邏輯抽離到“外部”,形成一套通用的框架. 不過目前還沒有出現此類的框架 |
| ORM 框架層 | Guzz、Hibernate Shards、MybatisSharding | 目前的hibernate shards來看,表現還算不上令人滿意,主要是它對使用hibernate的限制過多,比如它對HQL的支持就非常有限,針對Mybatis,可以在攔截器層做數據的分表,通過改寫SqlSession 去實現數據源的路由(該方式的路由只能通過MapperId,MapperNameSpace和參數等幾種方式實現路由數據源,不能通過表路由數據源),但是對simple和batch模式對處理不太友好。 |
| JDBC API 層 | dbShards、sharding-sphere | JDBC API層是很多人都會想到的一個實現sharding的絕佳場所,工作量較大,幸好有sharding-sphere幫我們做了這些 |
| DAO與JDBC之間的Spring 數據庫訪問封裝層 | CobarClient 或在 該層自主開發 | 該層和ORM 框架層 差不多,需要根據封裝做業務等出來 |
| 應用服務器與數據庫之間的代理層 | Mysql Proxy、Amoeba,mycat,sharding-Sphere | 該方式對業務來說完全隔離,開發人員不太需要關注分表分庫對事兒,但是開發需要知道有哪些是不能做的,哪些是可以做的,在開發出現問題時,需要相關負責代理層的人協助排查。 |
3、JDBC 連接過程
//創建數據源 DataSource dataSource = new com.zaxxer.hikari.HikariDataSource(); // 獲取連接 Connetion conn =dataSource.getConnetion(); // 建立PreparedStatement對象 Statement stmt=conn.createStatement(); //執行SQL查詢 String sql="select * from users"; ResultSet rs=stmt.executeQuery(sql); //建立PreparedStatement對象 String sql="select * from user where userName=? and password=?"; PreparedStatement pstmt=Conn.prepareStatement(sql); pstmt.setString(1,"admin"); pstmt.setString(2,"liubin"); //執行動態SQL查詢 ResultSet rs=pstmt.executeQuery(); //執行insert update delete等語句,先定義sql stmt.executeUpdate(sql);
通過上面的代碼可以發現DataSource,Connetion,ResultSet,PreparedStatement 這幾個核心類,於是sharding-sphere重新實現上面幾個接口,實現分表分庫。
4、sharding-sphere實現的JDBC核心類
- MasterSlaveConnection
- ShardingConnection
- MasterSlaveDataSource
- ShardingDataSource
- MasterSlavePreparedStatement
- MasterSlaveStatement
- ShardingPreparedStatement
- ShardingStatement
根據名稱我們可以看出
1. MasterSlaveConnection、MasterSlaveDataSource、MasterSlavePreparedStatement、MasterSlaveStatement 根據主從數據庫實現的類
2. ShardingConnection、ShardingDataSource、ShardingPreparedStatement、ShardingStatement 根據多數據庫實現的類。
先看看主從相關的幾個類的源碼
package io.shardingsphere.shardingjdbc.jdbc.core.connection; import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter; import io.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource; import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlavePreparedStatement; import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement; import io.shardingsphere.transaction.api.TransactionType; import lombok.Getter; import javax.sql.DataSource; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.util.Map; /** * Connection that support master-slave. * * @author zhangliang * @author zhaojun */ @Getter public final class MasterSlaveConnection extends AbstractConnectionAdapter { private final MasterSlaveDataSource masterSlaveDataSource; private final Map<String, DataSource> dataSourceMap; public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap) { this(masterSlaveDataSource, dataSourceMap, TransactionType.LOCAL); } public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap, final TransactionType transactionType) { super(transactionType); this.masterSlaveDataSource = masterSlaveDataSource; this.dataSourceMap = dataSourceMap; } @Override public DatabaseMetaData getMetaData() { return masterSlaveDataSource.getDatabaseMetaData(); } // 創建主從的分表分庫Statement @Override public Statement createStatement() { return new MasterSlaveStatement(this); } // 創建主從的分表分庫Statement @Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency) { return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency); } // 創建主從的分表分庫Statement @Override public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql) throws SQLException { return new MasterSlavePreparedStatement(this, sql); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException { return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException { return new MasterSlavePreparedStatement(this, sql, columnIndexes); } // 創建主從的分表分庫prepareStatement @Override public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException { return new MasterSlavePreparedStatement(this, sql, columnNames); } }
package io.shardingsphere.shardingjdbc.jdbc.core.datasource; import io.shardingsphere.api.ConfigMapContext; import io.shardingsphere.api.config.rule.MasterSlaveRuleConfiguration; import io.shardingsphere.core.constant.properties.ShardingProperties; import io.shardingsphere.core.rule.MasterSlaveRule; import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter; import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection; import io.shardingsphere.transaction.api.TransactionTypeHolder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import javax.sql.DataSource; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.SQLException; import java.util.Map; import java.util.Properties; /** * Master-slave data source. * * @author zhangliang * @author panjuan * @author zhaojun */ @Getter @Slf4j public class MasterSlaveDataSource extends AbstractDataSourceAdapter { private final DatabaseMetaData databaseMetaData; private final MasterSlaveRule masterSlaveRule; private final ShardingProperties shardingProperties; public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Map<String, Object> configMap, final Properties props) throws SQLException { super(dataSourceMap); databaseMetaData = getDatabaseMetaData(dataSourceMap); if (!configMap.isEmpty()) { ConfigMapContext.getInstance().getConfigMap().putAll(configMap); } this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig); shardingProperties = new ShardingProperties(null == props ? new Properties() : props); } public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule, final Map<String, Object> configMap, final Properties props) throws SQLException { super(dataSourceMap); databaseMetaData = getDatabaseMetaData(dataSourceMap); if (!configMap.isEmpty()) { ConfigMapContext.getInstance().getConfigMap().putAll(configMap); } this.masterSlaveRule = masterSlaveRule; shardingProperties = new ShardingProperties(null == props ? new Properties() : props); } private DatabaseMetaData getDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException { try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) { return connection.getMetaData(); } } @Override public final MasterSlaveConnection getConnection() { return new MasterSlaveConnection(this, getShardingTransactionalDataSources().getDataSourceMap(), TransactionTypeHolder.get()); } }
package io.shardingsphere.shardingjdbc.jdbc.core.statement; import com.google.common.base.Preconditions; import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter; import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractMasterSlavePreparedStatementAdapter; import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection; import lombok.AccessLevel; import lombok.Getter; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; import java.util.LinkedList; /** * PreparedStatement that support master-slave. * * @author zhangliang * @author panjuan */ @Getter public final class MasterSlavePreparedStatement extends AbstractMasterSlavePreparedStatementAdapter { private final MasterSlaveConnection connection; @Getter(AccessLevel.NONE) private final MasterSlaveRouter masterSlaveRouter; private final Collection<PreparedStatement> routedStatements = new LinkedList<>(); public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql) throws SQLException { this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException { this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlavePreparedStatement( final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException { this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); for (String each : masterSlaveRouter.route(sql)) { PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(preparedStatement); } } public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException { this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); for (String each : masterSlaveRouter.route(sql)) { PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys); routedStatements.add(preparedStatement); } } public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException { this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); for (String each : masterSlaveRouter.route(sql)) { PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnIndexes); routedStatements.add(preparedStatement); } } public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final String[] columnNames) throws SQLException { this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); for (String each : masterSlaveRouter.route(sql)) { PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnNames); routedStatements.add(preparedStatement); } } @Override public ResultSet executeQuery() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL"); return routedStatements.iterator().next().executeQuery(); } @Override public int executeUpdate() throws SQLException { int result = 0; for (PreparedStatement each : routedStatements) { result += each.executeUpdate(); } return result; } @Override public boolean execute() throws SQLException { boolean result = false; for (PreparedStatement each : routedStatements) { result = each.execute(); } return result; } @Override public void clearBatch() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support clearBatch for DDL"); routedStatements.iterator().next().clearBatch(); } @Override public void addBatch() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support addBatch for DDL"); routedStatements.iterator().next().addBatch(); } @Override public int[] executeBatch() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeBatch for DDL"); return routedStatements.iterator().next().executeBatch(); } @Override public ResultSet getResultSet() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getResultSet for DDL"); return routedStatements.iterator().next().getResultSet(); } @Override public ResultSet getGeneratedKeys() throws SQLException { Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getGeneratedKeys for DDL"); return routedStatements.iterator().next().getGeneratedKeys(); } @Override public int getResultSetHoldability() throws SQLException { return routedStatements.iterator().next().getResultSetHoldability(); } @Override public int getResultSetConcurrency() throws SQLException { return routedStatements.iterator().next().getResultSetConcurrency(); } @Override public int getResultSetType() throws SQLException { return routedStatements.iterator().next().getResultSetType(); } }
package io.shardingsphere.shardingjdbc.jdbc.core.statement; import com.google.common.base.Preconditions; import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter; import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter; import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection; import lombok.AccessLevel; import lombok.Getter; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.LinkedList; /** * Statement that support master-slave. * * @author zhangliang * @author panjuan */ @Getter public final class MasterSlaveStatement extends AbstractStatementAdapter { private final MasterSlaveConnection connection; @Getter(AccessLevel.NONE) private final MasterSlaveRouter masterSlaveRouter; private final int resultSetType; private final int resultSetConcurrency; private final int resultSetHoldability; private final Collection<Statement> routedStatements = new LinkedList<>(); public MasterSlaveStatement(final MasterSlaveConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) { this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); this.resultSetType = resultSetType; this.resultSetConcurrency = resultSetConcurrency; this.resultSetHoldability = resultSetHoldability; } @Override public ResultSet executeQuery(final String sql) throws SQLException { clearPrevious(); Collection<String> dataSourceNames = masterSlaveRouter.route(sql); Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL"); Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); return statement.executeQuery(sql); } @Override public int executeUpdate(final String sql) throws SQLException { clearPrevious(); int result = 0; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql); } return result; } @Override public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException { clearPrevious(); int result = 0; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, autoGeneratedKeys); } return result; } @Override public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException { clearPrevious(); int result = 0; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, columnIndexes); } return result; } @Override public int executeUpdate(final String sql, final String[] columnNames) throws SQLException { clearPrevious(); int result = 0; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result += statement.executeUpdate(sql, columnNames); } return result; } @Override public boolean execute(final String sql) throws SQLException { clearPrevious(); boolean result = false; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql); } return result; } @Override public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException { clearPrevious(); boolean result = false; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, autoGeneratedKeys); } return result; } @Override public boolean execute(final String sql, final int[] columnIndexes) throws SQLException { clearPrevious(); boolean result = false; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, columnIndexes); } return result; } @Override public boolean execute(final String sql, final String[] columnNames) throws SQLException { clearPrevious(); boolean result = false; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql, columnNames); } return result; } @Override public ResultSet getGeneratedKeys() throws SQLException { Preconditions.checkState(1 == routedStatements.size()); return routedStatements.iterator().next().getGeneratedKeys(); } @Override public ResultSet getResultSet() throws SQLException { Preconditions.checkState(1 == routedStatements.size()); return routedStatements.iterator().next().getResultSet(); } private void clearPrevious() throws SQLException { for (Statement each : routedStatements) { each.close(); } routedStatements.clear(); } }
根據源碼我們可以發現MasterSlaveConnection、MasterSlaveDataSource 只是拼裝數據,MasterSlavePreparedStatement、MasterSlaveStatement和路由方式差不多,我們現在重點分析MasterSlaveStatement
我們現在重點分析下面方法
@Override public boolean execute(final String sql) throws SQLException { clearPrevious(); boolean result = false; for (String each : masterSlaveRouter.route(sql)) { Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); routedStatements.add(statement); result = statement.execute(sql); } return result; }
其中的路由主要通過masterSlaveRouter和Sql進行處理的
package io.shardingsphere.core.routing.router.masterslave; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.hint.HintManagerHolder; import io.shardingsphere.core.parsing.SQLJudgeEngine; import io.shardingsphere.core.rule.MasterSlaveRule; import io.shardingsphere.core.util.SQLLogger; import lombok.RequiredArgsConstructor; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; /** * Master slave router interface. * * @author zhangliang * @author panjuan */ @RequiredArgsConstructor public final class MasterSlaveRouter { private final MasterSlaveRule masterSlaveRule; private final boolean showSQL; /** * Route Master slave. * * @param sql SQL * @return data source names */ // TODO for multiple masters may return more than one data source public Collection<String> route(final String sql) { Collection<String> result = route(new SQLJudgeEngine(sql).judge().getType()); if (showSQL) { SQLLogger.logSQL(sql, result); } return result; } private Collection<String> route(final SQLType sqlType) { if (isMasterRoute(sqlType)) { MasterVisitedManager.setMasterVisited(); return Collections.singletonList(masterSlaveRule.getMasterDataSourceName()); } return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()))); } private boolean isMasterRoute(final SQLType sqlType) { return SQLType.DQL != sqlType || MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly(); } }
我們看下是否是Master路由的方式
package io.shardingsphere.core.constant; /** * SQL Type. * * @author zhangliang */ public enum SQLType { /** * Data Query Language. * * <p>Such as {@code SELECT}.</p> */ DQL, /** * Data Manipulation Language. * * <p>Such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.</p> */ DML, /** * Data Definition Language. * * <p>Such as {@code CREATE}, {@code ALTER}, {@code DROP}, {@code TRUNCATE}.</p> */ DDL, /** * Transaction Control Language. * * <p>Such as {@code SET}, {@code COMMIT}, {@code ROLLBACK}, {@code SAVEPOIINT}, {@code BEGIN}.</p> */ TCL, /** * Database administrator Language. */ DAL, /** * Database control Language. */ DCL }
通過SQLType的類型說明和SQLType.DQL != sqlType 我們知道非查詢的請求都是按Master路由的方式進行路由,
查詢的SQL通過 MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly() 這兩個條件決定是否是Master路由,這兩個方法之后在分析。
這代碼也非常清晰,如果是主路由,直接用主庫,如果非主路由,通過配置的LoadBalanceAlgorithm算法進行路由,這就不細說了。
現在重點看看sharding 這塊路由的實現,同理,我們只看ShardingStatement
package io.shardingsphere.shardingjdbc.jdbc.core.statement; import com.google.common.base.Optional; import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant; import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult; import io.shardingsphere.core.merger.MergeEngine; import io.shardingsphere.core.merger.MergeEngineFactory; import io.shardingsphere.core.merger.QueryResult; import io.shardingsphere.core.parsing.parser.sql.dal.DALStatement; import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement; import io.shardingsphere.core.parsing.parser.sql.dql.DQLStatement; import io.shardingsphere.core.parsing.parser.sql.dql.select.SelectStatement; import io.shardingsphere.core.routing.SQLRouteResult; import io.shardingsphere.core.routing.StatementRoutingEngine; import io.shardingsphere.core.routing.router.sharding.GeneratedKey; import io.shardingsphere.shardingjdbc.executor.StatementExecutor; import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter; import io.shardingsphere.shardingjdbc.jdbc.core.ShardingContext; import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection; import io.shardingsphere.shardingjdbc.jdbc.core.resultset.GeneratedKeysResultSet; import io.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet; import lombok.Getter; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Collection; import java.util.List; /** * Statement that support sharding. * * @author gaohongtao * @author caohao * @author zhangliang * @author zhaojun * @author panjuan */ public final class ShardingStatement extends AbstractStatementAdapter { @Getter private final ShardingConnection connection; private final StatementExecutor statementExecutor; private boolean returnGeneratedKeys; private SQLRouteResult routeResult; private ResultSet currentResultSet; public ShardingStatement(final ShardingConnection connection) { this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) { this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT); } public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) { super(Statement.class); this.connection = connection; statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection); } @Override public ResultSet executeQuery(final String sql) throws SQLException { ResultSet result; try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), statementExecutor.executeQuery()); result = new ShardingResultSet(statementExecutor.getResultSets(), mergeEngine.merge(), this); } finally { currentResultSet = null; } currentResultSet = result; return result; } @Override public int executeUpdate(final String sql) throws SQLException { try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.executeUpdate(); } finally { refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement()); currentResultSet = null; } } @Override public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException { if (RETURN_GENERATED_KEYS == autoGeneratedKeys) { returnGeneratedKeys = true; } try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.executeUpdate(autoGeneratedKeys); } finally { currentResultSet = null; } } @Override public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException { returnGeneratedKeys = true; try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.executeUpdate(columnIndexes); } finally { currentResultSet = null; } } @Override public int executeUpdate(final String sql, final String[] columnNames) throws SQLException { returnGeneratedKeys = true; try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.executeUpdate(columnNames); } finally { currentResultSet = null; } } @Override public boolean execute(final String sql) throws SQLException { try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.execute(); } finally { refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement()); currentResultSet = null; } } @Override public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException { if (RETURN_GENERATED_KEYS == autoGeneratedKeys) { returnGeneratedKeys = true; } try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.execute(autoGeneratedKeys); } finally { currentResultSet = null; } } @Override public boolean execute(final String sql, final int[] columnIndexes) throws SQLException { returnGeneratedKeys = true; try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.execute(columnIndexes); } finally { currentResultSet = null; } } @Override public boolean execute(final String sql, final String[] columnNames) throws SQLException { returnGeneratedKeys = true; try { clearPrevious(); sqlRoute(sql); initStatementExecutor(); return statementExecutor.execute(columnNames); } finally { currentResultSet = null; } } @Override public ResultSet getResultSet() throws SQLException { if (null != currentResultSet) { return currentResultSet; } if (1 == statementExecutor.getStatements().size() && routeResult.getSqlStatement() instanceof DQLStatement) { currentResultSet = statementExecutor.getStatements().iterator().next().getResultSet(); return currentResultSet; } List<ResultSet> resultSets = new ArrayList<>(statementExecutor.getStatements().size()); List<QueryResult> queryResults = new ArrayList<>(statementExecutor.getStatements().size()); for (Statement each : statementExecutor.getStatements()) { ResultSet resultSet = each.getResultSet(); resultSets.add(resultSet); queryResults.add(new StreamQueryResult(resultSet)); } if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement) { MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), queryResults); currentResultSet = new ShardingResultSet(resultSets, mergeEngine.merge(), this); } return currentResultSet; } private void initStatementExecutor() throws SQLException { statementExecutor.init(routeResult); replayMethodForStatements(); } private void replayMethodForStatements() { for (Statement each : statementExecutor.getStatements()) { replayMethodsInvocation(each); } } private void sqlRoute(final String sql) { ShardingContext shardingContext = connection.getShardingContext(); routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(), shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql); } private void clearPrevious() throws SQLException { statementExecutor.clear(); } @SuppressWarnings("MagicConstant") @Override public int getResultSetType() { return statementExecutor.getResultSetType(); } @SuppressWarnings("MagicConstant") @Override public int getResultSetConcurrency() { return statementExecutor.getResultSetConcurrency(); } @Override public int getResultSetHoldability() { return statementExecutor.getResultSetHoldability(); } @Override public Collection<Statement> getRoutedStatements() { return statementExecutor.getStatements(); } @Override public ResultSet getGeneratedKeys() throws SQLException { Optional<GeneratedKey> generatedKey = getGeneratedKey(); if (returnGeneratedKeys && generatedKey.isPresent()) { return new GeneratedKeysResultSet(routeResult.getGeneratedKey().getGeneratedKeys().iterator(), generatedKey.get().getColumn().getName(), this); } if (1 == getRoutedStatements().size()) { return getRoutedStatements().iterator().next().getGeneratedKeys(); } return new GeneratedKeysResultSet(); } private Optional<GeneratedKey> getGeneratedKey() { if (null != routeResult && routeResult.getSqlStatement() instanceof InsertStatement) { return Optional.fromNullable(routeResult.getGeneratedKey()); } return Optional.absent(); } }
同樣我們只分析
@Override public boolean execute(final String sql) throws SQLException { try { //清理緩存 clearPrevious(); //路由 sqlRoute(sql); //初始化statementExecutor initStatementExecutor(); //執行 return statementExecutor.execute(); } finally { refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement()); currentResultSet = null; } }
clearPrevious 沒必要多解釋,清理上次執行的的數據
private void sqlRoute(final String sql) { ShardingContext shardingContext = connection.getShardingContext(); routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(), shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql); }
public final class StatementRoutingEngine { private final ShardingRouter shardingRouter; private final ShardingMasterSlaveRouter masterSlaveRouter; public StatementRoutingEngine(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) { shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL); masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules()); } /** * SQL route. * * @param logicSQL logic SQL * @return route result */ public SQLRouteResult route(final String logicSQL) { SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false); //為什么用shardingRouter還要使用masterSlaveRouter,因為shardingDatasource里面可能包括多個MasterSlaveDataSource return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement)); } }
public final class ShardingRouterFactory { /** * Create new instance of sharding router. * * @param shardingRule sharding rule * @param shardingMetaData sharding meta data * @param databaseType database type * @param showSQL show SQL or not * @return sharding router instance */ public static ShardingRouter newInstance(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) { // 如果只是分庫使用DatabaseHintSQLRouter 路由,否則使用ParsingSQLRouter return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingRule, showSQL) : new ParsingSQLRouter(shardingRule, shardingMetaData, databaseType, showSQL); } }
如果只是DatabaseHintSQLRouter 則通過shardingRule.getDefaultDatabaseShardingStrategy() 路由。
ParsingSQLRouter 路由如下,可以自己去閱讀
```
@Override public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) { Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement ? getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters) : Optional.<GeneratedKey>absent(); SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull()); ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize(); if (generatedKey.isPresent()) { setGeneratedKeys(result, generatedKey.get()); } if (sqlStatement instanceof SelectStatement && !sqlStatement.getTables().isEmpty() && !((SelectStatement) sqlStatement).getSubQueryConditions().isEmpty()) { mergeShardingValueForSubQuery(sqlStatement.getConditions(), shardingConditions); } RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route(); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters); if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) { processLimit(parameters, (SelectStatement) sqlStatement); } SQLBuilder sqlBuilder = rewriteEngine.rewrite(routingResult.isSingleRouting()); for (TableUnit each : routingResult.getTableUnits().getTableUnits()) { result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingMetaData.getDataSource()))); } if (showSQL) { SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits()); } return result; }

