有一個場景,商品A預售量1000件,早上10點准時開搶,10W個人一起來搶,在正式開始之后,我們將面對兩個問題
1 大批的數據庫請求和大量的訂單創建,數據庫壓力巨大,有可能宕機
2 商品可能出現超賣的情況
解決方案如下:
這里我們先看商品超賣的問題
最原始的下單流程無非就是: 判斷商品庫存是否足夠 -> 足夠則下單
這種處理方式在沒什么並發的情況下不會出現問題,但是一旦並發量一大,這種流程就肯定會出現超賣
假設有A和B兩個進程,A要買1個,B也要買1個,可是商品庫存就剩下一個了,這兩個進程同時進入庫存判斷,都通過了,然后進入:下單->減庫存 最后結果就是商品庫存變成了負數,這顯然是不符合需求的
所以我們要做的就是,庫存判斷 ->下單 -> 減庫存 讓這整個流程原子化 ,要么都能執行,要么先等着,別上趕着。
我們利用redis的單線程,可以實現這一點,也就是俗稱的分布式鎖
網上關於分布式鎖的做法良莠不齊,博主之前也陷入過誤區,這里挨個給大家爬坑,拋磚引玉
為了讓大家只關注這個鎖的意義,這里關於商品過多的信息不作完整贅述,只做簡單的舉例
這里是redis中存放的商品信息:productInfo:16 指的是ID為16的商品信息(秒殺活動商品詳情頁打開非常頻繁,建議緩存起來)
limitBuy 指的是這件商品的限購數量,一會用得上
storage:16 指的是ID為16的商品庫存,也建議在添加搶購活動的時候就緩存到redis中
好,准備工作做好了,正式開始!
先上代碼
$redis = new RedisService(); //購買數量不得大於限購數量 $productInfo = $redis->hashGet("productInfo:".$productId); if($productInfo['limitBuy'] < $num) { echo json_encode(["code"=>30017,"msg"=>'每人限購'.$productInfo['limitBuy']."件"]);die(); } //加分布式鎖,原子化下單流程 $storageLockKey = "storage:".$productId; $expireTime = $redis ->lock($storageLockKey,5,200); //判斷商品庫存 $storageKey = $this->getStorageKey($productId); $storage = $redis->get($storageKey); if($storage <= 0 || $storage < $num) { $redis->unlock($storageLockKey,$expireTime); echo json_encode(["code"=>30018,"msg"=>'庫存不足']);die(); } //欲購買數量+已購買數量 不得超過限購數量 $limitBuyKey = "product:limitBuy:".$productId; if($redis->getZsetScore($limitBuyKey,"user_".$userId)+$num > $productInfo['limitBuy']) { $redis->unlock($storageLockKey,$expireTime); echo json_encode(["code"=>30019,"msg"=>'每人限購'.$productInfo['limitBuy']."件"]);die(); } $orderInfo = array( 'buyer_id' => $userId,#用戶ID 'product_id' => $productId,#產品ID 'num' => $num, #購買數量 'price'=>$productInfo['price'], 'pay_type'=>1 //在線支付 ); //訂單放進隊列 $orderKey = "orderList"; $orderRe = $redis->push($orderKey,serialize($orderInfo)); //下單成功 if($orderRe) { //獲取原有集合元素個數 $count = $redis->countZset($limitBuyKey); //記錄購買人和購買數量 $redis->alterZsetScore($limitBuyKey,"user_".$userId,$num); //如果是第一次插入元素,設置過期時間+3天,防止內存堆積 if(!$count) { $recordExpire = strtotime($productInfo['endTime']) - strtotime($productInfo['beginTime']) +3*24*3600; $redis->expire($limitBuyKey,$recordExpire); } //商品減少庫存 $redis->alterNumber($storageKey,-$num); $redis->unlock($storageLockKey,$expireTime); echo json_encode(["code"=>200,"msg"=>'下單成功']);die(); }
可以看到,我們的下單流程是 加分布式鎖 -> 判斷庫存 -> 限購 -> 訂單信息放入Redis列表
我們重點來看這個分布式鎖如何實現
這里的鎖保存的值是一個過期時間的時間戳(毫秒級)
有人說為什么要記錄個時間戳呢?為什么不直接利用redis的自動過期時間呢?
有兩個原因:
1 redis的set設置健值 和 設置過期時間 是分開的,假設設置了健值,在設置過期時間之前,程序出錯了,這個鎖就沒有過期時間了,就會一直存在redis中,形成死鎖
2 把值設為過期時間,可以通過getset方法在設置新過期時間的時候,取得舊的過期時間,判斷是否已經被別的進程搶先獲得鎖
看這段代碼:
流程解釋:
setnx 判斷當前是否有鎖
當前沒有鎖,則獲得鎖,返回過期時間
當前有鎖,判斷鎖是否過期
如果過期則更新過期時間,getset獲得鎖,判斷返回的的過期時間是否已經被搶先重置了,被搶先則等待20毫秒,沒被搶先則返回過期時間
沒有過期則等待20毫秒
返回的過期時間,是為了解鎖的時候配對上,誰加的鎖,只有誰能解。防止執行時間過長的進程解掉了別人的鎖,把后面的進程放進來
這樣,分布式鎖基本就完成了,解鎖的時候直接讓鎖過期即可
這樣就可以保證 永遠只有一個進程獲得鎖,永遠只有一個進程在進行庫存的相關判斷和操作,防止超賣
我們進行測試:
商品庫存為2000
並發5000次請求
10秒完成,平均每秒500個並發
可以每人買一件的話,生成了2000個訂單,寫入redis列表
庫存為0 沒有出現超賣的情況
接下來后來啟動個定時任務,每分鍾啟動一次,每次彈出3000個元素,寫入數據庫
linux添加定時任務
過一分鍾我們看到,隊列里的訂單已被彈出批量寫入數據庫了
在這個過程中,我們這兩個問題都得到了較好地解決
訂單先寫入redis 后台慢慢寫入數據庫,緩解了數據庫寫的壓力
產品詳情在后台發布秒殺活動的時候,就寫入redis緩存,前段獲取產品詳情頁的時候不會走數據庫,緩解讀的壓力
redis分布式鎖,讓整個下單流程原子化,只允許一個進程進行下單,防止了超賣的情況。
轉載 來自:https://blog.csdn.net/weixin_40325128/article/details/89378834