什么是事務?
事務是數據庫非常重要的部分,它具有四大特性(原子性、一致性、隔離性、持久性)
以下內容出自《高性能MySQL》第三版,了解事務的ACID及四種隔離級有助於我們更好的理解事務運作。
下面舉一個銀行應用是解釋事務必要性的一個經典例子。假如一個銀行的數據庫有兩張表:支票表(checking)和儲蓄表(savings)。現在要從用戶Jane的支票賬戶轉移200美元到她的儲蓄賬戶,那么至少需要三個步驟:
- 檢查支票賬戶的余額高於或者等於200美元。
- 從支票賬戶余額中減去200美元。
- 在儲蓄帳戶余額中增加200美元。
上述三個步驟的操作必須打包在一個事務中,任何一個步驟失敗,則必須回滾所有的步驟。
一個很好的事務處理系統,必須具備這些標准特性:
- 原子性(atomicity)
一個事務必須被視為一個不可分割的最小工作單元,整個事務中的所有操作要么全部提交成功,要么全部失敗回滾,對於一個事務來說,不可能只執行其中的一部分操作,這就是事務的原子性
- 一致性(consistency)
數據庫總是從一個一致性的狀態轉換到另一個一致性的狀態。(在前面的例子中,一致性確保了,即使在執行第三、四條語句之間時系統崩潰,支票賬戶中也不會損失200美元,因為事務最終沒有提交,所以事務中所做的修改也不會保存到數據庫中。)
- 隔離性(isolation)
通常來說,一個事務所做的修改在最終提交以前,對其他事務是不可見的。(在前面的例子中,當執行完第三條語句、第四條語句還未開始時,此時有另外的一個賬戶匯總程序開始運行,則其看到支票帳戶的余額並沒有被減去200美元。)
- 持久性(durability)
一旦事務提交,則其所做的修改會永久保存到數據庫。(此時即使系統崩潰,修改的數據也不會丟失。)
Tx 對象
database/sql 提供了事務處理的功能,通過 Tx 類型實現
之前在增刪改查操作中使用的都是 sqlx.DB 類型對象,而事務則使用的是 Tx 類型對象,使用 DB.Begin() 方法可以創建 Tx 類型對象,Tx 類型對象也可以調用 Exec() 和 Query() 等方法執行數據庫操作,用法和之前操作一樣,但是需要在操作完畢后執行 Tx 的 Commit() 或 Rollback() 方法完成數據庫事務的提交或回滾,同時釋放連接
一旦調用 Begin() 方法,Tx 類型對象就會從連接池中獲取一個空閑的連接,接下來的 sql 執行都基於這個連接,直到 commit 或 rollback 后才釋放到連接池
Tx 類型對象擁有的方法如下:
在事務處理的時候,不能使用 DB 類型對象的方法去執行 sql 語句,當然如果使用也能執行成功,但是這和你在事務里執行的操作不屬於一個事務(隔離性),將不會接受 commit 或 rollback 的改變,如下面的操作時:
demo:下面這個偽代碼中,調用Db.Exec() 方法的時候,和 Tx 執行 Exec() 方法是不同的,只有 Tx 的操作會綁定到事務中,Db 則是額外的一個連接,兩者不屬於同一個事務
Tx,err := Db.Begin() Db.Exec() Tx.Exec() Tx.Commit()
事務並發
對於 sqlx.DB 類型對象,調用了 Query() 方法之后,在 Next() 方法中獲取結果的時候,rows 是維護了一個連接,再次調用 QueryRow() 方法的時候,DB 類型對象會再從連接池取出一個新的連接給到查詢結果 row,row 和 rows 的連接,兩者可以共存,並且互相不影響
for rows.Next() { //定義變量接收查詢數據 var uid int var create_time, username, password, department, email string err := rows.Scan(&uid, &create_time, &username, &password, &department, &email) if err != nil { fmt.Println("get data failed, error:[%v]", err.Error()) } fmt.Println(uid, create_time, username, password, department, email) row := Db.QueryRow("select * from userinfo where uid = 1") err = row.Scan(&uid, &create_time, &username, &password, &department, &email) if err != nil { fmt.Printf("scan failed, error:[%v]", err.Error()) return } fmt.Println(uid, create_time, username, password, department, email) } //關閉結果集(釋放連接) rows.Close() 運行結果: 1 2019-07-06 11:45:20 anson 123456 技術部 123456@163.com 1 2019-07-06 11:45:20 anson 123456 技術部 123456@163.com 3 2019-07-06 11:45:20 johny 123456 技術部 123456@163.com 1 2019-07-06 11:45:20 anson 123456 技術部 123456@163.com 4 2019-07-06 11:45:20 johny 123456 技術部 123456@163.com 1 2019-07-06 11:45:20 anson 123456 技術部 123456@163.com
對於 sql.Tx 類型對象,因為事務過程只有一個連接,事務內的操作都是順序執行的,在開始下一個數據庫交互之前,必須先完成上一個數據庫交互
demo:Tx 執行了 Query() 方法后,rows 維護了數據庫連接,然后 Tx 嘗試調用 QueryRow 將嘗試獲取該連接進行數據庫操作,因為還沒有調用 rows.Close() 方法(事務還沒有結束),因此連接屬於 busy 狀態,Tx 類型對象是無法再從連接池獲取連接的
rows, err := Tx.Query("select * from userinfo") if err != nil { fmt.Printf("query faied, error:[%v]", err.Error()) return } for rows.Next() { //定義變量接收查詢數據 var uid int var create_time, username, password, department, email string err := rows.Scan(&uid, &create_time, &username, &password, &department, &email) if err != nil { fmt.Println("get data failed, error:[%v]", err.Error()) } fmt.Println(uid, create_time, username, password, department, email) row := Tx.QueryRow("select * from userinfo where uid = 1") //在這里獲取數據庫連接報錯 err = row.Scan(&uid, &create_time, &username, &password, &department, &email) if err != nil { fmt.Printf("scan failed, error:[%v]", err.Error()) return } fmt.Println(uid, create_time, username, password, department, email) } //關閉結果集(釋放連接) rows.Close() 運行結果: 1 2019-07-06 11:45:20 anson 123456 技術部 123456@163.com scan failed, error:[driver: bad connection][mysql] 2019/07/09 22:52:07 packets.go:446: busy buffer
實例(加強理解)
通過下面完整的例子能夠更好的理解
demo:
- 定義了一個 clearTransaction(Tx) 函數,該函數會執行 rollback 操作,因為在事物處理的過程中,任何一個錯誤都會導致 main 函數退出,因此在 main 函數退出時執行 defer 的 rollback 操作,回滾事務和釋放連接
- 如果不添加 defer 來回滾事務和釋放連接,只在最后 commit 或 rollback,那么當 doSomething 發生異常的時候,函數就退出了,此時還沒有執行到 commit 操作,這樣就導致該事務的連接沒有關閉,事務也沒有回滾
- Tx 事務環境中,只有一個數據庫連接,事務內的 Exec() 方法都是依次執行的,事務中也可以使用 DB 進行查詢,但是 DB 查詢的過程會新建連接,不屬於 Tx 這個事務
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
var (
userName string = "chenkai"
password string = "chenkai"
ipAddrees string = "192.168.0.115"
port int = 3306
dbName string = "test"
charset string = "utf8"
)
func connectMysql() (*sqlx.DB) {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s", userName, password, ipAddrees, port, dbName, charset)
Db, err := sqlx.Open("mysql", dsn)
if err != nil {
fmt.Printf("mysql connect failed, detail is [%v]", err.Error())
}
return Db
}
func updateData(Db *sqlx.DB) {
Tx, err := Db.Begin()
if err != nil {
fmt.Printf("open the transaction failed, error:[%v]", err.Error())
return
}
//回滾處理
defer clearTransaction(Tx)
result, err := Tx.Exec("update userinfo set username = 'honey' where uid = 1")
if err != nil {
fmt.Printf("update failed, error:[%v]", err.Error())
return
}
rowAffected, _ := result.RowsAffected()
fmt.Printf("affected rows:[%v]\n", rowAffected)
result, err = Tx.Exec("update userinfo set username = 'honey' where uid = 3")
if err != nil {
fmt.Printf("update failed, error:[%v]", err.Error())
return
}
rowAffected, _ = result.RowsAffected()
fmt.Printf("affected rows:[%v]\n", rowAffected)
//主動 panic
doSomething()
//提交事務
err = Tx.Commit()
if err != nil {
// tx.Rollback() 此時處理錯誤,會忽略 doSomthing 的異常
fmt.Printf("commit failed, error:[%v]", err.Error())
}
}
func clearTransaction(Tx *sql.Tx) {
//嘗試進行 rollback,若 Tx 已經關閉,則不作處理
err := Tx.Rollback()
if err != sql.ErrTxDone && err != nil {
fmt.Printf("tx rollback failed, error:[%v]", err.Error())
}
}
func doSomething(){
panic("a panic running error")
}
func main() {
var Db *sqlx.DB = connectMysql()
defer Db.Close()
updateData(Db)
}
運行結果:
affected rows:[1]
affected rows:[1]
panic: a panic running error
ending ~