C#基於Redis實現分布式鎖


  【本博客屬於原創,如需轉載,請注明出處:https://www.cnblogs.com/gdouzz/p/12097968.html

  最近研究庫存的相關,在高峰期經常出現超賣等等情況,最后根據采用是基於Redis來實現了分布式鎖,特此拿出來和大家分享。

  准備工作:centos7,Redis,Nginx,以及JMeter測試工具。

  分布式鎖的引出

  在傳統的程序中,我們寫了如下最簡單對庫存操作的代碼如下:

  下面是基於AspNetCore.WebAPI 創建的一個對庫存進行操作(減少)的接口,我相信很多同志都能夠寫出這種加lock來保證高並發的時候,庫存不會出現超賣,這種做法的性能問題,不屬於我們這篇文章的討論范圍,我們要討論的是,這種寫法到底會不會造成超賣的情況出現呢?,如果這是傳統的企業內部應用,單體架構,如下圖所示,也能滿足需求;   

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using StackExchange.Redis;

namespace RedisDistributedLockMvc.Controllers
{
    [Route("api/inv")]
    [ApiController]
    public class InvController : ControllerBase
    {

        private static readonly object LockObject = new object();
        private static readonly ConfigurationOptions Options = new ConfigurationOptions()
        {
            EndPoints = { { "192.168.232.132", 6379 } },
            Password = "123456"
        };


        [HttpGet]
        public ActionResult<string> Get()
        {
            string msg = null;
            lock (LockObject)
            {
                int invQty = GetInvQty();
                if (invQty > 0)
                {
                    invQty = invQty - 1;
                    SetInvQty(invQty);
                    msg = $"扣減成功,當前庫存:{invQty}"; 
                }
                else
                {
                    msg = "扣減失敗,庫存不足";
                }
            }
            Console.WriteLine(msg);
            return msg;
        }


        private int GetInvQty()
        {
            var qty = 0;
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                qty = Convert.ToInt32(db.StringGet("InvQty"));
            }
            return qty;
        }

        private void SetInvQty(int qty)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                db.StringSet("InvQty", qty);
            }
        }
    }
}

      隨着業務的越來越復雜,這種單體架構的形式,已經滿足不了我們的正常業務需求,很多公司演變成了下面這種架構模式;

  下面是我在測試環境搭建的過程(為了讓自己有更深刻的體會,我建議大家按照上面的架構圖搭建一個簡單的環境);

  1、把上面的AspNetCore代碼發布到Centos 7機器上,分別指向該機器的不同端口(5000,5001),等同於部署了兩份;

  2、在Centos 7機器上安裝Nginx,然后修改nginx配置文件,指向剛剛配置配置的地址;

  特別說明:如果覺得上面操作很難,在Centos中,可以通過yum源來安裝相應的軟件,通過使用xshell來編寫命令,如果是對文件操作的,不熟悉命令可以用WinScp這種軟件進行可視化修改完后保存。然后需要特別注意的是,防火牆以及相關的端口和服務是否啟動。

  除此之外還要特別留意,Nginx是一個進程,剛剛兩個不同的端口,分別對應不同的進程,這三個進程之間的安全是通過叫(seLinux)來管理的,如果nginx配置好之后,外網訪問還是報502,可以嘗試着把seLinux關閉(當然不推薦關閉,也有相關的解決方案)。

  當然,如果有同學想嘗試這個過程,碰到問題的也可以聯系我:QQ:3484677573,說明是在博客園看到的即可。

  搭建環境完畢之后,接下來開始我們的測試;

  測試之前,因為要模擬高並發的環境,我們采用的jmeter作為我們壓測的工具,簡單的使用教程如下:

  首先從官網下載和安裝jmeter,安裝完之后,找到安裝下的bin目錄,找到jmeter.bat,雙擊即可啟動jmeter。

   打開jmeter之后,添加線程組,比較簡單,可以指定線程的個數等等。

  接下來再添加一個HttpRequest,如下圖所示,根據提示,輸入相關的內容,然后點擊上面的運行,即可開始測試。

   至此,測試工具也准備OK了,那我們開始測試,先假設我們Redis里面有50個庫存;

  接下來在centos啟動linux,以及運行我們的服務,如下圖所示(5000和50001);

 

   使用jmeter進行壓測,調用,看看是否會出現超賣的情況。

   1、先開啟50個線程,進行壓測,看看輸出結果,兩個服務輸出的結果如下:

  把結果設置成50,確實出現了超賣的情況,只要兩台機器輸出的當前庫存是一樣,就說明出現了超賣。

    說明我們最開始的那段代碼在分布式環境下,或者在我們最常見的負載均衡部署方式下面是不行的,所以就提出了我們分布式鎖的解決方案。

  PS:我以前也覺得上面這種加鎖的方法,好像是不能用在分布式環境中,經過上面這么一折騰,我印象更加深刻了,也有了更清晰的認識。

  分布式鎖的解決方案

  分布式鎖的解決方案,在業界內也有很多,也有很多成熟的框架,下面我們介紹一種基於Redis來實現的分布式鎖解決方案;

  先解釋一下,前面的做法為什么不行和我們為什么要采取redis來做分布式鎖

  1、上面這種Lock屬於進程內的鎖,當只有一個進程的時候(只部署了一台服務器)是沒有問題的,當存在多台服務器的時候,就會出問題;

     2、之所以采取Redis來做分布式鎖,Redis是單線程的,當我們有N個請求同時到達的時候,它會通過隊列的形式變成串行訪問;

     話不多說,直接看代碼

  這個版本的分布式鎖,我們做了最簡單的考慮

  1、鎖超時的問題(通過對Redis官網給出的SetNx方法,對應的就是StackExchange.dll里面的 db.StringSet("InvQty222", "111", TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None);

  2、執行過程中,可能出異常的情況,在finally里面釋放鎖;

private static readonly object LockObject = new object();
        private static readonly ConfigurationOptions Options = new ConfigurationOptions()
        {
            EndPoints = { { "192.168.232.132", 6379 } },
            Password = "123456"
        };


        [HttpGet]
        public ActionResult<string> Get()
        {
            #region 用Lock方式實現鎖
            //string msg = null;
            //lock (LockObject)
            //{
            //    int invQty = GetInvQty();
            //    if (invQty > 0)
            //    {
            //        invQty = invQty - 1;
            //        SetInvQty(invQty);
            //        msg = $"扣減成功,當前庫存:{invQty}";
            //    }
            //    else
            //    {
            //        msg = "扣減失敗,庫存不足";
            //    }
            //}
            //Console.WriteLine(msg);
            //return msg;
            #endregion

            #region   Redis實現的第一版本分布式鎖
            string msg = null;
            var isSuccess = SetLockVersion1("1");
            //如果key存在返回的就是false
            //如果key不存在返回,就set,返回true
            //除此之外,我們還要考慮的是這把鎖的超時時間,
            //如果這把鎖一直不釋放(執行過程卡住了,那么要考慮把鎖超時)
            if (isSuccess)
            {
                try
                {
                    int invQty = GetInvQty();
                    if (invQty > 0)
                    {
                        invQty = invQty - 1;
                        SetInvQty(invQty);
                        msg = $"扣減成功,當前庫存:{invQty}";
                    }
                    else
                    {
                        msg = "扣減失敗,庫存不足";
                    }
                }
                finally
                {
                    //還要考慮執行過程中,如果出現了異常,也要把鎖給釋放掉。
                    UnLockVersion1();  //釋放鎖;
                }
            }
            else
            {
                msg = "資源正忙,請刷新后重試";
            }
            Console.WriteLine(msg);
            return msg;
            #endregion
        }


        private int GetInvQty()
        {
            var qty = 0;
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                qty = Convert.ToInt32(db.StringGet("InvQty"));
            }
            return qty;
        }

        private void SetInvQty(int qty)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                db.StringSet("InvQty", qty);
              
            }
        }

        private bool SetLockVersion1(string value)
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                var flag= db.StringSet("LockValue", value, TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None); //如果存在了返回false,不存在才返回true;
                db.KeyExpire("LockValue", TimeSpan.FromSeconds(10));
                return flag;
            }
        }


        private bool UnLockVersion1()
        {
            using (var conn = ConnectionMultiplexer.Connect(Options))
            {
                var db = conn.GetDatabase();
                return db.KeyDelete("LockValue");
            }
        }

  按照上面的程序,我們再把代碼部署到centos上,然后利用jmeter進行壓測;

  經過我們這么一折騰,好像超賣的現象沒有出現了;但是我們上面的做法還是比較的簡單,很多情況都沒有考慮在里面,就比如下面這幾種情況;

  問題一、鎖失效問題,問題根源,A線程加的鎖,被B線程釋放了。

  

 

    為了解決這種情況,我們可以在進來的時候,存一個clientId到Redis的鎖里面,再失效key的時候,判斷一下,當前clientId和redis鎖里面的值是否一致,如果一致,就才釋放。

string msg = null;
            string clientId = Guid.NewGuid().ToString();
            var isSuccess = SetLockVersion1(clientId);
            //如果key存在返回的就是false
            //如果key不存在返回,就set,返回true
            //除此之外,我們還要考慮的是這把鎖的超時時間,
            //如果這把鎖一直不釋放(執行過程卡住了,那么要考慮把鎖超時)
            if (isSuccess)
            {
                try
                {
                    int invQty = GetInvQty();
                    if (invQty > 0)
                    {
                        invQty = invQty - 1;
                        SetInvQty(invQty);
                        msg = $"扣減成功,當前庫存:{invQty}";
                    }
                    else
                    {
                        msg = "扣減失敗,庫存不足";
                    }
                }
                finally
                {

                    if (clientId.Equals(GetLockValue()))
                    {
                        //還要考慮執行過程中,如果出現了異常,也要把鎖給釋放掉。
                        UnLockVersion1();  //釋放鎖;
                    }
                }
            }
            else
            {
                msg = "資源正忙,請刷新后重試";
            }
            Console.WriteLine(msg);
            return msg;

  問題二:鎖超時的問題,如果客戶端1需要執行這把鎖的時間大於鎖設定的超時時間,該怎么做呢

  1、開啟一個守護線程(后台線程),假如你鎖的設置30秒超時,那你每隔10秒去檢查是不是還是client1持有鎖,如果是那就延長10秒,類似於Watch dog思路。

  這個守護線程要注意的點就是,如果鎖都沒人使用了,這個守護線程要及時的關閉,不能一直開啟着,如果不需要延長時間即不必要去延長。

  

 

 

 

 

 

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM