使用golang理解mysql的兩階段提交
文章源於一個問題:如果我們現在有兩個mysql實例,在我們要盡量簡單地完成分布式事務,怎么處理?
場景重現
比如我們現在有兩個數據庫,mysql3306和mysql3307。這里我們使用docker來創建這兩個實例:
# mysql3306創建命令
docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3306的配置:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
# mysql3307創建命令
docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7
# msyql3307的配置:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
server-id = 2
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
在mysql3306中
我們有一個user表
create table user (
id int,
name varchar(10),
score int
);
insert into user values(1, "foo", 10)
在mysql3307中,我們有一個wallet表。
create table wallet (
id int,
money float
);
insert into wallet values(1, 10.1)
我們可以看到,id為1的用戶初始分數(score)為10,而它的錢,在wallet中初始錢(money)為10.1。
現在假設我們有一個操作,需要對這個用戶進行操作:每次操作增加分數2,並且增加錢數1.2。
這個操作需要很強的一致性。
思考
兩階段提交
這里是一個分布式事務的概念,我們可以使用2PC的方法進行保證事務
2PC的概念如圖所示,引入一個資源協調者的概念,由這個資源協調者進行事務協調。
第一階段,由這個資源協調者對每個mysql實例調用prepare命令,讓所有的mysql實例准備好,如果其中由mysql實例沒有准備好,協調者就讓所有實例調用rollback命令進行回滾。如果所有mysql都prepare完成,那么就進入第二階段。
第二階段,資源協調者讓每個mysql實例都調用commit方法,進行提交。
mysql里面也提供了分布式事務的語句XA。
用單個實例的事務行不行
等等,這個兩階段提交和我們的事務感覺也差不多,都是進行一次開始,然后執行,最后commit,mysql為什么還要專門定義一個xa的命令呢?於是我陷入了思考...
思考不如實操,於是我用golang寫了一個使用mysql的事務實現的“兩階段提交”:
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
tx1, err := db1.Begin()
if err != nil {
panic(errors.WithStack(err))
}
tx2, err := db2.Begin()
if err != nil {
panic(errors.WithStack(err))
}
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
tx1.Rollback()
tx2.Rollback()
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// DML操作
if _, err = tx1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = tx2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("commit before error"))
// commit
fmt.Println("=== call commit ====")
err = tx1.Commit()
if err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("commit db2 before error"))
err = tx2.Commit()
if err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
我這里已經非常小心地在defer中recover錯誤信息,並且執行了rollback命令。
如果我在commit命令之前的任意一個地方調用了panic(errors.New("commit before error"))
那么命令就會進入到了rollback這里,就會把兩個實例的事務都進行回滾。
通過結果我們可以看到,分數和錢數都沒有改變。這個是ok的。
但是如果我在db2的commit之前觸發了panic,那么這個命令進入到了rollback中,但是db1已經commit了,db2還沒有commit,這個時候會出現什么情況?
非常可惜,我們看到了這里的score增長了,但是money沒有增長,這個就說明無法做到事務一致性了。
回到mysql的xa
那么還要回歸到2PC,mysql為2PC的實現增加了xa命令,那么使用這個命令我們能不能避免這個問題呢?
同樣,我用golang寫了一個使用xa命令的代碼
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
首先看成功的情況:
一切完美。
如果我們在prepare階段拋出panic,那么結果如下:
證明在第一階段出現異常是可以回滾的。
但是如果我們在commit階段拋出panic:
我們發現,這里的分數增加了,但是money卻沒有增加。
那么這個xa和單個事務有什么區別呢?我又陷入了深深的沉思...
xa的用法不對
經過在技術群(全棧神盾局)請教,討論之后,發現這里對2pc的兩個階段理解還沒到位,這里之所以分為兩個階段,是強調的是每個階段都會持久化,就是第一個階段完成了之后,每個mysql實例就把第一個階段的請求實例化了,這個時候不管是mysql實例停止了還是其他問題,每次重啟的時候都會重新回復這個commit。
我們把這個代碼的rollback去掉,假設commit必須成功。
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
// db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
// db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
這個時候,我們停掉程序(停掉mysql的鏈接),使用xa recover
可以發現,db2的xa事務還留在db2中了。
我們在控制台直接調用xa commit '1585644880'
還能繼續把這個xa事務進行提交。
這下money就進行了提交,又恢復了一致性。
所以呢,我琢磨了一下,我們寫xa的代碼應該如下:
package main
import (
"database/sql"
"fmt"
"log"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
)
func main() {
var err error
// db1的連接
db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")
if err != nil {
panic(err.Error())
}
defer db1.Close()
// db2的連接
db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")
if err != nil {
panic(err.Error())
}
defer db2.Close()
// 開始前顯示
var score int
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
var money float64
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
// 生成xid
xid := strconv.FormatInt(time.Now().Unix(), 10)
fmt.Println("=== xid:" + xid + " ====")
defer func() {
if err := recover(); err != nil {
fmt.Printf("%+v\n", err)
fmt.Println("=== call rollback ====")
db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}()
// XA 啟動
fmt.Println("=== call start ====")
if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// DML操作
if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
panic(errors.WithStack(err))
}
// XA end
fmt.Println("=== call end ====")
if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// prepare
fmt.Println("=== call prepare ====")
if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// panic(errors.New("db2 prepare error"))
if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
panic(errors.WithStack(err))
}
// commit
fmt.Println("=== call commit ====")
if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
// TODO: 嘗試重新提交COMMIT
// TODO: 如果還失敗,記錄xid,進入數據恢復邏輯,等待數據庫恢復重新提交
log.Println("xid:" + xid)
}
// panic(errors.New("db2 commit error"))
if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
log.Println("xid:" + xid)
}
db1.QueryRow("select score from user where id = 1").Scan(&score)
fmt.Println("user1 score:", score)
db2.QueryRow("select money from wallet where id = 1").Scan(&money)
fmt.Println("wallet1 money:", money)
}
就是第二階段的commit,我們必須設定它一定會“成功”,如果有不成功的情況,那么就需要記錄下不成功的xid,有一個數據恢復邏輯,重新commit這個xid。來保證最終一致性。
binlog
其實我們使用binlog也能看出一些端倪
# 這里的mysql-bin.0003替換成為你當前的log
SHOW BINLOG EVENTS in 'mysql-bin.000003';
## XA的binlog
| mysql-bin.000003 | 1967 | Anonymous_Gtid | 1 | 2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2032 | Query | 1 | 2138 | XA START X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2138 | Table_map | 1 | 2190 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2190 | Update_rows | 1 | 2252 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2252 | Query | 1 | 2356 | XA END X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2356 | XA_prepare | 1 | 2402 | XA PREPARE X'31353835363338363233',X'',1 |
| mysql-bin.000003 | 2402 | Anonymous_Gtid | 1 | 2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2467 | Query | 1 | 2574 | XA COMMIT X'31353835363338363233',X'',1
## 非xa的事務
| mysql-bin.000003 | 2574 | Anonymous_Gtid | 1 | 2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000003 | 2639 | Query | 1 | 2712 | BEGIN |
| mysql-bin.000003 | 2712 | Table_map | 1 | 2764 | table_id: 108 (hade1.user) |
| mysql-bin.000003 | 2764 | Update_rows | 1 | 2826 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000003 | 2826 | Xid | 1 | 2857 | COMMIT /* xid=67 */
我們很明顯可以看到兩階段提交中是有兩個GTID的,生成一個GTID就代表內部生成一個事務,所以第一個階段prepare結束之后,第二個階段commit的時候就持久化了第一個階段的內容,並且生成了第二個事務。當commit失敗的時候,最多就是第二個事務丟失,第一個事務實際上已經保存起來了了(只是還沒commit)。
而非xa的事務,只有一個GTID,在commit之前任意一個階段出現問題,整個事務就全部丟失,無法找回了。所以這就是mysql xa命令的機制。
總結
看了一些資料,原來mysql從5.7之后才真正實現了兩階段的xa。當然這個兩階段方式在真實的工程中的使用其實很少的,xa的第一定律是避免使用xa。工程中會有很多方式來避免這種分庫的事務情況。
不過,不妨礙掌握了mysql的xa,在一些特定的場合,我們也能完美解決問題。