PHP redis分布式鎖+隊列實現商品搶購


有一個場景,商品A預售量1000件,早上10點准時開搶,10W個人一起來搶,在正式開始之后,我們將面對兩個問題
1  大批的數據庫請求和大量的訂單創建,數據庫壓力巨大,有可能宕機
2  商品可能出現超賣的情況
解決方案如下:


這里我們先看商品超賣的問題
最原始的下單流程無非就是: 判斷商品庫存是否足夠 -> 足夠則下單
這種處理方式在沒什么並發的情況下不會出現問題,但是一旦並發量一大,這種流程就肯定會出現超賣
假設有A和B兩個進程,A要買1個,B也要買1個,可是商品庫存就剩下一個了,這兩個進程同時進入庫存判斷,都通過了,然后進入:下單->減庫存  最后結果就是商品庫存變成了負數,這顯然是不符合需求的
所以我們要做的就是,庫存判斷 ->下單 -> 減庫存  讓這整個流程原子化  ,要么都能執行,要么先等着,別上趕着。

我們利用redis的單線程,可以實現這一點,也就是俗稱的分布式鎖
網上關於分布式鎖的做法良莠不齊,博主之前也陷入過誤區,這里挨個給大家爬坑,拋磚引玉
為了讓大家只關注這個鎖的意義,這里關於商品過多的信息不作完整贅述,只做簡單的舉例
這里是redis中存放的商品信息:productInfo:16 指的是ID為16的商品信息(秒殺活動商品詳情頁打開非常頻繁,建議緩存起來)
limitBuy 指的是這件商品的限購數量,一會用得上



storage:16 指的是ID為16的商品庫存,也建議在添加搶購活動的時候就緩存到redis中
 


 

好,准備工作做好了,正式開始!
先上代碼

        $redis = new RedisService();
 
        //購買數量不得大於限購數量
        $productInfo = $redis->hashGet("productInfo:".$productId);
        if($productInfo['limitBuy'] < $num)
        {
            echo  json_encode(["code"=>30017,"msg"=>'每人限購'.$productInfo['limitBuy'].""]);die();
        }
        //加分布式鎖,原子化下單流程
        $storageLockKey = "storage:".$productId;
        $expireTime = $redis ->lock($storageLockKey,5,200);
        //判斷商品庫存
        $storageKey = $this->getStorageKey($productId);
        $storage = $redis->get($storageKey);
        if($storage <= 0 || $storage < $num)
        {
           $redis->unlock($storageLockKey,$expireTime);
           echo  json_encode(["code"=>30018,"msg"=>'庫存不足']);die();
        }
 
        //欲購買數量+已購買數量  不得超過限購數量
        $limitBuyKey = "product:limitBuy:".$productId;
        if($redis->getZsetScore($limitBuyKey,"user_".$userId)+$num > $productInfo['limitBuy'])
        {
            $redis->unlock($storageLockKey,$expireTime);
            echo  json_encode(["code"=>30019,"msg"=>'每人限購'.$productInfo['limitBuy'].""]);die();
        }
 
        $orderInfo = array(
            'buyer_id' => $userId,#用戶ID
            'product_id' => $productId,#產品ID
            'num' => $num,  #購買數量
            'price'=>$productInfo['price'],
            'pay_type'=>1 //在線支付
 
        );
        //訂單放進隊列
        $orderKey = "orderList";
        $orderRe = $redis->push($orderKey,serialize($orderInfo));
        //下單成功
        if($orderRe)
        {
 
 
            //獲取原有集合元素個數
            $count = $redis->countZset($limitBuyKey);
            //記錄購買人和購買數量
            $redis->alterZsetScore($limitBuyKey,"user_".$userId,$num);
            //如果是第一次插入元素,設置過期時間+3天,防止內存堆積
            if(!$count)
            {
                $recordExpire = strtotime($productInfo['endTime']) - strtotime($productInfo['beginTime']) +3*24*3600;
                $redis->expire($limitBuyKey,$recordExpire);
            }
 
            //商品減少庫存
            $redis->alterNumber($storageKey,-$num);
            $redis->unlock($storageLockKey,$expireTime);
            echo  json_encode(["code"=>200,"msg"=>'下單成功']);die();
        }

可以看到,我們的下單流程是   加分布式鎖 -> 判斷庫存 -> 限購 -> 訂單信息放入Redis列表

我們重點來看這個分布式鎖如何實現

這里的鎖保存的值是一個過期時間的時間戳(毫秒級)

有人說為什么要記錄個時間戳呢?為什么不直接利用redis的自動過期時間呢?
有兩個原因:
1  redis的set設置健值 和 設置過期時間 是分開的,假設設置了健值,在設置過期時間之前,程序出錯了,這個鎖就沒有過期時間了,就會一直存在redis中,形成死鎖
2  把值設為過期時間,可以通過getset方法在設置新過期時間的時候,取得舊的過期時間,判斷是否已經被別的進程搶先獲得鎖
看這段代碼:

 

流程解釋:
setnx 判斷當前是否有鎖

當前沒有鎖,則獲得鎖,返回過期時間
當前有鎖,判斷鎖是否過期

如果過期則更新過期時間,getset獲得鎖,判斷返回的的過期時間是否已經被搶先重置了,被搶先則等待20毫秒,沒被搶先則返回過期時間
沒有過期則等待20毫秒

返回的過期時間,是為了解鎖的時候配對上,誰加的鎖,只有誰能解。防止執行時間過長的進程解掉了別人的鎖,把后面的進程放進來
這樣,分布式鎖基本就完成了,解鎖的時候直接讓鎖過期即可

這樣就可以保證  永遠只有一個進程獲得鎖,永遠只有一個進程在進行庫存的相關判斷和操作,防止超賣
我們進行測試:
商品庫存為2000

並發5000次請求

10秒完成,平均每秒500個並發

可以每人買一件的話,生成了2000個訂單,寫入redis列表

庫存為0  沒有出現超賣的情況

接下來后來啟動個定時任務,每分鍾啟動一次,每次彈出3000個元素,寫入數據庫


 

linux添加定時任務

過一分鍾我們看到,隊列里的訂單已被彈出批量寫入數據庫了

在這個過程中,我們這兩個問題都得到了較好地解決
訂單先寫入redis  后台慢慢寫入數據庫,緩解了數據庫寫的壓力
產品詳情在后台發布秒殺活動的時候,就寫入redis緩存,前段獲取產品詳情頁的時候不會走數據庫,緩解讀的壓力
redis分布式鎖,讓整個下單流程原子化,只允許一個進程進行下單,防止了超賣的情況。

 

轉載 來自:https://blog.csdn.net/weixin_40325128/article/details/89378834


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM