並發下庫存扣減和鎖


先說場景:

物品W現在庫存剩余1個,用戶P1、P2同時購買,只有1人能購買成功,不允許超賣

秒殺也是類似的情況,只有1件商品,N個用戶同時搶購,只有1人能搶到

這里不談秒殺設計,不談使用隊列等使請求串行化,就談下怎么用鎖來保證數據一致性

 

常見的實現方案有以下幾種:

1.代碼同步, 例如使用 synchronized, lock 等同步方法

2.不查詢,直接更新 update table set surplus = (surplus - buyQuantity) where id = xx and (surplus - buyQuantity) > 0

3.使用CAS, update table set  surplus = aa where id = xx and version = y

4.使用數據庫鎖, select xx for update

5.使用分布式鎖(zookeeper, redis等)

 

下面就針對這幾種方案來分析下

1.代碼同步,例如使用 synchronized,lock 等同步方法

偽代碼如下:

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校驗...
    // 校驗剩余數量
    Product product  = 從數據庫查詢出記錄;
    if (product.getSurplus < buyQuantity) {
        return "庫存不足";
    }

    // set新的剩余數量
    product.setSurplus(product.getSurplus() - quantity);
    // 更新數據庫
    update(product);
    // 記錄日志...
    // 其他業務...
}

 

先說下這個方案的前提配置:

1.使用spring 聲明式事務管理

2.事務傳播機制使用默認的(PROPAGATION_REQUIRED)

3.項目分層為controller-service-dao 3層,事務管理在service層

 

這個方案不可行,主要是因為以下幾點:

1.synchronized的作用范圍是單個jvm實例,如果做了集群,分布式就沒用了

2.synchronized是作用在對象實例上的,如果不是單例,則多個實例間不會同步(這個一般用spring管理bean,默認就是單例)

3.單個jvm時,synchronized也不能保證多個數據庫事務的隔離性。這與代碼中的事務傳播級別,數據庫的事務隔離級別,加鎖時機等相關

 

3.1.先說下隔離級別,常用的是 Read Committed 和 Repeatable Read,另外2種不常用就不說了

3.1.1.RR(Repeatable Read)級別,mysql默認的是RR,事務開啟后,不會讀取到其他事務提交的數據

根據前面的前提,我們知道在調用buy方法時會開啟事務

假設現在有線程T1,T2同時執行buy方法,假設T1先執行,T2等待

spring的事務開啟和提交等是通過aop(代理)實現的,所以執行buy方法前,就會開啟事務

這時候T1,T2是兩個事務,當T1執行完后,T2執行,讀取不到T1提交的數據,所以會出問題

 

3.1.2.RC(Read Committed)級別,事務開啟后,可以讀取到其他事務提交的數據

看起來這個級別可以解決上面的問題。T2執行時,可以讀取到T1提交的結果

但是問題是,T2執行的時候,T1的事務提交了嗎?

事務和鎖的流程如下

1.開啟事務(aop)

2.加鎖(進入synchronized方法)

3.釋放鎖(退出synchronized方法)

4.提交事務(aop)

可以看出是先釋放鎖,再提交事務,所以T2執行查詢,可能還是未讀到T1提交的數據,還會出問題

 

3.2.根據3.1中的問題,發現主要矛盾是事務開啟和提交的時機與加鎖解鎖時機不一致,有小伙伴們可能就想到了解決方案.

3.2.1.在事務開啟前加鎖,事務提交后解鎖

確實是可以,這相當於事務串行化,拋開性能不談,來談談怎么實現

如果使用默認的事務傳播機制,那么要保證事務開啟前加鎖,事務提交后解鎖,就需要把加鎖,解鎖放在controller層

這樣就有個潛在問題,所有操作庫存的方法,都要加鎖,而且要是同一把鎖,寫起來挺累的

而且這樣還是不能跨jvm

 

3.2.2.將查詢庫存,扣減庫存這2步操作,單獨提取個方法,單獨使用事務,並且事務隔離級別設置為RC

這個其實和上面的3.2.1異曲同工,最終都是將加解鎖放在了事務開啟提交外層

比較而言優點是入口少了controller不用處理

缺點除了上面的不能跨jvm,還有就是 單獨的這個方法,需要放到另外的service類中

因為使用spring,同一個bean的內部方法調用,是不會被再次代理的,所以配置的單獨事務等需要放到另外的 service bean 中

 

2.不查詢,直接更新

看完第一種方案,小伙伴就說了,說的那么復雜,這么多問題,就是因為查詢的數據不是最新的嗎?

我們不查詢,接更新不就行啦

偽代碼如下:

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校驗...
    int 影響行數 = update table set surplus = (surplus - buyQuantity) where id = 1 ;
    if (result < 0) {
        return "庫存不足";
    }
    // 記錄日志...
    // 其他業務...
}

 

 測試后發現庫存變成-1了,繼續完善下

public synchronized void buy(String productName, Integer buyQuantity) {
    // 其他校驗...
    int 影響行數 = update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0 ;
    if (result < 0) {
        return "庫存不足";
    }
    // 記錄日志...
    // 其他業務...
}

 

測試后,功能OK;

這樣確實可以實現,不過有一些其他問題:

1.不具備通用性,例如add操作

2.庫存操作一般要記錄操作前后的數量等,這樣沒法記錄

3.其他...

但是根據這個方案,引出方案3
 

3.使用CAS,update table set surplus = aa where id = xx and yy = y

CAS是指compare/check and swap/set 意思都差不多,不必太糾結是哪個單詞

我們將上面的sql修改一下:

int 影響行數 = update table set surplus = newQuantity where id = 1 and surplus = oldQuantity ;

這樣,線程T1執行完后,線程T2去更新,影響行數=0,則說明數據被更新,重新查詢判斷執行。

偽代碼如下:

public void buy(String productName, Integer buyQuantity) {
    // 其他校驗...
    Product product = getByDB(productName);
    int 影響行數 = update table set surplus = (surplus - buyQuantity) where id = 1 and surplus = 查詢的剩余數量 ;
    while (result == 0) {
        product = getByDB(productName);
        if (查詢的剩余數量 > buyQuantity) {
            影響行數 = update table set surplus = (surplus - buyQuantity) where id = 1 and surplus = 查詢的剩余數量 ;
        } else {
            return "庫存不足";
        }
    }
    // 記錄日志...
    // 其他業務...
}

 

看到重新查詢幾個字,小伙伴們應該就又想到事務隔離級別問題了

上面代碼中的getByDB方法,必須單獨事務(注意同一個bean內單獨事務不生效哦),而且數據庫的事務隔離級別必須是RC,

否則上面的代碼就會是死循環了。

 

上面的方案,可能會出現一個CAS中經典問題,ABA的問題:

ABA是指:

線程T1 查詢,庫存剩余  100, 線程T2 查詢,庫存剩余  100

線程T1 執行sub update t set surplus = 100 - 10 where id = x and surplus = 100;

線程T3 查詢, 庫存剩余 90

線程T3 執行add update t set surplus = 90 + 10 where id = x and surplus = 90;

線程T2 執行sub update t set surplus = 100 - 10 where id = x and surplus = 100;

 這里線程T2執行的時候,庫存的100已經不是查詢到的100了,但是對於這個業務是不影響的。

 

一般的設計中CAS會使用version來控制

update t set surplus = 90 ,version = version + 1 where id = x and version = oldVersion ;

這樣,每次更新version在原基礎上+1,就可以了。

使用CAS要注意幾點,

1.失敗重試次數,是否需要限制

2.失敗重試對用戶是透明的

 

4.使用數據庫鎖,select xx for update

方案3中的cas,是樂觀鎖的實現,而select for udpate 則是悲觀鎖,在查詢數據的時候,就將數據鎖住

偽代碼如下:

public void buy(String productName, Integer buyQuantity) {
    // 其他校驗...
    Product product = select * from table where name = productName for update;
    if (查詢的剩余數量 > buyQuantity) {
        影響行數 = update table set surplus = (surplus - buyQuantity) where name = productName ;
    } else {
        return "庫存不足";
    }
    // 記錄日志...
    // 其他業務...
}

 

線程T1 進行sub,查詢庫存剩余 100

線程T2 進行sub,這時候,線程T1事務還未提交,線程T2阻塞,直到線程T1事務提交或回滾才能查詢出結果

所以線程T2查詢出的一定是最新的數據,相當於事務串行化了,就解決了數據一致性問題

 

對於select for update,需要注意的有2點

1.統一入口:所有庫存操作都需要統一使用 select for update,這樣才會阻塞,如果另外一個方法還是普通的select,是不會被阻塞的

2.加鎖順序:如果有多個鎖,那么加鎖順序要一致,否則會出現死鎖

 

5.使用分布式鎖(zookeeper,redis等)

使用分布式鎖,原理和方案1種的synchronized是一樣的,只不過synchronized的flag只有jvm進程內可見,而分布式鎖的flag則是全局可見,方案4種的 select for update 的flag也是全局可見

分布式鎖的實現方案有很多:基於redis,基於zookeeper,基於數據庫等等。下一篇博客將介紹 基於redis的分布式鎖

需要注意,使用分布式鎖和synchronized鎖有同樣的問題,就是鎖和事務的順序,這個在方案1里面已經講過,不再重復

 

做個簡單總結

方案1:synchronized等jvm內部鎖不適合用來保證數據庫數據一致性,不能跨jvm

方案2:不具備通用性,不能記錄操作前后日志

方案3:推薦使用,但是如果數據競爭激烈,則自動重試次數會急劇上升,需要注意

方案4:推薦使用,最簡單的方案,但是如果事務過大,會有性能問題;操作不當,會有死鎖問題

方案5:和方案1類似,只是能跨jvm

 

原文:

https://blog.csdn.net/qq315737546/article/details/76850173


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM