我們用一個系列來講解從需求到上線、從代碼到k8s部署、從日志到監控等各個方面的微服務完整實踐。
整個項目使用了go-zero開發的微服務,基本包含了go-zero以及相關go-zero作者開發的一些中間件,所用到的技術棧基本是go-zero項目組的自研組件,基本是go-zero全家桶了。
實戰項目地址:https://github.com/Mikaelemmmm/go-zero-looklook
關於分布式事務
因為本項目服務划分相對獨立一些,所以目前沒有使用到分布式事務,不過go-zero結合dtm使用分布式事務的最佳實踐,我有整理demo,這里就介紹一下go-zero結合dtm的使用,項目地址go-zero結合dtm最佳實踐倉庫地址 : https://github.com/Mikaelemmmm/gozerodtm
【注】下面說的不是go-zero-looklook項目,是這個項目 https://github.com/Mikaelemmmm/gozerodtm
一、注意事項
-
go-zero 1.2.4版本以上,這個一定要注意
-
dtm 你用最新的就行了
二、clone dtm
git clone https://github.com/yedf/dtm.git
三、配置文件
1、找到項目跟文件夾下的conf.sample.yml
2、cp conf.sample.yml conf.yml
3、使用etcd , 把配置中下面這段注釋打開 (如果沒用etcd就更簡單了 ,這個都省了,直接鏈接到dtm server地址就可以了)
MicroService:
Driver: 'dtm-driver-gozero' # name of the driver to handle register/discover
Target: 'etcd://localhost:2379/dtmservice' # register dtm server to this url
EndPoint: 'localhost:36790'
解釋一下:
MicroService 這個不要動,這個代表要對把dtm注冊到那個微服務服務集群里面去,使微服務集群內部服務可以通過grpc直接跟dtm交互
Driver :'dtm-driver-gozero' , 使用go-zero的注冊服務發現驅動,支持go-zero
Target: 'etcd://localhost:2379/dtmservice' 將當前dtm的server直接注冊到微服務所在的etcd集群中,如果go-zero作為微服務使用的話,就可以直接通過etcd拿到dtm的server grpc鏈接,直接就可以跟dtm server交互了
EndPoint: 'localhost:36790' , 代表的是dtm的server的連接地址+端口 , 集群中的微服務可以直接通過etcd獲得此地址跟dtm交互了,
如果你自己去改了dtm源碼grpc端口,記得這里要改下端口
四、啟動dtm server
在dtm項目根目錄下
go run app/main.go dev
五、使用go-zero的grpc對接dtm
這是一個快速下單扣商品庫存的例子
1、order-api
order-api是http服務入口創建訂單
service order {
@doc "創建訂單"
@handler create
post /order/quickCreate (QuickCreateReq) returns (QuickCreateResp)
}
接下來看logic
func (l *CreateLogic) Create(req types.QuickCreateReq,r *http.Request) (*types.QuickCreateResp, error) {
orderRpcBusiServer, err := l.svcCtx.Config.OrderRpcConf.BuildTarget()
if err != nil{
return nil,fmt.Errorf("下單異常超時")
}
stockRpcBusiServer, err := l.svcCtx.Config.StockRpcConf.BuildTarget()
if err != nil{
return nil,fmt.Errorf("下單異常超時")
}
createOrderReq:= &order.CreateReq{UserId: req.UserId,GoodsId: req.GoodsId,Num: req.Num}
deductReq:= &stock.DecuctReq{GoodsId: req.GoodsId,Num: req.Num}
// 這里只舉了saga例子,tcc等其他例子基本沒啥區別具體可以看dtm官網
gid := dtmgrpc.MustGenGid(dtmServer)
saga := dtmgrpc.NewSagaGrpc(dtmServer, gid).
Add(orderRpcBusiServer+"/pb.order/create", orderRpcBusiServer+"/pb.order/createRollback", createOrderReq).
Add(stockRpcBusiServer+"/pb.stock/deduct", stockRpcBusiServer+"/pb.stock/deductRollback", deductReq)
err = saga.Submit()
dtmimp.FatalIfError(err)
if err != nil{
return nil,fmt.Errorf("submit data to dtm-server err : %+v \n",err)
}
return &types.QuickCreateResp{}, nil
}
進入到下單邏輯時,分別獲取order訂單、stock庫存服務的rpc在etcd中的地址,使用BuildTarget()這個方法
然后創建order、stock對應的請求參數
請求dtm獲取全局事務id , 基於此全局事務id開啟grpc的saga分布式事務 ,將創建訂單、扣減庫存請求放入事務中,這里使用grpc形式請求,每個業務要有一個正向請求、一個回滾請求、以及請求參數,當執行其中任何一個業務正向請求出錯了會自動調用事務中所有業務回滾請求達到回滾效果。
2、order-srv
order-srv是訂單的rpc服務,與dtm-gozero-order數據庫中order表交互
// service
service order {
rpc create(CreateReq)returns(CreateResp);
rpc createRollback(CreateReq)returns(CreateResp);
}
2.1 Create
當order-api提交事務默認請求的是create方法,我們看看logic
func (l *CreateLogic) Create(in *pb.CreateReq) (*pb.CreateResp, error) {
fmt.Printf("創建訂單 in : %+v \n", in)
// barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
if err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
order := new(model.Order)
order.GoodsId = in.GoodsId
order.Num = in.Num
order.UserId = in.UserId
_, err = l.svcCtx.OrderModel.Insert(tx, order)
if err != nil {
return fmt.Errorf("創建訂單失敗 err : %v , order:%+v \n", err, order)
}
return nil
}); err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.CreateResp{}, nil
}
可以看到,一進入方法內部我們就使用了dtm的子事務屏障技術,至於為什么使用子事務屏障是因為可能會出現重復請求或者空請求造成的臟數據等,在這里dtm自動給我們做了冪等處理不需要我們自己在額外做了,同時保證他內部的冪等處理跟我們自己執行的事務要在一個事務中,所以要使用一個會話的db鏈接,這時候我們就要先獲取
db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
然后基於此db連接dtm在內部通過sql執行做冪等處理,同時我們基於此db連接開啟事務,這樣就能保證dtm內部的子事務屏障在執行sql操作與我們自己業務執行的sql操作在一個事務中。
dtm在使用grpc調用我們業務的時候,我們的grpc服務在返回給dtm server錯誤時候,dtm會根據我們返回給它的grpc錯誤碼判斷是要執行回滾操作還是一直重試:
- codes.Internal : dtm server不會調用回滾,會一直重試,每次重試dtm的數據庫中都會加一次重試次數,自己可以監控這個重試次數報警,人工處理
- codes.Aborted : dtm server會調用所有回滾請求,執行回滾操作
如果dtm在調用grpc返回錯誤是nil時候,就認為調用成功了
2.2 CreateRollback
當我們調用訂單的創建訂單或者庫存扣減時候返回給dtm server 的codes.Aborted時候,dtm server會調用所有回滾操作,CreateRollback就是對應訂單下單的回滾操作,代碼如下
func (l *CreateRollbackLogic) CreateRollback(in *pb.CreateReq) (*pb.CreateResp, error) {
fmt.Printf("訂單回滾 , in: %+v \n", in)
order, err := l.svcCtx.OrderModel.FindLastOneByUserIdGoodsId(in.UserId, in.GoodsId)
if err != nil && err != model.ErrNotFound {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if order != nil {
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
db, err := l.svcCtx.OrderModel.SqlDB()
if err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
order.RowState = -1
if err := l.svcCtx.OrderModel.Update(tx, order); err != nil {
return fmt.Errorf("回滾訂單失敗 err : %v , userId:%d , goodsId:%d", err, in.UserId, in.GoodsId)
}
return nil
}); err != nil {
logx.Errorf("err : %v \n", err)
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
}
return &pb.CreateResp{}, nil
}
其實就是如果之前下單成功了,將之前下成功的單給取消掉就是對應下單的回滾操作
3、stock-srv
3.1 Deduct
扣減庫存,這里跟order的Create一樣了,是下單事務內的正向操作,扣減庫存,代碼如下
func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {
fmt.Printf("扣庫存start....")
stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
if err != nil && err != model.ErrNotFound {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if stock == nil || stock.Num < in.Num {
// 【回滾】庫存不足確定需要dtm直接回滾,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滾
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}
// barrier防止空補償、空懸掛等具體看dtm官網即可,別忘記加barrier表在當前庫中,因為判斷補償與要執行的sql一起本地事務
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
db, err := l.svcCtx.StockModel.SqlDB()
if err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
if err != nil{
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return status.Error(codes.Internal, err.Error())
}
affected, err := sqlResult.RowsAffected()
if err != nil{
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return status.Error(codes.Internal, err.Error())
}
// 如果是影響行數為0,直接就告訴dtm失敗不需要重試了
if affected <= 0 {
return status.Error(codes.Aborted, dtmcli.ResultFailure)
}
// !!開啟測試!! 測試訂單回滾更改狀態為失效,並且當前庫扣失敗不需要回滾
//return fmt.Errorf("扣庫存失敗 err : %v , in:%+v \n",err,in)
return nil
}); err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil,err
}
return &pb.DeductResp{}, nil
}
這里值得注意的是當只有庫存不足、或者在扣庫存影響行數為0(未成功)才需要告訴dtm server要回滾,其他情況下其實都是網絡抖動、硬件異常導致,應該讓dtm server一直重試,當然自己要加個最大重試次數的監控報警,如果達到最大次數還未成功能實現自動發短信、打電話人工介入了。
3.2 DeductRollback
這里是對應扣庫存的回滾操作
func (l *DeductRollbackLogic) DeductRollback(in *pb.DecuctReq) (*pb.DeductResp, error) {
fmt.Printf("庫存回滾 in : %+v \n", in)
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
db, err := l.svcCtx.StockModel.SqlDB()
if err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
if err := l.svcCtx.StockModel.AddStock(tx, in.GoodsId, in.Num); err != nil {
return fmt.Errorf("回滾庫存失敗 err : %v ,goodsId:%d , num :%d", err, in.GoodsId, in.Num)
}
return nil
}); err != nil {
logx.Errorf("err : %v \n", err)
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
return &pb.DeductResp{}, nil
}
六、子事務屏障
這個詞是dtm作者定義的,其實子事務屏障代碼不多,看barrier.CallWithDB這個方法即可。
// CallWithDB the same as Call, but with *sql.DB
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error {
tx, err := db.Begin()
if err != nil {
return err
}
return bb.Call(tx, busiCall)
}
由於這個方法他在內部開啟本地事務,它內部是在此事務執行了sql操作,所以在我們執行自己的業務時候必須跟它用同一個事務,那就要基於同一個db連接開事務了,so~ 你知道為什么我們要提前獲取db連接了吧,目的就是要讓它內部執行的sql操作跟我們的sql操作在一個事務下。至於它內部為什么執行自己的sql操作,接下來我們分析。
我們看bb.Call這個方法
// Call 子事務屏障,詳細介紹見 https://zhuanlan.zhihu.com/p/388444465
// tx: 本地數據庫的事務對象,允許子事務屏障進行事務操作
// busiCall: 業務函數,僅在必要時被調用
func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
bb.BarrierID = bb.BarrierID + 1
bid := fmt.Sprintf("%02d", bb.BarrierID)
defer func() {
// Logf("barrier call error is %v", rerr)
if x := recover(); x != nil {
tx.Rollback()
panic(x)
} else if rerr != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()
ti := bb
originType := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[ti.Op]
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 這個是空補償
currentAffected == 0 { // 這個是重復請求或者懸掛
return
}
rerr = busiCall(tx)
return
}
核心其實就是如下幾行代碼
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 這個是空補償
currentAffected == 0 { // 這個是重復請求或者懸掛
return
}
rerr = busiCall(tx)
func insertBarrier(tx DB, transType string, gid string, branchID string, op string, barrierID string, reason string) (int64, error) {
if op == "" {
return 0, nil
}
sql := dtmimp.GetDBSpecial().GetInsertIgnoreTemplate("dtm_barrier.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)", "uniq_barrier")
return dtmimp.DBExec(tx, sql, transType, gid, branchID, op, barrierID, reason)
}
每一個業務邏輯,dtm server在正常成功請求時候, ti.Op 默認正常執行的操作是action,所以正常第一次請求都是ti.Op值都是action,那originType就是“”
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
那么上面這個sql就不會執行因為ti.Op == "" 在insertBarrier中直接return了
currentAffected, rerr := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
那第二個sql的ti.Op是 action, 所以子事務屏障表barrier就會插入一條數據
同理在執行庫存也會插入一條
1、整個事務都成功的子事務屏障
那在一次下訂單正常成功請求下,由於 ti.Op都是action,所以originType都是"" , 所以不管是下單的barrier 還是扣庫存的barrier,在執行他們2次barrier insert時候,originAffected都會忽略,因為originType==“” 會直接被return不插入數據,這樣看來 不管是下單還是扣庫存,barrier的第二條插入數據生效,所以barrier數據表中就會有2條下單數據,一條是訂單的一條是扣庫存的
gid : dtm全局事務id
branch_id : 每個全局事務id下的每個業務id
op : 操作,如果是正常成功請求就是action
barrier_id : 同一個業務下開多個會遞增
這4個字段在表中是聯合唯一索引,在insertBarrier時候,dtm判斷如果存在就忽略不插入
2、如果訂單成功庫存不足回滾子事務屏障
我們庫存只有10個 ,我們下單20個
1)當訂單下成功,因為訂單下單時候並不知道后續庫存情況(即使在下單時候先去查庫存那也會有查詢時候足夠,扣除時候不足情況),
所以下單成功barrier表中按照之前梳理的邏輯就會在barrier表中產生一條正確數據執行數據
2)接着執行扣庫存操作
func (l *DeductLogic) Deduct(in *pb.DecuctReq) (*pb.DeductResp, error) {
fmt.Printf("扣庫存start....")
stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
if err != nil && err != model.ErrNotFound {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if stock == nil || stock.Num < in.Num {
//【回滾】庫存不足確定需要dtm直接回滾,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滾
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}
.......
}
在執行扣庫存業務邏輯之前,由於我們查詢庫存發現庫存不足,所以直接return codes.Aborted 了,不會走到子事務屏障barrier這里,所以barrier表中不會插入數據,而是告訴dtm要回滾
3)調用order回滾操作
訂單回滾的時候會開啟barrier,這時候又會執行barrier代碼(如下),由於回滾代碼的ti.Op是compensate ,orginType就是action
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 這個是空補償
currentAffected == 0 { // 這個是重復請求或者懸掛
return
}
rerr = busiCall(tx)
由於我們之前下訂單成功了,barrier表里有一條下單成功時候的記錄action,所以originAffected==0 ,所以只會插入一條當前回滾記錄繼續調用 busiCall(tx) 執行后續我們自己寫的回滾操作
到此,我們應該只有兩條數據,一條訂單成功創建記錄,一條訂單回滾記錄
4)庫存回滾DeductRollback
訂單回滾成功后,會再繼續調用庫存回滾DeductRollback,庫存回滾代碼如下
這就是子事務屏障自動幫我們判斷的,也就是那兩條核心插入語句幫我們判斷的,以至於我們業務不會出現臟數據
庫存這里回滾分兩種情況
-
沒扣成功回滾
-
扣成功回滾
沒扣成功回滾(我們當前舉例場景是這個 )
首先調用庫存回滾時候ti.Op是compensate ,orginType就是action ,會執行下面2條insert
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 這個是空補償
currentAffected == 0 { // 這個是重復請求或者懸掛
return}rerr = busiCall(tx)
}
這里結合判斷如果是回滾、取消操作,originAffected > 0 當前插入成功了,之前對應的正向扣庫存操作沒有插入成功,說明之前庫存沒扣成功,直接return就不需要執行后續補償了。所以此時會在barrier表中插入2條數據直接return,就不會執行我們后續補償操作了
到此我們barrier表中有4條數據了
扣成功回滾(這個情況自己可以嘗試模擬此場景)
如果我們上一步扣庫存成功,在執行此補償的時候ti.Op是compensate ,orginType就是action ,繼續執行2個insert語句
originAffected, _ := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, originType, bid, ti.Op)
currentAffected, err := insertBarrier(tx, ti.TransType, ti.Gid, ti.BranchID, ti.Op, bid, ti.Op)
dtmimp.Logf("originAffected: %d currentAffected: %d", originAffected, currentAffected)
if (ti.Op == BranchCancel || ti.Op == BranchCompensate) && originAffected > 0 || // 這個是空補償
currentAffected == 0 { // 這個是重復請求或者懸掛
return}rerr = busiCall(tx)
}
這里結合判斷如果是回滾、取消操作,originAffected == 0 當前插入忽略了沒插入進去,說明之前正向扣庫存插入成功了,這里只插入第二個sql語句記錄即可,然后在執行后續我們補償的業務操作。
所以,整體分析下來核心語句就是2條insert,它幫我們解決了重復回滾數據、數據冪等情況,只能說dtm作者想法真的很好,用了最少的代碼幫我們解決了一個很麻煩的問題
七、go-zero對接中注意事項
1、dtm的回滾補償
在使用dtm的grpc時候,當我們使用saga、tcc等如果第一步嘗試或者執行失敗了,是希望它能執行后面的rollback的,在grpc中的服務如果發生錯誤了,必須返回 : status.Error(codes.Aborted, dtmcli.ResultFailure) , 返回其他錯誤,不會執行你的rollback操作,dtm會一直重試,如下:
stock, err := l.svcCtx.StockModel.FindOneByGoodsId(in.GoodsId)
if err != nil && err != model.ErrNotFound {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if stock == nil || stock.Num < in.Num {
//【回滾】庫存不足確定需要dtm直接回滾,直接返回 codes.Aborted, dtmcli.ResultFailure 才可以回滾
return nil, status.Error(codes.Aborted, dtmcli.ResultFailure)
}
2、barrier的空補償、懸掛等
之前准備工作中,我們創建了dtm_barrier庫以及執行了barrier.mysql.sql,這個其實就是為我們的業務服務做了一個檢查,防止空補償,具體可以看barrier.Call中源碼,沒幾行代碼可以看懂的。
如果我們線上使用的話,你的每個與db交互的服務只要用到了barrier,這個服務使用到的mysql賬號,要給他分配barrier庫的權限,這個不要忘記了
3、barrier在rpc中本地事務
在rpc的業務中,如果使用了barrier的話,那么在model中與db交互時候必須要用事務,並且一定要跟barrier用同一個事務
logic
barrier, err := dtmgrpc.BarrierFromGrpc(l.ctx)
db, err := sqlx.NewMysql(l.svcCtx.Config.DB.DataSource).RawDB()
if err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, status.Error(codes.Internal, err.Error())
}
if err := barrier.CallWithDB(db, func(tx *sql.Tx) error {
sqlResult,err := l.svcCtx.StockModel.DecuctStock(tx, in.GoodsId, in.Num)
if err != nil{
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return status.Error(codes.Internal, err.Error())
}
affected, err := sqlResult.RowsAffected()
if err != nil{
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return status.Error(codes.Internal, err.Error())
}
// 如果是影響行數為0,直接就告訴dtm失敗不需要重試了
if affected <= 0 {
return status.Error(codes.Aborted, dtmcli.ResultFailure)
}
// !!開啟測試!! : 測試訂單回滾更改狀態為失效,並且當前庫扣失敗不需要回滾
// return fmt.Errorf("扣庫存失敗 err : %v , in:%+v \n",err,in)
return nil
}); err != nil {
// !!!一般數據庫不會錯誤不需要dtm回滾,就讓他一直重試,這時候就不要返回codes.Aborted, dtmcli.ResultFailure 就可以了,具體自己把控!!!
return nil, err
}
model
func (m *defaultStockModel) DecuctStock(tx *sql.Tx,goodsId , num int64) (sql.Result,error) {
query := fmt.Sprintf("update %s set `num` = `num` - ? where `goods_id` = ? and num >= ?", m.table)
return tx.Exec(query,num, goodsId,num)
}
func (m *defaultStockModel) AddStock(tx *sql.Tx,goodsId , num int64) error {
query := fmt.Sprintf("update %s set `num` = `num` + ? where `goods_id` = ?", m.table)
_, err :=tx.Exec(query, num, goodsId)
return err
}
七、使用go-zero的http對接
這個基本沒啥難度,grpc會了這個很簡單,鑒於go在微服務中去使用http場景不多,這里就不詳細去做了,我之前一個版本寫過一個簡單的,但是沒這個完善,有興趣可以去看下,不過那個barrier是自己基於go-zero的sqlx,將dtm的官方的做了修改,現在不需要了。
項目地址:https://github.com/Mikaelemmmm/dtmbarrier-go-zero
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
並 star 支持我們!
微信交流群
關注『微服務實踐』公眾號並點擊 交流群 獲取社區群二維碼。