shardingsphere分析


1、前言

鑒於最近shardingSphere非常火爆,分析分析下其中的實現

2、常見的sharding的策略

實現層面 應用框架 優劣分析
數據訪問層(DAO層) 適合在該層進行自主研發 不受ORM框架的制約、實現起來較為簡單、易於根據系統特點進行靈活的定制、無需SQL解析和路由規則匹配,性能上表現會稍好一些;劣勢在於:有一定的技術門檻,工作量比依靠框架實現要大(反過來看,框架會有學習成本)、不通用,只能在特定系統里工作。當然,在DAO層同樣可以通過XML配置或是注解將sharding邏輯抽離到“外部”,形成一套通用的框架. 不過目前還沒有出現此類的框架
ORM 框架層 Guzz、Hibernate Shards、MybatisSharding  目前的hibernate shards來看,表現還算不上令人滿意,主要是它對使用hibernate的限制過多,比如它對HQL的支持就非常有限,針對Mybatis,可以在攔截器層做數據的分表,通過改寫SqlSession 去實現數據源的路由(該方式的路由只能通過MapperId,MapperNameSpace和參數等幾種方式實現路由數據源,不能通過表路由數據源),但是對simple和batch模式對處理不太友好。 
JDBC API 層  dbShards、sharding-sphere JDBC API層是很多人都會想到的一個實現sharding的絕佳場所,工作量較大,幸好有sharding-sphere幫我們做了這些
DAO與JDBC之間的Spring 數據庫訪問封裝層 CobarClient 或在 該層自主開發 該層和ORM 框架層 差不多,需要根據封裝做業務等出來
應用服務器與數據庫之間的代理層 Mysql Proxy、Amoeba,mycat,sharding-Sphere 該方式對業務來說完全隔離,開發人員不太需要關注分表分庫對事兒,但是開發需要知道有哪些是不能做的,哪些是可以做的,在開發出現問題時,需要相關負責代理層的人協助排查。

 

3、JDBC 連接過程

 

//創建數據源
DataSource dataSource = new com.zaxxer.hikari.HikariDataSource();

// 獲取連接
Connetion conn =dataSource.getConnetion();
// 建立PreparedStatement對象 
Statement stmt=conn.createStatement(); 
//執行SQL查詢
String sql="select * from users"; 
ResultSet rs=stmt.executeQuery(sql);

//建立PreparedStatement對象 
String sql="select * from user where userName=? and password=?"; 
PreparedStatement pstmt=Conn.prepareStatement(sql); 
pstmt.setString(1,"admin"); 
pstmt.setString(2,"liubin"); 
//執行動態SQL查詢 
ResultSet rs=pstmt.executeQuery();

//執行insert update delete等語句,先定義sql 
stmt.executeUpdate(sql);

 

通過上面的代碼可以發現DataSource,Connetion,ResultSet,PreparedStatement 這幾個核心類,於是sharding-sphere重新實現上面幾個接口,實現分表分庫。


4、sharding-sphere實現的JDBC核心類

  • MasterSlaveConnection
  • ShardingConnection
  • MasterSlaveDataSource
  • ShardingDataSource
  • MasterSlavePreparedStatement
  • MasterSlaveStatement
  • ShardingPreparedStatement
  • ShardingStatement

根據名稱我們可以看出
1. MasterSlaveConnection、MasterSlaveDataSource、MasterSlavePreparedStatement、MasterSlaveStatement 根據主從數據庫實現的類
2. ShardingConnection、ShardingDataSource、ShardingPreparedStatement、ShardingStatement 根據多數據庫實現的類。

    先看看主從相關的幾個類的源碼
   

package io.shardingsphere.shardingjdbc.jdbc.core.connection;

import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlavePreparedStatement;
import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement;
import io.shardingsphere.transaction.api.TransactionType;
import lombok.Getter;

import javax.sql.DataSource;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

/**
* Connection that support master-slave.
* 
* @author zhangliang
* @author zhaojun
*/
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {

private final MasterSlaveDataSource masterSlaveDataSource;

private final Map<String, DataSource> dataSourceMap;

public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap) {
this(masterSlaveDataSource, dataSourceMap, TransactionType.LOCAL);
}

public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap, final TransactionType transactionType) {
super(transactionType);
this.masterSlaveDataSource = masterSlaveDataSource;
this.dataSourceMap = dataSourceMap;
}

@Override
public DatabaseMetaData getMetaData() {
return masterSlaveDataSource.getDatabaseMetaData();
}

// 創建主從的分表分庫Statement
@Override
public Statement createStatement() {
return new MasterSlaveStatement(this);
}

// 創建主從的分表分庫Statement
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
}

// 創建主從的分表分庫Statement
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new MasterSlavePreparedStatement(this, sql);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, columnIndexes);
}

// 創建主從的分表分庫prepareStatement
@Override
public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
return new MasterSlavePreparedStatement(this, sql, columnNames);
}
}

 

package io.shardingsphere.shardingjdbc.jdbc.core.datasource;

import io.shardingsphere.api.ConfigMapContext;
import io.shardingsphere.api.config.rule.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

/**
* Master-slave data source.
*
* @author zhangliang
* @author panjuan
* @author zhaojun
*/
@Getter
@Slf4j
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {

private final DatabaseMetaData databaseMetaData;

private final MasterSlaveRule masterSlaveRule;

private final ShardingProperties shardingProperties;

public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig,
final Map<String, Object> configMap, final Properties props) throws SQLException {
super(dataSourceMap);
databaseMetaData = getDatabaseMetaData(dataSourceMap);
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
}
this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
}

public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
super(dataSourceMap);
databaseMetaData = getDatabaseMetaData(dataSourceMap);
if (!configMap.isEmpty()) {
ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
}
this.masterSlaveRule = masterSlaveRule;
shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
}

private DatabaseMetaData getDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
return connection.getMetaData();
}
}

@Override
public final MasterSlaveConnection getConnection() {
return new MasterSlaveConnection(this, getShardingTransactionalDataSources().getDataSourceMap(), TransactionTypeHolder.get());
}
}

 

package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Preconditions;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractMasterSlavePreparedStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import lombok.AccessLevel;
import lombok.Getter;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

/**
* PreparedStatement that support master-slave.
* 
* @author zhangliang
* @author panjuan
*/
@Getter
public final class MasterSlavePreparedStatement extends AbstractMasterSlavePreparedStatementAdapter {

private final MasterSlaveConnection connection;

@Getter(AccessLevel.NONE)
private final MasterSlaveRouter masterSlaveRouter;

private final Collection<PreparedStatement> routedStatements = new LinkedList<>();

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlavePreparedStatement(
final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
for (String each : masterSlaveRouter.route(sql)) {
PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
for (String each : masterSlaveRouter.route(sql)) {
PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException {
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
for (String each : masterSlaveRouter.route(sql)) {
PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnIndexes);
routedStatements.add(preparedStatement);
}
}

public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final String[] columnNames) throws SQLException {
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
for (String each : masterSlaveRouter.route(sql)) {
PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnNames);
routedStatements.add(preparedStatement);
}
}

@Override
public ResultSet executeQuery() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
return routedStatements.iterator().next().executeQuery();
}

@Override
public int executeUpdate() throws SQLException {
int result = 0;
for (PreparedStatement each : routedStatements) {
result += each.executeUpdate();
}
return result;
}

@Override
public boolean execute() throws SQLException {
boolean result = false;
for (PreparedStatement each : routedStatements) {
result = each.execute();
}
return result;
}

@Override
public void clearBatch() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support clearBatch for DDL");
routedStatements.iterator().next().clearBatch();
}

@Override
public void addBatch() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support addBatch for DDL");
routedStatements.iterator().next().addBatch();
}

@Override
public int[] executeBatch() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeBatch for DDL");
return routedStatements.iterator().next().executeBatch();
}

@Override
public ResultSet getResultSet() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getResultSet for DDL");
return routedStatements.iterator().next().getResultSet();
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getGeneratedKeys for DDL");
return routedStatements.iterator().next().getGeneratedKeys();
}

@Override
public int getResultSetHoldability() throws SQLException {
return routedStatements.iterator().next().getResultSetHoldability();
}

@Override
public int getResultSetConcurrency() throws SQLException {
return routedStatements.iterator().next().getResultSetConcurrency();
}

@Override
public int getResultSetType() throws SQLException {
return routedStatements.iterator().next().getResultSetType();
}
}

 

package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Preconditions;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import lombok.AccessLevel;
import lombok.Getter;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;

/**
* Statement that support master-slave.
* 
* @author zhangliang
* @author panjuan
*/
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {

private final MasterSlaveConnection connection;

@Getter(AccessLevel.NONE)
private final MasterSlaveRouter masterSlaveRouter;

private final int resultSetType;

private final int resultSetConcurrency;

private final int resultSetHoldability;

private final Collection<Statement> routedStatements = new LinkedList<>();

public MasterSlaveStatement(final MasterSlaveConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
}

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
clearPrevious();
Collection<String> dataSourceNames = masterSlaveRouter.route(sql);
Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
return statement.executeQuery(sql);
}

@Override
public int executeUpdate(final String sql) throws SQLException {
clearPrevious();
int result = 0;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql);
}
return result;
}

@Override
public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
clearPrevious();
int result = 0;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, autoGeneratedKeys);
}
return result;
}

@Override
public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
clearPrevious();
int result = 0;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, columnIndexes);
}
return result;
}

@Override
public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
clearPrevious();
int result = 0;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql, columnNames);
}
return result;
}

@Override
public boolean execute(final String sql) throws SQLException {
clearPrevious();
boolean result = false;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql);
}
return result;
}

@Override
public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
clearPrevious();
boolean result = false;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, autoGeneratedKeys);
}
return result;
}

@Override
public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
clearPrevious();
boolean result = false;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, columnIndexes);
}
return result;
}

@Override
public boolean execute(final String sql, final String[] columnNames) throws SQLException {
clearPrevious();
boolean result = false;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql, columnNames);
}
return result;
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getGeneratedKeys();
}

@Override
public ResultSet getResultSet() throws SQLException {
Preconditions.checkState(1 == routedStatements.size());
return routedStatements.iterator().next().getResultSet();
}

private void clearPrevious() throws SQLException {
for (Statement each : routedStatements) {
each.close();
}
routedStatements.clear();
}
}

 

根據源碼我們可以發現MasterSlaveConnection、MasterSlaveDataSource 只是拼裝數據,MasterSlavePreparedStatement、MasterSlaveStatement和路由方式差不多,我們現在重點分析MasterSlaveStatement

我們現在重點分析下面方法

@Override
public boolean execute(final String sql) throws SQLException {
clearPrevious();
boolean result = false;
for (String each : masterSlaveRouter.route(sql)) {
Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result = statement.execute(sql);
}
return result;
}

其中的路由主要通過masterSlaveRouter和Sql進行處理的

 

package io.shardingsphere.core.routing.router.masterslave;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.hint.HintManagerHolder;
import io.shardingsphere.core.parsing.SQLJudgeEngine;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.core.util.SQLLogger;
import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/**
* Master slave router interface.
* 
* @author zhangliang
* @author panjuan
*/
@RequiredArgsConstructor
public final class MasterSlaveRouter {

private final MasterSlaveRule masterSlaveRule;

private final boolean showSQL;

/**
* Route Master slave.
*
* @param sql SQL
* @return data source names
*/
// TODO for multiple masters may return more than one data source
public Collection<String> route(final String sql) {
Collection<String> result = route(new SQLJudgeEngine(sql).judge().getType());
if (showSQL) {
SQLLogger.logSQL(sql, result);
}
return result;
}

private Collection<String> route(final SQLType sqlType) {
if (isMasterRoute(sqlType)) {
MasterVisitedManager.setMasterVisited();
return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
}
return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
}

private boolean isMasterRoute(final SQLType sqlType) {
return SQLType.DQL != sqlType || MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly();
}
}

 

我們看下是否是Master路由的方式

 

package io.shardingsphere.core.constant;

/**
* SQL Type.
* 
* @author zhangliang
*/
public enum SQLType {

/**
* Data Query Language.
* 
* <p>Such as {@code SELECT}.</p>
*/
DQL,

/**
* Data Manipulation Language.
*
* <p>Such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.</p>
*/
DML,

/**
* Data Definition Language.
*
* <p>Such as {@code CREATE}, {@code ALTER}, {@code DROP}, {@code TRUNCATE}.</p>
*/
DDL,

/**
* Transaction Control Language.
*
* <p>Such as {@code SET}, {@code COMMIT}, {@code ROLLBACK}, {@code SAVEPOIINT}, {@code BEGIN}.</p>
*/
TCL,

/**
* Database administrator Language.
*/
DAL,

/**
* Database control Language.
*/
DCL
}

 

通過SQLType的類型說明和SQLType.DQL != sqlType 我們知道非查詢的請求都是按Master路由的方式進行路由,
查詢的SQL通過 MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly() 這兩個條件決定是否是Master路由,這兩個方法之后在分析。

 


這代碼也非常清晰,如果是主路由,直接用主庫,如果非主路由,通過配置的LoadBalanceAlgorithm算法進行路由,這就不細說了。


現在重點看看sharding 這塊路由的實現,同理,我們只看ShardingStatement



package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Optional;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.merger.MergeEngine;
import io.shardingsphere.core.merger.MergeEngineFactory;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.parsing.parser.sql.dal.DALStatement;
import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.DQLStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.select.SelectStatement;
import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.StatementRoutingEngine;
import io.shardingsphere.core.routing.router.sharding.GeneratedKey;
import io.shardingsphere.shardingjdbc.executor.StatementExecutor;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.ShardingContext;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.shardingjdbc.jdbc.core.resultset.GeneratedKeysResultSet;
import io.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet;
import lombok.Getter;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Statement that support sharding.
*
* @author gaohongtao
* @author caohao
* @author zhangliang
* @author zhaojun
* @author panjuan
*/
public final class ShardingStatement extends AbstractStatementAdapter {

@Getter
private final ShardingConnection connection;

private final StatementExecutor statementExecutor;

private boolean returnGeneratedKeys;

private SQLRouteResult routeResult;

private ResultSet currentResultSet;

public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
}

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
ResultSet result;
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), 
routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), statementExecutor.executeQuery());
result = new ShardingResultSet(statementExecutor.getResultSets(), mergeEngine.merge(), this);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}

@Override
public int executeUpdate(final String sql) throws SQLException {
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.executeUpdate();
} finally {
refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
currentResultSet = null;
}
}

@Override
public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
returnGeneratedKeys = true;
}
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.executeUpdate(autoGeneratedKeys);
} finally {
currentResultSet = null;
}
}

@Override
public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
returnGeneratedKeys = true;
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.executeUpdate(columnIndexes);
} finally {
currentResultSet = null;
}
}

@Override
public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
returnGeneratedKeys = true;
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.executeUpdate(columnNames);
} finally {
currentResultSet = null;
}
}

@Override
public boolean execute(final String sql) throws SQLException {
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.execute();
} finally {
refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
currentResultSet = null;
}
}

@Override
public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
returnGeneratedKeys = true;
}
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.execute(autoGeneratedKeys);
} finally {
currentResultSet = null;
}
}

@Override
public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
returnGeneratedKeys = true;
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.execute(columnIndexes);
} finally {
currentResultSet = null;
}
}

@Override
public boolean execute(final String sql, final String[] columnNames) throws SQLException {
returnGeneratedKeys = true;
try {
clearPrevious();
sqlRoute(sql);
initStatementExecutor();
return statementExecutor.execute(columnNames);
} finally {
currentResultSet = null;
}
}

@Override
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
if (1 == statementExecutor.getStatements().size() && routeResult.getSqlStatement() instanceof DQLStatement) {
currentResultSet = statementExecutor.getStatements().iterator().next().getResultSet();
return currentResultSet;
}
List<ResultSet> resultSets = new ArrayList<>(statementExecutor.getStatements().size());
List<QueryResult> queryResults = new ArrayList<>(statementExecutor.getStatements().size());
for (Statement each : statementExecutor.getStatements()) {
ResultSet resultSet = each.getResultSet();
resultSets.add(resultSet);
queryResults.add(new StreamQueryResult(resultSet));
}
if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement) {
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), 
connection.getShardingContext().getShardingRule(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), queryResults);
currentResultSet = new ShardingResultSet(resultSets, mergeEngine.merge(), this);
}
return currentResultSet;
}

private void initStatementExecutor() throws SQLException {
statementExecutor.init(routeResult);
replayMethodForStatements();
}

private void replayMethodForStatements() {
for (Statement each : statementExecutor.getStatements()) {
replayMethodsInvocation(each);
}
}

private void sqlRoute(final String sql) {
ShardingContext shardingContext = connection.getShardingContext();
routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(), 
shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql);
}

private void clearPrevious() throws SQLException {
statementExecutor.clear();
}

@SuppressWarnings("MagicConstant")
@Override
public int getResultSetType() {
return statementExecutor.getResultSetType();
}

@SuppressWarnings("MagicConstant")
@Override
public int getResultSetConcurrency() {
return statementExecutor.getResultSetConcurrency();
}

@Override
public int getResultSetHoldability() {
return statementExecutor.getResultSetHoldability();
}

@Override
public Collection<Statement> getRoutedStatements() {
return statementExecutor.getStatements();
}

@Override
public ResultSet getGeneratedKeys() throws SQLException {
Optional<GeneratedKey> generatedKey = getGeneratedKey();
if (returnGeneratedKeys && generatedKey.isPresent()) {
return new GeneratedKeysResultSet(routeResult.getGeneratedKey().getGeneratedKeys().iterator(), generatedKey.get().getColumn().getName(), this);
}
if (1 == getRoutedStatements().size()) {
return getRoutedStatements().iterator().next().getGeneratedKeys();
}
return new GeneratedKeysResultSet();
}

private Optional<GeneratedKey> getGeneratedKey() {
if (null != routeResult && routeResult.getSqlStatement() instanceof InsertStatement) {
return Optional.fromNullable(routeResult.getGeneratedKey());
}
return Optional.absent();
}
}

 

 

同樣我們只分析

@Override
public boolean execute(final String sql) throws SQLException {
try {
//清理緩存
clearPrevious();
//路由
sqlRoute(sql);
//初始化statementExecutor
initStatementExecutor();
//執行
return statementExecutor.execute();
} finally {
refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
currentResultSet = null;
}
}

 

clearPrevious 沒必要多解釋,清理上次執行的的數據

private void sqlRoute(final String sql) {
ShardingContext shardingContext = connection.getShardingContext();
routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(), 
shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql);
}

 

public final class StatementRoutingEngine {

private final ShardingRouter shardingRouter;

private final ShardingMasterSlaveRouter masterSlaveRouter;

public StatementRoutingEngine(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL);
masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
}

/**
* SQL route.
*
* @param logicSQL logic SQL
* @return route result
*/
public SQLRouteResult route(final String logicSQL) {
SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
//為什么用shardingRouter還要使用masterSlaveRouter,因為shardingDatasource里面可能包括多個MasterSlaveDataSource
return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
}
}

 

public final class ShardingRouterFactory {

/**
* Create new instance of sharding router.
* 
* @param shardingRule sharding rule
* @param shardingMetaData sharding meta data
* @param databaseType database type
* @param showSQL show SQL or not
* @return sharding router instance
*/
public static ShardingRouter newInstance(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
// 如果只是分庫使用DatabaseHintSQLRouter 路由,否則使用ParsingSQLRouter
return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingRule, showSQL) : new ParsingSQLRouter(shardingRule, shardingMetaData, databaseType, showSQL);
}
}

 

如果只是DatabaseHintSQLRouter 則通過shardingRule.getDefaultDatabaseShardingStrategy() 路由。



ParsingSQLRouter 路由如下,可以自己去閱讀
```

@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement ? getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters) : Optional.<GeneratedKey>absent();
SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());
ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();
if (generatedKey.isPresent()) {
setGeneratedKeys(result, generatedKey.get());
}
if (sqlStatement instanceof SelectStatement && !sqlStatement.getTables().isEmpty() && !((SelectStatement) sqlStatement).getSubQueryConditions().isEmpty()) {
mergeShardingValueForSubQuery(sqlStatement.getConditions(), shardingConditions);
}
RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route();
SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
processLimit(parameters, (SelectStatement) sqlStatement);
}
SQLBuilder sqlBuilder = rewriteEngine.rewrite(routingResult.isSingleRouting());
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingMetaData.getDataSource())));
}
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
}
return result;
}




 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM