源碼👇seata-golang
概述
我們知道 Seata Java Client 的 AT 模式,通過代理數據源,實現了對業務代碼無侵入的分布式事務協調機制,將與 Transaction Coordinator (TC) 交互的邏輯、Commit 的邏輯、Rollback 的邏輯,隱藏在切面和代理數據源相應的代碼中,使開發者無感知。那如果這個方法,要用 Golang 來實現一遍,應該如何操作呢?關於這個問題,我想了很久,最初的設想是,對 database/sql
的 mysql driver 進行增強,在對包 github.com/go-sql-driver/mysql
研究了一段時間后,還是沒有頭緒,不知如何下手,最后轉而增強 database/sql
包。由於 AT 模式必須保證本地事務的正確處理,在具體業務開發時,首先要通過 db.Begin()
獲得一個 Tx 對象,然后再 tx.Exec()
執行數據庫操作,最后 tx.Commit()
提交或 tx.Rollback()
回滾。這種處理方式算是一個 Golang 數據庫事務處理的基本操作。 所以對 database/sql
的增強,我們重點關注這幾個方法 db.Begin()
、 tx.Exec()
、tx.Commit()
、tx.Rollback
。
事務提交、回滾
通過 Seata Java Client 的相關代碼,我們知道,在本地事務提交的時候,主要是將分支事務注冊到 TC 上,並將數據庫操作產生的 undoLog 一起寫入到 undoLog 表;本地事務回滾的時候,需要將分支事務(即本地事務)的執行狀態報告給 TC,使 TC 好知道是否通知參與全局事務的其他分支回滾。
func (tx *Tx) Commit() error {
//注冊分支事務
branchId,err := tx.register()
if err != nil {
return errors.WithStack(err)
}
tx.tx.Context.BranchId = branchId
if tx.tx.Context.HasUndoLog() {
//將 undoLog 寫入 undoLog 表
err = manager.GetUndoLogManager().FlushUndoLogs(tx.tx)
if err != nil {
err1 := tx.report(false)
if err1 != nil {
return errors.WithStack(err1)
}
return errors.WithStack(err)
}
err = tx.tx.Commit()
if err != nil {
err1 := tx.report(false)
if err1 != nil {
return errors.WithStack(err1)
}
return errors.WithStack(err)
}
} else {
return tx.tx.Commit()
}
if tx.reportSuccessEnable {
tx.report(true)
}
tx.tx.Context.Reset()
return nil
}
db.Begin()
會產生一個 Tx 對象,tx.Exec()
會產生 undoLog,tx.Commit()
將 undoLog 刷到數據庫中。那么 undoLog 保存到哪里呢?答案是 TxContext 中。
type TxContext struct {
*context.RootContext
Xid string
BranchId int64
IsGlobalLockRequire bool
LockKeysBuffer *model.Set
SqlUndoItemsBuffer []*undo.SqlUndoLog
}
Commit() 方法中的 tx.tx.Context
,第一個 tx 是封裝的 Tx 對象,第二個 tx 是 database/sql
的 Tx,tx.tx.Context
則是 TxContext。UndoLogManager 則是操作 undoLog 的核心對象,處理 undoLog 的插入、刪除,並查詢出 undoLog 用於回滾。
func (tx *Tx) Rollback() error {
err := tx.tx.Rollback()
if tx.tx.Context.InGlobalTransaction() && tx.tx.Context.IsBranchRegistered() {
// 報告 TC 分支事務執行失敗
tx.report(false)
}
tx.tx.Context.Reset()
return err
}
通過上面的代碼呢,我們知道增強型 Tx 對象需要向 TC 注冊分支事務,並報告分支事務的執行狀態,相應代碼如下:
func (tx *Tx) register() (int64,error) {
return dataSourceManager.BranchRegister(meta.BranchTypeAT,tx.tx.ResourceId,"",tx.tx.Context.Xid,
nil,tx.tx.Context.BuildLockKeys())
}
func (tx *Tx) report(commitDone bool) error {
retry := tx.reportRetryCount
for retry > 0 {
var err error
if commitDone {
err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,
meta.BranchStatusPhaseoneDone,nil)
} else {
err = dataSourceManager.BranchReport(meta.BranchTypeAT, tx.tx.Context.Xid, tx.tx.Context.BranchId,
meta.BranchStatusPhaseoneFailed,nil)
}
if err != nil {
logging.Logger.Errorf("Failed to report [%d/%s] commit done [%t] Retry Countdown: %d",
tx.tx.Context.BranchId,tx.tx.Context.Xid,commitDone,retry)
retry = retry -1
if retry == 0 {
return errors.WithMessagef(err,"Failed to report branch status %t",commitDone)
}
}
}
return nil
}
和 TC 進行通信的主要邏輯還是在 DataSourceManager 里面。AT 模式涉及的兩個關鍵對象 DataSourceManager、UndoLogManager 就浮出水面。一個用於遠程 TC 交互,一個用於本地數據庫處理。
事務執行
func (tx *Tx) Exec(query string, args ...interface{}) (sql.Result, error) {
var parser = p.New()
// 解析業務 sql
act,_ := parser.ParseOneStmt(query,"","")
deleteStmt,isDelete := act.(*ast.DeleteStmt)
if isDelete {
executor := &DeleteExecutor{
tx: tx.tx,
sqlRecognizer: mysql.NewMysqlDeleteRecognizer(query,deleteStmt),
values: args,
}
return executor.Execute()
}
insertStmt,isInsert := act.(*ast.InsertStmt)
if isInsert {
executor := &InsertExecutor{
tx: tx.tx,
sqlRecognizer: mysql.NewMysqlInsertRecognizer(query,insertStmt),
values: args,
}
return executor.Execute()
}
updateStmt,isUpdate := act.(*ast.UpdateStmt)
if isUpdate {
executor := &UpdateExecutor{
tx: tx.tx,
sqlRecognizer: mysql.NewMysqlUpdateRecognizer(query,updateStmt),
values: args,
}
return executor.Execute()
}
return tx.tx.Tx.Exec(query,args)
}
執行業務 sql,並生成 undoLog 的關鍵,在於識別業務 sql 執行了什么操作:插入?刪除?修改?這里使用 tidb 的 sql parser 去解析業務 sql,再使用相應的執行器去執行業務 sql,生成 undoLog 保存在 Tx_Context 中。
事務開啟
db.Begin()
返回增強型的 Tx 對象。
func (db *DB) Begin(ctx *context.RootContext) (*Tx,error) {
tx,err := db.DB.Begin()
if err != nil {
return nil,err
}
proxyTx := &tx2.ProxyTx{
Tx: tx,
DSN: db.conf.DSN,
ResourceId: db.GetResourceId(),
Context: tx2.NewTxContext(ctx),
}
return &Tx{
tx: proxyTx,
reportRetryCount: db.conf.ReportRetryCount,
reportSuccessEnable: db.conf.ReportSuccessEnable,
},nil
}
seata-golang at 模式的使用
- 首先執行 scripts 腳本,初始化數據庫
如果之前沒有初始化過 seata 數據庫,先執行seata-golang/scripts/server/db/mysql.sql
腳本 - 修改 dsn 數據庫配置,修改下列文件:
seata-golang/tc/app/profiles/dev/config.yml
seata-golang/samples/at/product_svc/conf/client.yml
seata-golang/samples/at/product_svc/conf/client.yml
- 將下列文件中的 configPath 修改為 client.yml 配置文件的路徑
seata-golang/samples/at/product_svc/main.go
seata-golang/samples/at/order_svc/main.go
seata-golang/samples/at/aggregation_svc/main.go
- 依次運行 tc、order_svc、product_svc、aggragation_svc,訪問下列地址開始測試:
http://localhost:8003/createSoCommit
http://localhost:8003/createSoRollback
TC 啟動參考參與 Seata 社區到 go 與 Seata 的邂逅
seata-golang 后續安排
接下來不打算再增加新的 feature。Java 版 Seata 畢竟發展了一年多時間,並且有很多社區成員一起維護,Go 版本目前主要是我在開發,時間不到2個月,現有的代碼,僅是完成了框架,還需要大量優化,改bug,后續的工作重心在於使 seata-golang 穩定運行,生產可用,希望對分布式事務感興趣且對 Go 感興趣的同學一起加入進來,一起做些事情。進入微信群,請加我微信:scottlewis,備注進群。