Sharding-Proxy 源碼分析
在看 Sharding-Proxy 源碼之前,強烈建議先閱讀一直官網的兩篇文章:
Sharding-Proxy 包結構
sharding-proxy
├── sharding-proxy-backend 負責與底層mysql通信
├── sharding-proxy-bootstrap 啟動sharding-proxy
├── sharding-proxy-common yaml配置文件加載...
├── sharding-proxy-frontend 啟動socket,代理mysql/pg
│ ├── sharding-proxy-frontend-core 啟動sokcet
│ ├── sharding-proxy-frontend-mysql 實現類MySQLProtocolFrontendEngine
│ ├── sharding-proxy-frontend-postgresql 實現類PostgreSQLProtocolFrontendEngine
│ └── sharding-proxy-frontend-spi 核心spi,DatabaseProtocolFrontendEngine
└── sharding-proxy-transport 代理數據庫對應的編解碼
├── sharding-proxy-transport-core 核心api,DatabasePacket和PacketPayload
├── sharding-proxy-transport-mysql MySQL協議編解碼
└── sharding-proxy-transport-postgresql pg協議編解碼
總結: Sharding-Proxy 包功能說明
-
sharding-proxy-bootstrap:啟動入口,調用 LogicSchemas 加載配置,ShardingProxy 啟動程序,綁定 socket。
-
sharding-proxy-frontend-core:啟動netty,hander 的初始化類為 ServerHandlerInitializer,編解碼對應的 Handler 為 PacketCodec,業務處理對應的 Handler 為 FrontendChannelInboundHandler。這兩個 Handler 實際的工作都委托給了 DatabaseProtocolFrontendEngine。
-
sharding-proxy-frontend-spi:核心 spi,DatabaseProtocolFrontendEngine 包含編解碼,執行器。 DatabaseProtocolFrontendEngine 目前有 MySQL 和 PG 兩個實現。
-
sharding-proxy-frontend-mysql:實現類 MySQLProtocolFrontendEngine
-
sharding-proxy-transport-mysql:mysql 報文解析,主要接口為 MySQLPacket。
Sharding-Proxy 啟動流程
總結: Sharding-Proxy 啟動流程最核心的是通過 ServerHandlerInitializer 加載了 PacketCodec(編解碼) 和 FrontendChannelInboundHandler(業務處理器) 兩個處理器。這兩個處理的具體工作都委托給了 DatabaseProtocolFrontendEngine 完成,有 MySQL 和 PostgreSQL 兩個實現。
Bootstrap
啟動入口位於 sharding-proxy-bootstrap 工程中。Bootstrap 提供了有注冊中心和無注冊中心兩種啟動方式,以無注冊中心的啟動方式為例:
private static void startWithoutRegistryCenter(
final Map<String, YamlProxyRuleConfiguration> ruleConfigs,
final YamlAuthenticationConfiguration authentication,
final Properties prop, final int port) throws SQLException {
Authentication authenticationConfiguration = getAuthentication(authentication);
ShardingProxyContext.getInstance().init(authenticationConfiguration, prop);
// 加載配置規則
LogicSchemas.getInstance().init(
getDataSourceParameterMap(ruleConfigs),
getRuleConfiguration(ruleConfigs));
initOpenTracing();
// 啟動 sharding-proxy
ShardingProxy.getInstance().start(port);
}
Bootstrap 啟動最核心的步驟是 ShardingProxy 啟動代理。Sharding-Proxy 會啟動一個 Netty 服務器,默認端口為 3307。
ShardingProxy
程序啟動入口位於 sharding-proxy-frontend-core 工程中。Netty 服務器通過 ServerHandlerInitializer 加載對應的 Handler,包括 PacketCodec(編解碼) 和 FrontendChannelInboundHandler(業務處理器) 兩個處理器。
DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine =
DatabaseProtocolFrontendEngineFactory.newInstance(
LogicSchemas.getInstance().getDatabaseType());
pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
總結: 這兩個 Handler 的實際工作都是委托給 DatabaseProtocolFrontendEngine 完成,目前有 MySQL 和 PG 兩個實現。
DatabaseProtocolFrontendEngine
位於 sharding-proxy-frontend-spi工程中。DatabaseProtocolFrontendEngine 是一個 SPI 接口,目前提供了 MySQL 和 PostgreSQL 兩種實現,分別位於 sharding-proxy-frontend-mysql 和 sharding-proxy-frontend-postgresql 工程中。
public interface DatabaseProtocolFrontendEngine extends DatabaseTypeAwareSPI {
FrontendContext getFrontendContext();
AuthenticationEngine getAuthEngine();
void release(BackendConnection backendConnection);
// 編解碼器
DatabasePacketCodecEngine getCodecEngine();
// SQL執行引擎
CommandExecuteEngine getCommandExecuteEngine();
}
總結: DatabaseProtocolFrontendEngine 方法最重要的兩個屬性是 DatabasePacketCodecEngine 解碼器和 CommandExecuteEngine SQL執行引擎。
Sharding-Proxy 請求接入
總結: Sharding-Proxy 接收到消息后處理過程有如下幾步:
- PacketCodec:將從 client 接收的請求按長度解碼成 ByteBuf,實際由解碼器 DatabasePacketCodecEngine#decode 完成。
- FrontendChannelInboundHandler:將請求 ByteBuf 交給 CommandExecutorTask 處理。
- CommandExecutorTask :消息處理核心類。
- 第一步:調用 DatabasePacketCodecEngine#createPacketPayload 將消息包裝成 PacketPayload。
- 第二步:調用 CommandExecuteEngine.getCommandPacketType 將消息解碼成具體的 CommandPacket。
- 第三步:調用 CommandExecutor#getCommandExecutor 方法,根據消息類別獲取不同的執行器。
- 第四步:調用 CommandExecutor#execute 執行任務。
- 第五步:將處理后的結果偽裝成 MySQL 服務器的協議,返回給 client。
FrontendChannelInboundHandler
消息處理的入口 FrontendChannelInboundHandler 位於 sharding-proxy-frontend-core 工程中。Sharding-Proxy 接收到請求后,先由 PacketCodec 按長度解碼,然后由 FrontendChannelInboundHandler 進行處理,代碼如下:
@Override
public void channelRead(final ChannelHandlerContext context, final Object message) {
if (!authorized) {
authorized = auth(context, (ByteBuf) message);
return;
}
// CommandExecutorSelector 返回 ExecutorService,任務執行 CommandExecutorTask
CommandExecutorSelector.getExecutor(
databaseProtocolFrontendEngine.getFrontendContext()
.isOccupyThreadForPerConnection(),
backendConnection.isSupportHint(),
backendConnection.getTransactionType(),
context.channel().id())
.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
}
CommandExecutorTask
// 核心api,處理編解碼,sql執行
private final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine;
// 管理后台 MySQL 連接
private final BackendConnection backendConnection;
// 按長度解碼后的client請求信息,ByteBuf
private final Object message;
@Override
public void run() {
// 按包長度解碼成 ByteBuf,client
PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine()
.createPacketPayload((ByteBuf) message));
// 將 ByteBuf 解析成具體的命令,並轉發到 backendConnection,響應 client
isNeedFlush = executeCommand(context, payload, backendConnection);
}
private boolean executeCommand(final ChannelHandlerContext context,
final PacketPayload payload, final BackendConnection backendConnection)
throws SQLException {
// 執行引擎
CommandExecuteEngine commandExecuteEngine = databaseProtocolFrontendEngine
.getCommandExecuteEngine();
// mysql命令類型
CommandPacketType type = commandExecuteEngine.getCommandPacketType(payload);
// 解碼
CommandPacket commandPacket = commandExecuteEngine.getCommandPacket(
payload, type, backendConnection);
// 執行器
CommandExecutor commandExecutor = commandExecuteEngine.getCommandExecutor(
type, commandPacket, backendConnection);
// 向真實 mysql 服務器發送 sql,並返回結果 responsePackets
Collection<DatabasePacket> responsePackets = commandExecutor.execute();
if (responsePackets.isEmpty()) {
return false;
}
// 將返回結果偽裝成 mysql 協議,返回給客戶端 client
for (DatabasePacket each : responsePackets) {
context.write(each);
}
if (commandExecutor instanceof QueryCommandExecutor) {
commandExecuteEngine.writeQueryData(context, backendConnection,
(QueryCommandExecutor) commandExecutor, responsePackets.size());
return true;
}
return databaseProtocolFrontendEngine.getFrontendContext()
.isFlushForPerCommandPacket();
}
總結: CommandExecutorTask 內部很多工作都委托給了 CommandExecuteEngine 完成,CommandExecuteEngine 也有 MySQL 和 PostgreSQL 兩個實現。CommandExecuteEngine 主要是對具體的協議解碼 CommandPacket,並獲取具體的執行器 CommandExecutor。
MySQL 報文解析器
位於 sharding-proxy-transport-mysql 工程中。
-
MySQLPacketCodecEngine:實現 DatabasePacketCodecEngine 接口,根據包長度解析報文,並將解析的 ByteBuf 包裝成 MySQLPacketPayload。 -
MySQLPacketPayload:實現 PacketPayload 接口,本質是對 ByteBuf 的包裝,提供對 ByteBuf 的 read/write 字段。 -
MySQLCommandPacketFactory:將 MySQLPacketPayload 解析成具體協議的報文 MySQLPacket。 -
MySQLPacket:實現了 DatabasePacket 接口。將 ByteBuf 解析成具體的命令,主要分兩大類:- 一是 Statement,代表實現是 MySQLComQueryPacket,
- 二是 PrepareStatement,代表實現是 MySQLComStmtExecutePacket。
MySQL 執行器
位於 sharding-proxy-frontend-mysql 工程中。Sharding-Sphere 將客戶端發送的 SQL 命令解析后,轉發給底層的 MySQL 服務器,核心的接口類如下:
- CommandExecutor:核心接口,SQL 執行器。Sharding-Proxy 解析 client 的命令,轉發給 MySQL 服務器,並將 MySQL 服務器返回的結果按 MySQL 協議包偽裝后響應給 client。
- MySQLCommandExecutorFactory:根據請求的類型不同(eg: COM_QUERY, COM_STMT_EXECUTE),初始化不同的執行器,主要分為兩類:
- 一是 Statement,不使用預解析功能,代表實現是 MySQLComQueryPacketExecutor,最終調用 TextProtocolBackendHandler 執行。
- 二是 PrepareStatement,使用預解析功能,代表實現是 MySQLComStmtExecuteExecutor,最終調用 DatabaseCommunicationEngine 執行。
注意: 並不是所有的 client 請求都轉發到 mysql 服務器上了。如 MySQL 預解析操作分為 prepare、execute、close、reset 四步,分別對應 MySQLComStmtPrepareExecutor、MySQLComStmtExecuteExecutor、MySQLComStmtCloseExecutor、MySQLComStmtResetExecutor 四個類。除了 execute 會將請求轉發給底層 mysql 服務器外,其它的解析是在代理層(sharding-proxy)完成的,將 SQLParseEngine 解析后結果緩存在 MySQLBinaryStatementRegistry 實例中,這樣能避免重復解析 SQL 提高性能。
MySQLComQueryPacketExecutor 執行流程
public MySQLComQueryPacketExecutor(final MySQLComQueryPacket comQueryPacket,
final BackendConnection backendConnection) {
// 包含 SQL 和 connection,textProtocolBackendHandler 可以執行 SQL
textProtocolBackendHandler = TextProtocolBackendHandlerFactory.newInstance(
DatabaseTypes.getActualDatabaseType("MySQL"),
comQueryPacket.getSql(), backendConnection);
}
@Override
public Collection<DatabasePacket> execute() {
...
// 委托給 textProtocolBackendHandler 執行
BackendResponse backendResponse = textProtocolBackendHandler.execute();
// 包裝返回的結果
// error
if (backendResponse instanceof ErrorResponse) {
return Collections.<DatabasePacket>singletonList(createErrorPacket(
((ErrorResponse) backendResponse).getCause()));
}
// update
if (backendResponse instanceof UpdateResponse) {
return Collections.<DatabasePacket>singletonList(createUpdatePacket(
(UpdateResponse) backendResponse));
}
// query
isQuery = true;
return createQueryPackets((QueryResponse) backendResponse);
}
總結: MySQLComQueryPacketExecutor 總體過程非常清晰,解析、轉發、響應。
- 解析:按 MySQL 協議解析 client 發送的請求。MySQL 解析的核心邏輯在 sharding-proxy-transport-mysql 包中,主要接口類是 MySQLPacketCodecEngine、MySQLPacketPayload、MySQLPacket。
- 轉發:將解析后的 SQL 轉發給 MySQL 服務器,並返加響應結果。實際轉發委托給了 TextProtocolBackendHandler,這個類的功能會在后面繼續分析。
- 響應:將處理后的結果偽裝成 MySQL 服務器,響應客戶端。這個主要是偽裝成 MySQL 協議。MySQL 協議參考:https://dev.mysql.com/doc/internals/en/client-server-protocol.html
MySQLComStmtExecuteExecutor 執行流程
public MySQLComStmtExecuteExecutor(
final MySQLComStmtExecutePacket comStmtExecutePacket,
final BackendConnection backendConnection) {
databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance()
.newBinaryProtocolInstance(backendConnection.getLogicSchema(),
comStmtExecutePacket.getSql(), comStmtExecutePacket.getParameters(),
backendConnection);
}
@Override
public Collection<DatabasePacket> execute() {
// 委托給 databaseCommunicationEngine 執行
BackendResponse backendResponse = databaseCommunicationEngine.execute();
// 包裝返回的結果,同 MySQLComQueryPacketExecutor
...
}
總結: 可以看到,和 MySQLComQueryPacketExecutor 基本類似,唯一不同的在於MySQLComQueryPacketExecutor 真正調用 TextProtocolBackendHandler 執行,而 MySQLComStmtExecuteExecutor 調用 DatabaseCommunicationEngine 執行。
那問題就來了,為什么會有 TextProtocolBackendHandler 和 DatabaseCommunicationEngine 兩個執行器?它們到底是什么關系呢?TextProtocolBackendHandler 的實現類其實就是調用 DatabaseCommunicationEngine。
總結: 無論是 MySQLComQueryPacketExecutor 還是 MySQLComStmtExecuteExecutor 最終都是調用 DatabaseCommunicationEngine 執行。
Sharding-Proxy 消息處理
位於 sharding-proxy-backend 工程中。
總結: Sharding-Proxy 消息處理過程和 Sharding-Jdbc 處理過程差不多,也要經過 SQL 解析、路由、改寫、合並這四個核心過程。前面三個類是 Sharding-Proxy 中的,后面四個類則是 Sharding-Jdbc 的,兩套邏輯共用一套核心代碼。
DatabaseCommunicationEngine
DatabaseCommunicationEngine 是 Sharding-Proxy 內部轉發執行器,負責將請求轉發給底層 MySQL 服務器。
我們看一下 QueryBackendHandler 的實現類。
public final class QueryBackendHandler implements TextProtocolBackendHandler {
@Override
public BackendResponse execute() {
...
databaseCommunicationEngine = databaseCommunicationEngineFactory
.newTextProtocolInstance(backendConnection.getLogicSchema(),
sql, backendConnection);
return databaseCommunicationEngine.execute();
}
}
說明: TextProtocolBackendHandler 是不使用預解析的執行器,調用 databaseCommunicationEngineFactoy.newTextProtocolInstance,而使用預解析的 MySQLComStmtExecuteExecutor 內部調用 DatabaseCommunicationEngineFactory.newBinaryProtocolInstance。我們看一下這兩個方法的內部實現。
public DatabaseCommunicationEngine newTextProtocolInstance(final LogicSchema logicSchema,
final String sql, final BackendConnection backendConnection) {
return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
new JDBCExecuteEngine(backendConnection,
new StatementExecutorWrapper(logicSchema)));
}
public DatabaseCommunicationEngine newBinaryProtocolInstance(
final LogicSchema logicSchema, final String sql,
final List<Object> parameters,
final BackendConnection backendConnection) {
return new JDBCDatabaseCommunicationEngine(logicSchema, sql,
new JDBCExecuteEngine(backendConnection,
new PreparedStatementExecutorWrapper(logicSchema, parameters)));
}
說明: 在 Sharding-Proxy 中 TextProtocol 代表的是不使用預解析,而 BinaryProtocol 代表使用預解析。JDBCDatabaseCommunicationEngine 內部直接委托給 JDBCExecuteEngine 完成。
JDBCDatabaseCommunicationEngine
- LogicSchema:配置類解析規則。
- JDBCExecuteEngine:SQL 執行器,向 MySQL 服務器下發請求並獲取查詢結果。
- JDBCBackendDataSource:內部是一個 Map,維護了真實服務器的連接池,可以從中獲取 MySQL 服務器的連接。
- BackendConnection:用於管理底層 MySQL 連接,分為事務和非事務連接,如果是事務連接,則在獲取連接時調用 connection.setAutoCommit(false) 開啟一個事務。
- StatementExecutorWrapper:不使用預解析。
- PreparedStatementExecutorWrapper:使用預解析。
JDBCDatabaseCommunicationEngine 執行過程的代碼如下:
private final String sql;
private final JDBCExecuteEngine executeEngine;
@Override
public BackendResponse execute() {
try {
// 1. SQL 路由、改寫
SQLRouteResult routeResult = executeEngine.getJdbcExecutorWrapper().route(sql);
return execute(routeResult);
} catch (final SQLException ex) {
return new ErrorResponse(ex);
}
}
private BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
...
// 2. SQL 執行
response = executeEngine.execute(routeResult);
if (logicSchema instanceof ShardingSchema) {
logicSchema.refreshTableMetaData(routeResult.getSqlStatementContext());
}
// 4. 結果合並
return merge(routeResult);
}
總結: JDBCDatabaseCommunicationEngine 執行 SQL 過程包括:SQL 路由、改寫、執行、結果合並,其中前三步都是委托 JDBCExecuteEngine 完成的。
JDBCExecuteEngine
// 管理底層 MySQL 連接
private final BackendConnection backendConnection;
// ①根據SQL生成執行計划(包括SQL解析、路由、改寫);②生成Statement;③執行SQL
private final JDBCExecutorWrapper jdbcExecutorWrapper;
// 生成執行計划 RouteUnit -> StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 執行 StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
@Override
public BackendResponse execute(final SQLRouteResult routeResult) throws SQLException {
final SQLStatementContext sqlStatementContext = routeResult.getSqlStatementContext();
boolean isReturnGeneratedKeys = sqlStatementContext.getSqlStatement()
instanceof InsertStatement;
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
// 執行計划,ProxyJDBCExecutePrepareCallback用於創建執行計划
Collection<ShardingExecuteGroup<StatementExecuteUnit>> sqlExecuteGroups =
sqlExecutePrepareTemplate.getExecuteUnitGroups(
routeResult.getRouteUnits(),
new ProxyJDBCExecutePrepareCallback(
backendConnection, jdbcExecutorWrapper, isReturnGeneratedKeys));
// 執行SQL,ProxySQLExecuteCallback用於執行SQL
Collection<ExecuteResponse> executeResponses = sqlExecuteTemplate.executeGroup(
(Collection) sqlExecuteGroups,
new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
isExceptionThrown, isReturnGeneratedKeys, true),
new ProxySQLExecuteCallback(backendConnection, jdbcExecutorWrapper,
isExceptionThrown, isReturnGeneratedKeys, false));
ExecuteResponse executeResponse = executeResponses.iterator().next();
// 組裝結果
return executeResponse instanceof ExecuteQueryResponse
? getExecuteQueryResponse(((ExecuteQueryResponse) executeResponse)
.getQueryHeaders(), executeResponses)
: new UpdateResponse(executeResponses);
}
每天用心記錄一點點。內容也許不重要,但習慣很重要!
