使用golang理解mysql的兩階段提交


使用golang理解mysql的兩階段提交

文章源於一個問題:如果我們現在有兩個mysql實例,在我們要盡量簡單地完成分布式事務,怎么處理?

場景重現

比如我們現在有兩個數據庫,mysql3306和mysql3307。這里我們使用docker來創建這兩個實例:

# mysql3306創建命令
docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7

# msyql3306的配置:
[mysqld]
pid-file     = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir          = /var/lib/mysql
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

# mysql3307創建命令
docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7

# msyql3307的配置:
[mysqld]
pid-file     = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir          = /var/lib/mysql
server-id = 2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

在mysql3306中
我們有一個user表

create table user (
    id int,
    name varchar(10),
    score int
);


insert into user values(1, "foo", 10)

在mysql3307中,我們有一個wallet表。

create table wallet (
    id int,
    money float 
);


insert into wallet values(1, 10.1)

我們可以看到,id為1的用戶初始分數(score)為10,而它的錢,在wallet中初始錢(money)為10.1。

現在假設我們有一個操作,需要對這個用戶進行操作:每次操作增加分數2,並且增加錢數1.2。

這個操作需要很強的一致性。

思考

兩階段提交

這里是一個分布式事務的概念,我們可以使用2PC的方法進行保證事務

20200331161038

2PC的概念如圖所示,引入一個資源協調者的概念,由這個資源協調者進行事務協調。

第一階段,由這個資源協調者對每個mysql實例調用prepare命令,讓所有的mysql實例准備好,如果其中由mysql實例沒有准備好,協調者就讓所有實例調用rollback命令進行回滾。如果所有mysql都prepare完成,那么就進入第二階段。

第二階段,資源協調者讓每個mysql實例都調用commit方法,進行提交。

mysql里面也提供了分布式事務的語句XA。

用單個實例的事務行不行

等等,這個兩階段提交和我們的事務感覺也差不多,都是進行一次開始,然后執行,最后commit,mysql為什么還要專門定義一個xa的命令呢?於是我陷入了思考...

思考不如實操,於是我用golang寫了一個使用mysql的事務實現的“兩階段提交”:

package main

import (
     "database/sql"
     "fmt"

     _ "github.com/go-sql-driver/mysql"
     "github.com/pkg/errors"
)

func main() {
     var err error

     // db1的連接
     db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
     if err != nil {
          panic(err.Error())
     }
     defer db1.Close()

     // db2的連接
     db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
     if err != nil {
          panic(err.Error())
     }
     defer db2.Close()

     // 開始前顯示
     var score int
     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     var money float64
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)

     tx1, err := db1.Begin()
     if err != nil {
          panic(errors.WithStack(err))
     }
     tx2, err := db2.Begin()
     if err != nil {
          panic(errors.WithStack(err))
     }

     defer func() {
          if err := recover(); err != nil {
               fmt.Printf("%+v\n", err)
               fmt.Println("=== call rollback ====")
               tx1.Rollback()
               tx2.Rollback()
          }

          db1.QueryRow("select score from user where id = 1").Scan(&score)
          fmt.Println("user1 score:", score)
          db2.QueryRow("select money from wallet where id = 1").Scan(&money)
          fmt.Println("wallet1 money:", money)
     }()

     // DML操作
     if _, err = tx1.Exec("update user set score=score+2 where id =1"); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = tx2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
          panic(errors.WithStack(err))
     }

    // panic(errors.New("commit before error"))

     // commit
     fmt.Println("=== call commit ====")
     err = tx1.Commit()
     if err != nil {
          panic(errors.WithStack(err))
     }

    // panic(errors.New("commit db2 before error"))

     err = tx2.Commit()
     if err != nil {
          panic(errors.WithStack(err))
     }

     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)
}

我這里已經非常小心地在defer中recover錯誤信息,並且執行了rollback命令。

如果我在commit命令之前的任意一個地方調用了panic(errors.New("commit before error")) 那么命令就會進入到了rollback這里,就會把兩個實例的事務都進行回滾。

20200331162451

通過結果我們可以看到,分數和錢數都沒有改變。這個是ok的。

但是如果我在db2的commit之前觸發了panic,那么這個命令進入到了rollback中,但是db1已經commit了,db2還沒有commit,這個時候會出現什么情況?

20200331162723

非常可惜,我們看到了這里的score增長了,但是money沒有增長,這個就說明無法做到事務一致性了。

回到mysql的xa

那么還要回歸到2PC,mysql為2PC的實現增加了xa命令,那么使用這個命令我們能不能避免這個問題呢?

同樣,我用golang寫了一個使用xa命令的代碼

package main

import (
     "database/sql"
     "fmt"
     "strconv"
     "time"

     _ "github.com/go-sql-driver/mysql"
     "github.com/pkg/errors"
)

func main() {
     var err error

     // db1的連接
     db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
     if err != nil {
          panic(err.Error())
     }
     defer db1.Close()

     // db2的連接
     db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
     if err != nil {
          panic(err.Error())
     }
     defer db2.Close()

     // 開始前顯示
     var score int
     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     var money float64
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)

     // 生成xid
     xid := strconv.FormatInt(time.Now().Unix(), 10)
     fmt.Println("=== xid:" + xid + " ====")
     defer func() {
          if err := recover(); err != nil {
               fmt.Printf("%+v\n", err)
               fmt.Println("=== call rollback ====")
               db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
               db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
          }

          db1.QueryRow("select score from user where id = 1").Scan(&score)
          fmt.Println("user1 score:", score)
          db2.QueryRow("select money from wallet where id = 1").Scan(&money)
          fmt.Println("wallet1 money:", money)
     }()

     // XA 啟動
     fmt.Println("=== call start ====")
     if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // DML操作
     if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
          panic(errors.WithStack(err))
     }

     // XA end
     fmt.Println("=== call end ====")
     if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // prepare
     fmt.Println("=== call prepare ====")
     if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     // panic(errors.New("db2 prepare error"))
     if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // commit
     fmt.Println("=== call commit ====")
     if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     // panic(errors.New("db2 commit error"))
     if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)
}

首先看成功的情況:
20200331164057

一切完美。

如果我們在prepare階段拋出panic,那么結果如下:

20200331164425

證明在第一階段出現異常是可以回滾的。

但是如果我們在commit階段拋出panic:

20200331164533

我們發現,這里的分數增加了,但是money卻沒有增加。

那么這個xa和單個事務有什么區別呢?我又陷入了深深的沉思...

xa的用法不對

經過在技術群(全棧神盾局)請教,討論之后,發現這里對2pc的兩個階段理解還沒到位,這里之所以分為兩個階段,是強調的是每個階段都會持久化,就是第一個階段完成了之后,每個mysql實例就把第一個階段的請求實例化了,這個時候不管是mysql實例停止了還是其他問題,每次重啟的時候都會重新回復這個commit。

我們把這個代碼的rollback去掉,假設commit必須成功。

package main

import (
     "database/sql"
     "fmt"
     "strconv"
     "time"

     _ "github.com/go-sql-driver/mysql"
     "github.com/pkg/errors"
)

func main() {
     var err error

     // db1的連接
     db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
     if err != nil {
          panic(err.Error())
     }
     defer db1.Close()

     // db2的連接
     db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
     if err != nil {
          panic(err.Error())
     }
     defer db2.Close()

     // 開始前顯示
     var score int
     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     var money float64
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)

     // 生成xid
     xid := strconv.FormatInt(time.Now().Unix(), 10)
     fmt.Println("=== xid:" + xid + " ====")
     defer func() {
          if err := recover(); err != nil {
               fmt.Printf("%+v\n", err)
               fmt.Println("=== call rollback ====")
               // db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
               // db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
          }

          db1.QueryRow("select score from user where id = 1").Scan(&score)
          fmt.Println("user1 score:", score)
          db2.QueryRow("select money from wallet where id = 1").Scan(&money)
          fmt.Println("wallet1 money:", money)
     }()

     // XA 啟動
     fmt.Println("=== call start ====")
     if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // DML操作
     if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
          panic(errors.WithStack(err))
     }

     // XA end
     fmt.Println("=== call end ====")
     if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // prepare
     fmt.Println("=== call prepare ====")
     if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     // panic(errors.New("db2 prepare error"))
     if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // commit
     fmt.Println("=== call commit ====")
     if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     panic(errors.New("db2 commit error"))
     if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)
}

20200331165500

這個時候,我們停掉程序(停掉mysql的鏈接),使用xa recover可以發現,db2的xa事務還留在db2中了。
20200331165622

我們在控制台直接調用xa commit '1585644880' 還能繼續把這個xa事務進行提交。

20200331165742

這下money就進行了提交,又恢復了一致性。

所以呢,我琢磨了一下,我們寫xa的代碼應該如下:

package main

import (
     "database/sql"
     "fmt"
     "log"
     "strconv"
     "time"

     _ "github.com/go-sql-driver/mysql"
     "github.com/pkg/errors"
)

func main() {
     var err error

     // db1的連接
     db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
     if err != nil {
          panic(err.Error())
     }
     defer db1.Close()

     // db2的連接
     db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
     if err != nil {
          panic(err.Error())
     }
     defer db2.Close()

     // 開始前顯示
     var score int
     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     var money float64
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)

     // 生成xid
     xid := strconv.FormatInt(time.Now().Unix(), 10)
     fmt.Println("=== xid:" + xid + " ====")
     defer func() {
          if err := recover(); err != nil {
               fmt.Printf("%+v\n", err)
               fmt.Println("=== call rollback ====")
               db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
               db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
          }

          db1.QueryRow("select score from user where id = 1").Scan(&score)
          fmt.Println("user1 score:", score)
          db2.QueryRow("select money from wallet where id = 1").Scan(&money)
          fmt.Println("wallet1 money:", money)
     }()

     // XA 啟動
     fmt.Println("=== call start ====")
     if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // DML操作
     if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
          panic(errors.WithStack(err))
     }

     // XA end
     fmt.Println("=== call end ====")
     if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // prepare
     fmt.Println("=== call prepare ====")
     if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }
     // panic(errors.New("db2 prepare error"))
     if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
          panic(errors.WithStack(err))
     }

     // commit
     fmt.Println("=== call commit ====")
     if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          // TODO: 嘗試重新提交COMMIT
          // TODO: 如果還失敗,記錄xid,進入數據恢復邏輯,等待數據庫恢復重新提交
          log.Println("xid:" + xid)
     }
     // panic(errors.New("db2 commit error"))
     if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
          log.Println("xid:" + xid)
     }

     db1.QueryRow("select score from user where id = 1").Scan(&score)
     fmt.Println("user1 score:", score)
     db2.QueryRow("select money from wallet where id = 1").Scan(&money)
     fmt.Println("wallet1 money:", money)
}

就是第二階段的commit,我們必須設定它一定會“成功”,如果有不成功的情況,那么就需要記錄下不成功的xid,有一個數據恢復邏輯,重新commit這個xid。來保證最終一致性。

binlog

其實我們使用binlog也能看出一些端倪

# 這里的mysql-bin.0003替換成為你當前的log
SHOW BINLOG EVENTS in 'mysql-bin.000003';
## XA的binlog
| mysql-bin.000003 | 1967 | Anonymous_Gtid |         1 |        2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                               |
| mysql-bin.000003 | 2032 | Query          |         1 |        2138 | XA START X'31353835363338363233',X'',1                                             |
| mysql-bin.000003 | 2138 | Table_map      |         1 |        2190 | table_id: 108 (hade1.user)                                                         |
| mysql-bin.000003 | 2190 | Update_rows    |         1 |        2252 | table_id: 108 flags: STMT_END_F                                                    |
| mysql-bin.000003 | 2252 | Query          |         1 |        2356 | XA END X'31353835363338363233',X'',1                                               |
| mysql-bin.000003 | 2356 | XA_prepare     |         1 |        2402 | XA PREPARE X'31353835363338363233',X'',1                                           |
| mysql-bin.000003 | 2402 | Anonymous_Gtid |         1 |        2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                               |
| mysql-bin.000003 | 2467 | Query          |         1 |        2574 | XA COMMIT X'31353835363338363233',X'',1



## 非xa的事務
| mysql-bin.000003 | 2574 | Anonymous_Gtid |         1 |        2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                               |
| mysql-bin.000003 | 2639 | Query          |         1 |        2712 | BEGIN                                                                              |
| mysql-bin.000003 | 2712 | Table_map      |         1 |        2764 | table_id: 108 (hade1.user)                                                         |
| mysql-bin.000003 | 2764 | Update_rows    |         1 |        2826 | table_id: 108 flags: STMT_END_F                                                    |
| mysql-bin.000003 | 2826 | Xid            |         1 |        2857 | COMMIT /* xid=67 */

我們很明顯可以看到兩階段提交中是有兩個GTID的,生成一個GTID就代表內部生成一個事務,所以第一個階段prepare結束之后,第二個階段commit的時候就持久化了第一個階段的內容,並且生成了第二個事務。當commit失敗的時候,最多就是第二個事務丟失,第一個事務實際上已經保存起來了了(只是還沒commit)。

而非xa的事務,只有一個GTID,在commit之前任意一個階段出現問題,整個事務就全部丟失,無法找回了。所以這就是mysql xa命令的機制。

總結

看了一些資料,原來mysql從5.7之后才真正實現了兩階段的xa。當然這個兩階段方式在真實的工程中的使用其實很少的,xa的第一定律是避免使用xa。工程中會有很多方式來避免這種分庫的事務情況。

不過,不妨礙掌握了mysql的xa,在一些特定的場合,我們也能完美解決問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM