Sharding-Proxy 源碼分析


Sharding-Proxy 源碼分析

在看 Sharding-Proxy 源碼之前,強烈建議先閱讀一直官網的兩篇文章:

  1. Apache Sharding-Proxy 使用手冊
  2. Apache Sharding-Proxy 設計原理

Sharding-Proxy 包結構

sharding-proxy
    ├── sharding-proxy-backend		負責與底層mysql通信
    ├── sharding-proxy-bootstrap	啟動sharding-proxy
    ├── sharding-proxy-common		yaml配置文件加載...
    ├── sharding-proxy-frontend		啟動socket,代理mysql/pg
    │       ├── sharding-proxy-frontend-core		啟動sokcet
    │       ├── sharding-proxy-frontend-mysql		實現類MySQLProtocolFrontendEngine
    │       ├── sharding-proxy-frontend-postgresql	實現類PostgreSQLProtocolFrontendEngine
    │       └── sharding-proxy-frontend-spi     	核心spi,DatabaseProtocolFrontendEngine
    └── sharding-proxy-transport	代理數據庫對應的編解碼
            ├── sharding-proxy-transport-core		核心api,DatabasePacket和PacketPayload
            ├── sharding-proxy-transport-mysql		MySQL協議編解碼
            └── sharding-proxy-transport-postgresql	pg協議編解碼

總結: Sharding-Proxy 包功能說明

  • sharding-proxy-bootstrap:啟動入口,調用 LogicSchemas 加載配置,ShardingProxy 啟動程序,綁定 socket。

  • sharding-proxy-frontend-core:啟動netty,hander 的初始化類為 ServerHandlerInitializer,編解碼對應的 Handler 為 PacketCodec,業務處理對應的 Handler 為 FrontendChannelInboundHandler。這兩個 Handler 實際的工作都委托給了 DatabaseProtocolFrontendEngine。

  • sharding-proxy-frontend-spi:核心 spi,DatabaseProtocolFrontendEngine 包含編解碼,執行器。 DatabaseProtocolFrontendEngine 目前有 MySQL 和 PG 兩個實現。

  • sharding-proxy-frontend-mysql:實現類 MySQLProtocolFrontendEngine

  • sharding-proxy-transport-mysql:mysql 報文解析,主要接口為 MySQLPacket。

Sharding-Proxy 啟動流程

Sharding-Proxy 啟動流程

總結: Sharding-Proxy 啟動流程最核心的是通過 ServerHandlerInitializer 加載了 PacketCodec(編解碼) 和 FrontendChannelInboundHandler(業務處理器) 兩個處理器。這兩個處理的具體工作都委托給了 DatabaseProtocolFrontendEngine 完成,有 MySQL 和 PostgreSQL 兩個實現。

Bootstrap

啟動入口位於 sharding-proxy-bootstrap 工程中。Bootstrap 提供了有注冊中心和無注冊中心兩種啟動方式,以無注冊中心的啟動方式為例:

private static void startWithoutRegistryCenter(
        final Map<String, YamlProxyRuleConfiguration> ruleConfigs,
        final YamlAuthenticationConfiguration authentication,
        final Properties prop, final int port) throws SQLException {
    Authentication authenticationConfiguration = getAuthentication(authentication);
    ShardingProxyContext.getInstance().init(authenticationConfiguration, prop);
    // 加載配置規則
    LogicSchemas.getInstance().init(
        getDataSourceParameterMap(ruleConfigs),
        getRuleConfiguration(ruleConfigs));
    initOpenTracing();
    // 啟動 sharding-proxy
    ShardingProxy.getInstance().start(port);
}

Bootstrap 啟動最核心的步驟是 ShardingProxy 啟動代理。Sharding-Proxy 會啟動一個 Netty 服務器,默認端口為 3307。

ShardingProxy

程序啟動入口位於 sharding-proxy-frontend-core 工程中。Netty 服務器通過 ServerHandlerInitializer 加載對應的 Handler,包括 PacketCodec(編解碼) 和 FrontendChannelInboundHandler(業務處理器) 兩個處理器。

DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine =
                DatabaseProtocolFrontendEngineFactory.newInstance(
                        LogicSchemas.getInstance().getDatabaseType());
pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));

總結: 這兩個 Handler 的實際工作都是委托給 DatabaseProtocolFrontendEngine 完成,目前有 MySQL 和 PG 兩個實現。

DatabaseProtocolFrontendEngine

位於 sharding-proxy-frontend-spi工程中。DatabaseProtocolFrontendEngine 是一個 SPI 接口,目前提供了 MySQL 和 PostgreSQL 兩種實現,分別位於 sharding-proxy-frontend-mysql 和 sharding-proxy-frontend-postgresql 工程中。

public interface DatabaseProtocolFrontendEngine extends DatabaseTypeAwareSPI {
    FrontendContext getFrontendContext();
    AuthenticationEngine getAuthEngine();
    void release(BackendConnection backendConnection);
    
    // 編解碼器
    DatabasePacketCodecEngine getCodecEngine();
    // SQL執行引擎
    CommandExecuteEngine getCommandExecuteEngine();
}

總結: DatabaseProtocolFrontendEngine 方法最重要的兩個屬性是 DatabasePacketCodecEngine 解碼器和 CommandExecuteEngine SQL執行引擎。

Sharding-Proxy 請求接入

Sharding-Proxy 消息處理

總結: Sharding-Proxy 接收到消息后處理過程有如下幾步:

  1. PacketCodec:將從 client 接收的請求按長度解碼成 ByteBuf,實際由解碼器 DatabasePacketCodecEngine#decode 完成。
  2. FrontendChannelInboundHandler:將請求 ByteBuf 交給 CommandExecutorTask 處理。
  3. CommandExecutorTask :消息處理核心類。
    • 第一步:調用 DatabasePacketCodecEngine#createPacketPayload 將消息包裝成 PacketPayload。
    • 第二步:調用 CommandExecuteEngine.getCommandPacketType 將消息解碼成具體的 CommandPacket。
    • 第三步:調用 CommandExecutor#getCommandExecutor 方法,根據消息類別獲取不同的執行器。
    • 第四步:調用 CommandExecutor#execute 執行任務。
    • 第五步:將處理后的結果偽裝成 MySQL 服務器的協議,返回給 client。

FrontendChannelInboundHandler

消息處理的入口 FrontendChannelInboundHandler 位於 sharding-proxy-frontend-core 工程中。Sharding-Proxy 接收到請求后,先由 PacketCodec 按長度解碼,然后由 FrontendChannelInboundHandler 進行處理,代碼如下:

@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
    if (!authorized) {
        authorized = auth(context, (ByteBuf) message);
        return;
    }
    // CommandExecutorSelector 返回 ExecutorService,任務執行 CommandExecutorTask
    CommandExecutorSelector.getExecutor(
        databaseProtocolFrontendEngine.getFrontendContext()
            .isOccupyThreadForPerConnection(),
        backendConnection.isSupportHint(), 
        backendConnection.getTransactionType(),
        context.channel().id())
        .execute(new CommandExecutorTask(databaseProtocolFrontendEngine,                                      backendConnection, context, message));
}

CommandExecutorTask

// 核心api,處理編解碼,sql執行
private final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine;
// 管理后台 MySQL 連接
private final BackendConnection backendConnection;
// 按長度解碼后的client請求信息,ByteBuf
private final Object message;

@Override
public void run() {
    // 按包長度解碼成 ByteBuf,client
    PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine()
        .createPacketPayload((ByteBuf) message));
    // 將 ByteBuf 解析成具體的命令,並轉發到 backendConnection,響應 client
    isNeedFlush = executeCommand(context, payload, backendConnection);
}

private boolean executeCommand(final ChannelHandlerContext context,
        final PacketPayload payload, final BackendConnection backendConnection)
        throws SQLException {
    // 執行引擎
    CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine
        .getCommandExecuteEngine();
    // mysql命令類型
    CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
    // 解碼
    CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(
        payload, type, backendConnection);
    // 執行器
    CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(
        type, commandPacket, backendConnection);
    // 向真實 mysql 服務器發送 sql,並返回結果 responsePackets
    Collection<DatabasePacket> responsePackets = commandExecutor.execute();
    if (responsePackets.isEmpty()) {
        return false;
    }
    // 將返回結果偽裝成 mysql 協議,返回給客戶端 client
    for (DatabasePacket each : responsePackets) {
        context.write(each);
    }
    if (commandExecutor instanceof QueryCommandExecutor) {
        commandExecuteEngine.writeQueryData(context, backendConnection,
            (QueryCommandExecutor) commandExecutor, responsePackets.size());
        return true;
    }
    return databaseProtocolFrontendEngine.getFrontendContext()
        .isFlushForPerCommandPacket();
}

總結: CommandExecutorTask 內部很多工作都委托給了 CommandExecuteEngine 完成,CommandExecuteEngine 也有 MySQL 和 PostgreSQL 兩個實現。CommandExecuteEngine 主要是對具體的協議解碼 CommandPacket,並獲取具體的執行器 CommandExecutor。

MySQL 報文解析器

位於 sharding-proxy-transport-mysql 工程中。

  • MySQLPacketCodecEngine:實現 DatabasePacketCodecEngine 接口,根據包長度解析報文,並將解析的 ByteBuf 包裝成 MySQLPacketPayload。

  • MySQLPacketPayload:實現 PacketPayload 接口,本質是對 ByteBuf 的包裝,提供對 ByteBuf 的 read/write 字段。

  • MySQLCommandPacketFactory:將 MySQLPacketPayload 解析成具體協議的報文 MySQLPacket。

  • MySQLPacket:實現了 DatabasePacket 接口。將 ByteBuf 解析成具體的命令,主要分兩大類:

    • 一是 Statement,代表實現是 MySQLComQueryPacket,
    • 二是 PrepareStatement,代表實現是 MySQLComStmtExecutePacket。
MySQL 協議解析

MySQL 執行器

位於 sharding-proxy-frontend-mysql 工程中。Sharding-Sphere 將客戶端發送的 SQL 命令解析后,轉發給底層的 MySQL 服務器,核心的接口類如下:

MySQL 執行器
  • CommandExecutor:核心接口,SQL 執行器。Sharding-Proxy 解析 client 的命令,轉發給 MySQL 服務器,並將 MySQL 服務器返回的結果按 MySQL 協議包偽裝后響應給 client。
  • MySQLCommandExecutorFactory:根據請求的類型不同(eg: COM_QUERY, COM_STMT_EXECUTE),初始化不同的執行器,主要分為兩類:
    • 一是 Statement,不使用預解析功能,代表實現是 MySQLComQueryPacketExecutor,最終調用 TextProtocolBackendHandler 執行。
    • 二是 PrepareStatement,使用預解析功能,代表實現是 MySQLComStmtExecuteExecutor,最終調用 DatabaseCommunicationEngine 執行。

注意: 並不是所有的 client 請求都轉發到 mysql 服務器上了。如 MySQL 預解析操作分為 prepare、execute、close、reset 四步,分別對應 MySQLComStmtPrepareExecutor、MySQLComStmtExecuteExecutor、MySQLComStmtCloseExecutor、MySQLComStmtResetExecutor 四個類。除了 execute 會將請求轉發給底層 mysql 服務器外,其它的解析是在代理層(sharding-proxy)完成的,將 SQLParseEngine 解析后結果緩存在 MySQLBinaryStatementRegistry 實例中,這樣能避免重復解析 SQL 提高性能。

MySQLComQueryPacketExecutor 執行流程

public MySQLComQueryPacketExecutor(final MySQLComQueryPacket comQueryPacket,
                                   final BackendConnection backendConnection) {
    // 包含 SQL 和 connection,textProtocolBackendHandler 可以執行 SQL
    textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(
        DatabaseTypes.getActualDatabaseType("MySQL"),
        comQueryPacket.getSql(), backendConnection);
}

@Override
public Collection<DatabasePacket> execute() {
    ...
    // 委托給 textProtocolBackendHandler 執行
    BackendResponse backendResponse = textProtocolBackendHandler.execute();
    // 包裝返回的結果
    // error
    if (backendResponse instanceof ErrorResponse) {
        return Collections.<DatabasePacket>singletonList(createErrorPacket(
            ((ErrorResponse) backendResponse).getCause()));
    }
    // update
    if (backendResponse instanceof UpdateResponse) {
        return Collections.<DatabasePacket>singletonList(createUpdatePacket(
            (UpdateResponse) backendResponse));
    }
    // query
    isQuery = true;
    return createQueryPackets((QueryResponse) backendResponse);
}

總結: MySQLComQueryPacketExecutor 總體過程非常清晰,解析、轉發、響應。

  1. 解析:按 MySQL 協議解析 client 發送的請求。MySQL 解析的核心邏輯在 sharding-proxy-transport-mysql 包中,主要接口類是 MySQLPacketCodecEngine、MySQLPacketPayload、MySQLPacket。
  2. 轉發:將解析后的 SQL 轉發給 MySQL 服務器,並返加響應結果。實際轉發委托給了 TextProtocolBackendHandler,這個類的功能會在后面繼續分析。
  3. 響應:將處理后的結果偽裝成 MySQL 服務器,響應客戶端。這個主要是偽裝成 MySQL 協議。MySQL 協議參考:https://dev.mysql.com/doc/internals/en/client-server-protocol.html

MySQLComStmtExecuteExecutor 執行流程

public MySQLComStmtExecuteExecutor(
    final MySQLComStmtExecutePacket comStmtExecutePacket,
    final BackendConnection backendConnection) {
    databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance()
        .newBinaryProtocolInstance(backendConnection.getLogicSchema(),
            comStmtExecutePacket.getSql(), comStmtExecutePacket.getParameters(),
            backendConnection);
}

@Override
public Collection<DatabasePacket> execute() {
    // 委托給 databaseCommunicationEngine 執行
    BackendResponse backendResponse = databaseCommunicationEngine.execute();
    // 包裝返回的結果,同 MySQLComQueryPacketExecutor
    ...
}

總結: 可以看到,和 MySQLComQueryPacketExecutor 基本類似,唯一不同的在於MySQLComQueryPacketExecutor 真正調用 TextProtocolBackendHandler 執行,而 MySQLComStmtExecuteExecutor 調用 DatabaseCommunicationEngine 執行。

那問題就來了,為什么會有 TextProtocolBackendHandler 和 DatabaseCommunicationEngine 兩個執行器?它們到底是什么關系呢?TextProtocolBackendHandler 的實現類其實就是調用 DatabaseCommunicationEngine。

MySQL 執行過程

總結: 無論是 MySQLComQueryPacketExecutor 還是 MySQLComStmtExecuteExecutor 最終都是調用 DatabaseCommunicationEngine 執行。

Sharding-Proxy 消息處理

位於 sharding-proxy-backend 工程中。

Sharding-Proxy 消息處理時序圖

總結: Sharding-Proxy 消息處理過程和 Sharding-Jdbc 處理過程差不多,也要經過 SQL 解析、路由、改寫、合並這四個核心過程。前面三個類是 Sharding-Proxy 中的,后面四個類則是 Sharding-Jdbc 的,兩套邏輯共用一套核心代碼。

DatabaseCommunicationEngine

DatabaseCommunicationEngine 是 Sharding-Proxy 內部轉發執行器,負責將請求轉發給底層 MySQL 服務器。

DatabaseCommunicationEngine 類圖

我們看一下 QueryBackendHandler 的實現類。

public final class QueryBackendHandler implements TextProtocolBackendHandler {
    @Override
    public BackendResponse execute() {
        ...
        databaseCommunicationEngine = databaseCommunicationEngineFactory
                .newTextProtocolInstance(backendConnection.getLogicSchema(),
                        sql, backendConnection);
        return databaseCommunicationEngine.execute();
    }
}

說明: TextProtocolBackendHandler 是不使用預解析的執行器,調用 databaseCommunicationEngineFactoy.newTextProtocolInstance,而使用預解析的 MySQLComStmtExecuteExecutor 內部調用 DatabaseCommunicationEngineFactory.newBinaryProtocolInstance。我們看一下這兩個方法的內部實現。

public DatabaseCommunicationEngine newTextProtocolInstance(final LogicSchema logicSchema,
        final String sql, final BackendConnection backendConnection) {
    return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
            new JDBCExecuteEngine(backendConnection,
                    new StatementExecutorWrapper(logicSchema)));
}
    
public DatabaseCommunicationEngine newBinaryProtocolInstance(
        final LogicSchema logicSchema, final String sql, 
        final List<Object> parameters,
        final BackendConnection backendConnection) {
    return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
                new JDBCExecuteEngine(backendConnection,
                    new PreparedStatementExecutorWrapper(logicSchema, parameters)));
}

說明: 在 Sharding-Proxy 中 TextProtocol 代表的是不使用預解析,而 BinaryProtocol 代表使用預解析。JDBCDatabaseCommunicationEngine 內部直接委托給 JDBCExecuteEngine 完成。

JDBCDatabaseCommunicationEngine

JDBCDatabaseCommunicationEngine 類圖
  • LogicSchema:配置類解析規則。
  • JDBCExecuteEngine:SQL 執行器,向 MySQL 服務器下發請求並獲取查詢結果。
  • JDBCBackendDataSource:內部是一個 Map,維護了真實服務器的連接池,可以從中獲取 MySQL 服務器的連接。
  • BackendConnection:用於管理底層 MySQL 連接,分為事務和非事務連接,如果是事務連接,則在獲取連接時調用 connection.setAutoCommit(false) 開啟一個事務。
  • StatementExecutorWrapper:不使用預解析。
  • PreparedStatementExecutorWrapper:使用預解析。

JDBCDatabaseCommunicationEngine 執行過程的代碼如下:

private final String sql;
private final JDBCExecuteEngine executeEngine;

@Override
public BackendResponse execute() {
    try {
        // 1. SQL 路由、改寫
        SQLRouteResult routeResult = executeEngine.getJdbcExecutorWrapper().route(sql);
        return execute(routeResult);
    } catch (final SQLException ex) {
        return new ErrorResponse(ex);
    }
}

private BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
    ...
    // 2. SQL 執行
    response = executeEngine.execute(routeResult);
    if (logicSchema instanceof ShardingSchema) {
        logicSchema.refreshTableMetaData(routeResult.getSqlStatementContext());
    }
    // 4. 結果合並
    return merge(routeResult);
}

總結: JDBCDatabaseCommunicationEngine 執行 SQL 過程包括:SQL 路由、改寫、執行、結果合並,其中前三步都是委托 JDBCExecuteEngine 完成的。

JDBCExecuteEngine

// 管理底層 MySQL 連接
private final BackendConnection backendConnection;
// ①根據SQL生成執行計划(包括SQL解析、路由、改寫);②生成Statement;③執行SQL
private final JDBCExecutorWrapper jdbcExecutorWrapper;
// 生成執行計划 RouteUnit -> StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 執行 StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;

@Override
public BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
    final SQLStatementContext sqlStatementContext = routeResult.getSqlStatementContext();
    boolean isReturnGeneratedKeys = sqlStatementContext.getSqlStatement()
        instanceof InsertStatement;
    boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    // 執行計划,ProxyJDBCExecutePrepareCallback用於創建執行計划
    Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups =
        sqlExecutePrepareTemplate.getExecuteUnitGroups(
        routeResult.getRouteUnits(),
        new ProxyJDBCExecutePrepareCallback(
            backendConnection, jdbcExecutorWrapper, isReturnGeneratedKeys));
    
    // 執行SQL,ProxySQLExecuteCallback用於執行SQL
    Collection<ExecuteResponse> executeResponses = sqlExecuteTemplate.executeGroup(
        (Collection) sqlExecuteGroups,
        new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
                                    isExceptionThrown, isReturnGeneratedKeys, true),
        new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
                                    isExceptionThrown, isReturnGeneratedKeys, false));
    ExecuteResponse executeResponse = executeResponses.iterator().next();

    // 組裝結果
    return executeResponse instanceof ExecuteQueryResponse
        ? getExecuteQueryResponse(((ExecuteQueryResponse) executeResponse)
                                  .getQueryHeaders(), executeResponses)
        : new UpdateResponse(executeResponses);
}

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM