Redis高並發分布式鎖實戰


Redis分布式鎖原理

手寫分布式鎖

場景:秒殺減庫存 准備:啟動redis,存儲key:stock、value:300

以下代碼是一個減庫存的接口。

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
    	 if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock);
          } else {
                System.out.println("扣減失敗,庫存不足");
          }
          return "end";
    }

接口的含義是從redis拿到庫存值,判斷是否大於0,大於0 則減1 並更新redis存儲的庫存值,反之小於0,則打印扣減失敗,庫存不足。

首先不難看出接口是有並發問題的,如果同時多個線程執行減庫存操作,查詢出來的庫存值都一致再存儲到redis里邊,那肯定就有問題了,假設同時過來三個線程查出來300庫存,調用接口減庫存,同時更新庫存值299,這樣的話就會造成超賣!

這樣的話,可能大家第一想法就是有並發問題的這塊代碼加上鎖,比如說synchronized,jvm的內置鎖,進程級別的鎖。代碼如下:

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 synchronized(this) {
             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
             if (stock > 0) {
                    int realStock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                    System.out.println("扣減成功,剩余庫存:" + realStock);
              } else {
                    System.out.println("扣減失敗,庫存不足");
              }
       		}
       		return "end";
    }

這樣根據synchronized的特性,多個線程請求這個服務,線上環境部署一台應用,在單台服務器上是可以解決問題的。

但是在目前高可用集群環境多台服務器下還是會有問題的。

比如說上圖,通過nginx負載均衡兩個tomcat進程,分發請求到不同的tomcat里邊去,發現還是會發生上邊超賣的情況。

下邊是我自己壓測的過程,有興趣的可以自己測試一下:

1.在本地啟動兩個tomcat,並將上邊加了sync鎖的代碼打包放到tomcat里邊運行。

2.配置nginx 權重1:1的負載上邊兩個tomcat的地址+端口,啟動nginx

3.配置jmeter壓測上邊配置的nginx負載的tomcat,我這jmeter配置的200個線程數,Ramp-up :1s 循環次數4次,代表1s內執行200次請求,循環4次,啟動jmeter。

4.通過兩個tomcat打印的日志信息可以發現了有重復庫存的出現,說明在同一時間請求,超賣問題已經出現,在集群環境下sync不能解決並發問題。

這其中還會發生一個問題,那就是有可能在同一台服務器上已經減過的庫存再次出現,這是因為另一台服務器給set的值導致從redis取的時候取到已經減過的庫存,導致的超賣問題,也是典型的分布式問題。

如果系統中用到redis的話,推薦用redis實現分布式鎖解決這個問題,接下來一步步進行分析。

如果用redis實現分布式鎖的話,我們一般用setnx這個命令來實現:

SETNX
格式:setnx key value 
將 key 的值設為 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
返回值: 
設置成功,返回 1 。
設置失敗,返回 0 。

根據setnx的特性,多個進程過來請求的話,讓他們同時去使用setnx命令去設置同一個值,如果設置成功,則說明搶到鎖,可以進行執行邏輯代碼,如果沒設置成功的話,說明沒搶到鎖,沒搶到的線程進行等待重試。

根據以上思路,快速實現一個簡單的入門級別分布式鎖,代碼如下:

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 //在實際應用過程中,肯定是給操作的對象上鎖,比如說操作某件商品,就對應商品id進行上鎖
    	 String lockKey = "lock:product_101";
    	 //上鎖
    	 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xxx");
    	 if(!result) {
    	 	return "系統繁忙,請稍后再試";
    	 }
         int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
         if (stock > 0) {
             int realStock = stock - 1;
             stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
             System.out.println("扣減成功,剩余庫存:" + realStock);
         } else {
         	System.out.println("扣減失敗,庫存不足");
         }
         //釋放鎖
         stringRedisTemplate.delete(lockKey);
         return "end";
    }

當然這個只是入門級別的分布式鎖,肯定有很多問題會發生,在實際業務過程中,“上鎖” 之后的業務代碼肯定是會很多的,在操作過程中如果發生異常,就執行不到釋放鎖的代碼,這樣就會發生死鎖的問題,上鎖的“product_101”key就會一直在redis里邊存在,其他的線程就不能再去對上鎖的“product_101”進行操作了。

解決辦法就是try{xxx} finally{} 不管拋異常還是怎么樣都需要把這個鎖給釋放了:

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 //在實際應用過程中,肯定是給操作的對象上鎖,比如說操作某件商品,就對應商品id進行上鎖
    	 String lockKey = "lock:product_101";
    	 //上鎖
    	 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xxx");
    	 if(!result) {
    	 	return "系統繁忙,請稍后再試";
    	 }
    	 try {
             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
             if (stock > 0) {
                 int realStock = stock - 1;
                 stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                 System.out.println("扣減成功,剩余庫存:" + realStock);
             } else {
                System.out.println("扣減失敗,庫存不足");
             }
         } finally {
             //釋放鎖
             stringRedisTemplate.delete(lockKey);
         }
         return "end";
    }

加try{}finally{}就一定解決問題了嗎 也不一定, 如果服務器宕機,或者basis進行服務器重啟的時候,一樣執行不到finally的代碼,還是會發生死鎖的情況。

發生上邊的情況我們可以進行設置超時時間,但是有個問題注意一下,比如說下邊這段代碼:

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xxx");
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

不要把設置超時時間和設置key分開寫,這樣的話容易發生原子性問題,原子是什么,原子是最小的單位不可再分割。保證原子性的話,需要redis把它當成一條命令去執行,不能分開來執行。在開發過程中這點也要注意,如果像以上代碼分開來的話比如說執行到設置key這段代碼服務器發生問題宕機或者重啟,還是會發生以上死鎖問題,正確的代碼如下:

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 //在實際應用過程中,肯定是給操作的對象上鎖,比如說操作某件商品,就對應商品id進行上鎖
    	 String lockKey = "lock:product_101";
    	 //上鎖 保證原子性
    	 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xxx", 10, TimeUnit.SECONDS);
    	 if(!result) {
    	 	return "系統繁忙,請稍后再試";
    	 }
    	 try {
             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
             if (stock > 0) {
                 int realStock = stock - 1;
                 stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                 System.out.println("扣減成功,剩余庫存:" + realStock);
             } else {
                System.out.println("扣減失敗,庫存不足");
             }
         } finally {
             //釋放鎖
             stringRedisTemplate.delete(lockKey);
         }
         return "end";
    }

以上代碼在並發不是特別高的情況下是不會有問題的,但是如果說在極端高並發的場景下並且執行的業務代碼邏輯又特別長,第一個請求過來執行超過了10秒鍾,鎖就失效了,這樣第二個請求就能獲取鎖去執行了,在執行過程中,第一個請求執行完了執行delete key去釋放鎖,這樣,第三個請求就能進來了,然后第二個請求執行完,就會把第三個請求的鎖釋放掉,這樣周而復始,還是會有問題,上邊的超賣問題還是得不到解決,甚至 在極端高並發的情況下,造成大量的超賣。

首先分析這個問題出現的根本原因在哪里,不難看出當前線程搶占的鎖被其他線程給刪除掉了,這樣肯定是不合理的,線程自己的鎖肯定需要自己來刪除,明白的這個點后,我們可以給鎖加上一個用uuid生成的clientId放到value里邊去,以此來判斷是不是線程自己的鎖,代碼如下:

    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 //在實際應用過程中,肯定是給操作的對象上鎖,比如說操作某件商品,就對應商品id進行上鎖
    	 String lockKey = "lock:product_101";
    	 //上鎖 保證原子性
    	 String clientId = UUID.randomUUID().toString();
    	 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    	 if(!result) {
    	 	return "系統繁忙,請稍后再試";
    	 }
    	 try {
             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
             if (stock > 0) {
                 int realStock = stock - 1;
                 stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                 System.out.println("扣減成功,剩余庫存:" + realStock);
             } else {
                System.out.println("扣減失敗,庫存不足");
             }
         } finally {
             //釋放鎖
             if	(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
             	stringRedisTemplate.delete(lockKey);
             }
         }
         return "end";
    }

假設在這樣的情況下,線程1在執行到判斷是不是自己的鎖的時候 也就是 if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {}這段代碼的時候剛好是9.9秒時間,在要刪除鎖的時候進行了卡頓,這個時候線程2進來了,線程2在執行邏輯的時候,線程1的卡頓結束,又把線程2的鎖給刪除了,這樣說白了判斷clientId和刪除key這兩個代碼還是有原子性問題的,但是就沒有線程的api可以調用了。

我們發現上邊有很多問題都是因為時間超時導致redis存儲的鎖失效,然后其他線程並發來執行,當然我們也可以將超時時間設置的長一些,設置為30s,當然有業務邏輯超過30s的很少,但是還是存在的,比如說再跑定時任務的時候用到了分布式鎖,超過了30秒鍾就一樣還是會出現鎖超時的問題,所以這樣單純的延長時間還是治標不治本,所以想要完美的解決這個問題就要引入一個完美的解決方案叫做鎖續命(watchDog)。

鎖續命(watchDog):假設主線程搶到鎖開始執行業務邏輯,開啟一個分線程,在分線程里邊做一個定時任務,比如說設置的鎖超時時間是30s,那么我們的定時任務時間就設置為10s,定時任務設置的時間一定要比鎖超時時間小,每10s定時任務先去判斷主線程有沒有結束,沒有結束的話說明主線程就還在,還在進行業務邏輯操作,這個時候我們執行一條expire命令,將主線程鎖的超時時間重新設置為30s,這樣的話只要主線程還沒結束,主線程就會被分線程定時任務去做續命邏輯,維持在30s,判斷主線程結束,就不再執行續命邏輯。

Redisson

依據上邊的場景加問題,市面上有很多優秀的分布式鎖框架,其中一個Redisson的實現中,就有鎖續命的實現,使用方法也很簡單。

引入redisson的jar包

            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.16.8</version>
            </dependency>

引入之后,進行redisson客戶端的配置,注入到spring容器。

    @Bean
    public Redisson redisson() {
        // 此為單機模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

然后我們再實現分布式鎖就很簡單了,代碼如下:

    @Autowired
    private Redisson redisson;
    
    @RequestMapping("/deduct_stock")
    public String deductStock() {
    	 //在實際應用過程中,肯定是給操作的對象上鎖,比如說操作某件商品,就對應商品id進行上鎖
    	 String lockKey = "lock:product_101";
    	 //上鎖 保證原子性
    	/* String clientId = UUID.randomUUID().toString();
    	 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
    	 if(!result) {
    	 	return "系統繁忙,請稍后再試";
    	 }*/
    	//獲取鎖對象
    	 RLock redissonLock = redisson.getLock(lockKey);
        //加分布式鎖
         redissonLock.lock();  //  .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
    	 try {
             int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
             if (stock > 0) {
                 int realStock = stock - 1;
                 stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                 System.out.println("扣減成功,剩余庫存:" + realStock);
             } else {
                System.out.println("扣減失敗,庫存不足");
             }
         } finally {
             //釋放鎖
             /*if	(clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
             	stringRedisTemplate.delete(lockKey);
             }*/
            //解鎖
            redissonLock.unlock();
         }
         return "end";
    }

再redisson加分布所鎖的過程中,也就是 redissonLock.lock(),lock()方法是做了很多操作的。

redisson加鎖的核心流程:

如上圖所示:假設現在有兩個線程同時調用lock()方法給同一個key加鎖,原理也就是跟執行setnx命令差不多,只能有一個線程能加鎖成功,如果線程1執行成功,那么相應的線程2就執行不成功,線程2就會自旋並間歇性的嘗試去加鎖,檢測鎖是否還存在,如果不存在會去嘗試加鎖,在的話就會繼續等待(不會一直while嘗試加鎖,會有阻塞的邏輯)。線程1加鎖成功會另外開啟一個后台線程每隔一段時間檢查是否還持有鎖,如果持有則延長加鎖的時間。

有了以上的邏輯,我們繼續根據源碼推敲。

加鎖成功:

在RedissonLock類的lock()方法中發現會先獲取線程的id,然后執行tryAcquire()方法,並且傳遞一個值為-1l的leaseTime參數

trytryAcquire()緊接着調用tryAcquireAsync()方法並且根據leaseTime參數值判斷是否-1l走相應的邏輯:

我們點進去else繼續看發現會根據傳遞的參數執行一段lua腳本:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

具體lua腳本介紹可參考"Redis持久化、主從與哨兵架構詳解"中"Redis Lua腳本"介紹。

lua腳本具有減少網絡開銷、原子操作、替代redis的事物等優勢,這里這段lua腳本大概的意思是:判斷傳入的getLock()方法里邊的name存不存在,如果返回0,表示不存在則存入一個hash結構,key為傳入的name(也就是上邊代碼的lockKey),value是根據threadId生成的一個唯一的名稱(相當於上邊手寫分布式鎖的clientId)並且給這個key設置對應參數1(unit.toMillis(leaseTime))里邊的超時時間也就是30s,后邊的hincrby加上增量1是為了可重入鎖設置,可重入鎖是在加鎖的代碼塊里執行邏輯,可能在加鎖的代碼塊之外的代碼還有可能發生並發問題就再嘗試加一層鎖,當然是同一個線程,是以線程為單位,當一個線程獲取對象鎖之后,這個線程可以再次獲取本對象上的鎖,而其他的線程是不可以的。成功后返回null,否則判斷hash結構(hexists)加上傳入的參數,key為鎖名稱,value為生成的客戶端唯一值(this.getLockName(threadId)),如果存在則設置這個結構hincrby為哈希表key中field鍵的值加上增量1並且設置超時時間為傳入參數的時間leaseTime(30s)(可重復鎖)。

這里解釋一下為什么說傳入的leaseTime是30s,上邊截圖里解釋了調用tryAcquireAsync()方法的else代碼塊,這里傳入的時間是 this.internalLockLeaseTime,根據代碼跟蹤發現this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),繼續跟蹤發現this.lockWatchdogTimeout = 30000L,如果不做任何設置,就是30s,也可以通過設置redisson里邊的config參數,設置其他的超時時間(不建議)。

繼續分析tryAcquireAsync()方法,執行完lua腳本加鎖或者重置鎖的超時時間之后,會調用一個回調方法,這個回調方法會調用一個重置到期時間(鎖續命)的方法:

重置到期方法會調用更新過期時間的方法renewExpiration(),該方法主要實現了延時任務,也就是this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS按照不設置超時時間的情況下,默認是30s除去3,就是每10s會執行一次去重置該鎖的過期時間。

其中renewExpirationAsync()方法調用一段lua腳本

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

該腳本大概意思就是根據鎖名稱和根據線程id生成的唯一標識判斷該鎖是否還存在,如果存在則重新設置超時時間默認為30s並且返回true,然后繼續調用回調方法,回調方法判斷如果執行成功的話,繼續每隔10s調用renewExpiration(),也就是延時任務自己去續命。

加鎖不成功:

我們重新回到加鎖的tryAcquireAsync方法,繼續分析tryLockInnerAsync方法執行的lua腳本,在如果線程不存在就直接加鎖,和鎖重復的判斷邏輯之后,還有一段腳本return redis.call('pttl', KEYS[1]),意思是返回該鎖剩余的過期時間,假如加鎖成功之后過期時間是30s,過了5s,剩余時間就是25s。

重新回到最開始的lock方法,在嘗試加鎖不成功之后返回超時時間,如果超時時間不為null(lua腳本執行的時候如果加鎖成功會返回nil 也就是null),會執行while(true)循環進行等待加鎖。

其中this.commandExecutor.getNow(future)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); latch()方法會返回一個Semaphore(信號量),然后調用tryAcquire方法進行等待(許可)上邊嘗試加鎖返回的鎖的剩余的超時時間(返回的ttl),等待的過程中不會占用cpu,就算1000個線程在等待也會讓出cpu空間,不會耗費性能,等待結束后繼續調用tryAcquire()方法嘗試加鎖,也就是上圖所示的間歇性嘗試加鎖(自旋)

但是這樣還有有一個問題,那就是如果在獲取到返回的超時時間之后,假如返回的超時時間是20s,在這20s內鎖被釋放掉了,難道該線程還需要一直等待20s嗎,答案肯定是不需要,有一個地方進行阻塞等待,就肯定需要去喚醒在阻塞的線程繼續一起搶鎖,這樣才能完成的形成一個閉環。

在沒有搶到鎖的線程會利用redis的訂閱功能,訂閱一個名叫“redisson_lock__channel”+ 鎖的名稱的channel頻道,等待頻道發布消息進行喚醒。

那么什么時候會給頻道 發布消息呢,肯定是在unlock方法,釋放鎖的時候會去在頻道發布消息,告訴在等待的線程可以進行搶鎖了。進入unlock方法之后發現調用了unlockAsync方法,該方法又調用了unlockInnerAsync方法,unlockInnerAsync方法打開之后發現依舊是一段lua腳本:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

改lua腳本大概意思是:首先根據this.getRawName()也就是鎖名稱和this.getLockName(threadId)是以上生成的代表持有鎖的唯一標識(客戶端標識)判斷是否存在,如果不存在代表沒有自己的鎖,肯定是不能刪除的,然后繼續判斷可重入鎖,進行減1,如果大於0則重置超時時間,否則根據鎖名稱進行刪除鎖並發布訂閱,內容為頻道名稱和解鎖內容(0)並返回1。

發布成功之后,回到剛剛加鎖的代碼塊中回看訂閱頻道的類:

protected CompletableFuture<RedissonLockEntry> subscribe(long threadId) {
    return this.pubSub.subscribe(this.getEntryName(), this.getChannelName());
}

this.pubSub這個類中去發布的訂閱,這個類是LockPubSub,在這個類中有消費onMessage方法,也就是說發布成功之后,當隊列有消息監聽改隊列的線程會調用onMessage方法去消費:

先判斷lua腳本里返回的消息是不是0,調用poll方法刪除檢索並刪除隊列的元素,如果不為null回調run方法。然后調用getLatch方法獲取當初阻塞等待的信號量,調用release()方法喚醒線程去搶鎖

為了滿足各種需求redisson還提供了其他豐富的api,其中trylock()也是一種嘗試加鎖的api,但是不會自旋阻塞去一直嘗試加鎖,沒有看門狗(getLockWatchdogTimeout())的邏輯。它會根據設置的時間去嘗試加鎖,加鎖成功返回true,失敗返回false,加鎖成功后設置的leaseTime就是鎖的最長超時時間,三個參數如下:

boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

waitTime是指在這段時間內嘗試加鎖會去等鎖,比如說設置10s,在10s內加鎖成功返回true,失敗返回false,超過等待時間,加鎖失敗后不會發生自旋去一直嘗試加鎖。

leaseTime是指超時時間,加鎖成功后,leaseTime就是鎖設置的超時時間。

unit是指時間的單位SECONDS,MINUTES,HOURS....

redis分布式鎖在集群中存在的問題

假設線程1在主節點加鎖成功,主節點在同步數據到從節點的過程中宕機,重新選舉從節點為主節點,這個時候新的主節點是不存在線程1的鎖的,這個時候線程2過來加鎖成功執行邏輯完成,再來一個線程過來加鎖成功,而線程1並發問題還沒執行完成,這樣的話就又會出現“超賣”的問題,這樣的問題我們稱為redis主從架構鎖失效問題

關於zk(zookeeper):zk也支持集群架構,zk也可以實現分布式鎖。假如現在有一個zk集群,主節點是leader,有兩個從節點follower1,follower2。從cap(CAP 理論是針對分布式數據庫而言的,它是指在一個分布式系統中,一致性(Consistency, C)、可用性(Availability, A)、分區容錯性(Partition Tolerance, P)三者不可兼得。)的理論來說redis集群着重於滿足ap(如果要可用性高並允許分區,則需放棄一致性。一旦分區發生,節點之間可能會失去聯系,為了實現高可用,每個節點只能用本地數據提供服務,而這樣會導致全局數據的不一致性。),對可用性滿足的多一點,基於zk的集群架構對cp(如果不要求可用性,相當於每個請求都需要在各服務器之間強一致,而分區容錯性會導致同步時間無限延長,如此 CP 也是可以保證的。很多傳統的數據庫分布式事務都屬於這種模式。)對數據一致性的要求更高一點,比如說要基於zk集群實現分布式鎖,也是去設置key,value和redis類似在主節點進行寫數據,但是zk主節點寫入數據后並不會直接向客戶端返回結果,而是先向從節點follower節點同步數據,從節點同步完成數據后向主節點發送成功消息,主節點收到消息后計算集群內有半數以上節點同步完成數據才會認為數據真的寫成功,這個時候才會向客戶端反饋成功消息繼續執行邏輯,以此來保證數據一致性。這個就是zk的zab協議中的廣播——主從同步(主從同步數據比較簡單, 當有寫操作時,如果是從機接收,會轉到主機。做一次轉發,保證寫都是在主機上進行。主先提議事務,收到過半回復后,再發提交。 主收到寫操作時,先本地生成事務為事務生成zxid,然后發給所有follower節點。 當follower收到事務時,先把提議事務的日志寫到本地磁盤,成功后返回給leader。 leader收到過半反饋后對事務提交。再通知所有的follower提交事務,follower收到后也提交事務,提交后就可以對客戶端進行分發了。),當zk集群的主節點宕機后,也會發生選舉,但是zk底層的zab協議中的選舉機制決定了會選舉出同步數據更多的節點為主節點(后續會解釋),所以如果用zk來實現分布式鎖的話,上述鎖闡述的redis主從架構鎖失效問題也就會得到解決,但是相應的因為底層實現的原因性能就會比redis差很多。

選redis還是zk實現分布式鎖:首先zk的性能肯定不如redis,但是從分布式鎖的角度語義上來說,zk可能更適合一些,所以如果對性能要求比較高的話就選redis,對數據的強一致性有特別嚴格要求的話就選zk,現在的主流的分布式鎖方案還是redis,也有一些辦法去減少redis主從架構鎖失效問題。

redis的紅鎖(建議不要使用):

首先需要實現紅鎖需要有多個redis節點,這個節點最好是奇數節點會對資源的利用率更高。加入說現在有3個redis節點,他們之間的關系是相等的,沒有主從、集群關系,都是對等的單節點,過來存儲鎖也是一樣的類似setnx(如果存在key不做操作)的機制,但是需要在所有節點去執行存儲的動作,有半數以上的節點返回存儲成功客戶端才會認為加鎖成功,才能走加鎖的邏輯。這樣做的好處就是因為半數以上成功才算成功的機制就算其中一個節點宕機也不會產生鎖失效的問題。但是這樣就失去了使用redis 的意義,對性能上也會產生影響。因為集群架構是會立馬返回結果,但是這種紅鎖的機制也是去犧牲了一些可用性去同步多個節點后才會返回結果。對數據一致性會保證的更好一點。

redlock在redisson里邊的實現,使用demo如下:

對於redisson中的redlock這樣做還是會有問題的,單實例肯定不是很可靠吧?加鎖成功之后,結果 Redis 服務宕機了,這不就涼涼~這時候會提出來將 Redis 主從部署。即使是主從,也是存在巧合的!比如說現在為了高可用給每個redis節點加上一個從節點,主從結構中存在明顯的競態:

  1. 客戶端 A 從 master 獲取到鎖
  2. 在 master 將鎖同步到 slave 之前,master 宕掉了。
  3. slave 節點被晉級為 master 節點
  4. 客戶端 B 取得了同一個資源被客戶端 A 已經獲取到的另外一個鎖。安全失效!

有時候程序就是這么巧,比如說正好一個節點掛掉的時候,多個客戶端同時取到了鎖。如果你可以接受這種小概率錯誤,那用這個基於復制的方案就完全沒有問題。

當然也可以直接簡單粗暴多加一些單節點,但是根據以上的半數以上機制來看節點加的越多,需要存值的節點也就越多,消耗的性能就越多,這樣就違背了用redis 的初衷。

那我使用集群呢?

如果還記得前面的內容,應該是知道對集群進行加鎖的時候,其實是通過 CRC16 的 hash 函數來對 key 進行取模,將結果路由到預先分配過 slot 的相應節點上。

發現其實還是發到單個節點上的

還有一個問題就是redis宕機后恢復數據的問題,就算不用主從結構,單機節點追求redis高性能的情況下一般設置持久化策略是不會設置立即持久化的,比如aof大多數情況下會設置1s后持久化這樣子。假設現在其中一個節點宕機又立即重啟的情況下,redis恢復數據如果aof持久化配置的策略不是每一條都存儲的情況下,還是有可能丟失數據從而發生以上鎖失效問題。

redis作者也對redlock有一定的爭議:

https://www.cnblogs.com/liuzhihang/p/15003362.html

結論

Redisson RedLock 是基於聯鎖 MultiLock 實現的,但是使用過程中需要自己判斷 key 落在哪個節點上,對使用者不是很友好。

Redisson RedLock 已經被棄用,直接使用普通的加鎖即可,會基於 wait 機制將鎖同步到從節點,但是也並不能保證一致性。僅僅是最大限度的保證一致性。

分布式鎖的優化:分布式鎖從底層來講就是把並行執行的請求給串行化了,因為redis是單線程的肯定就不會有並發問題了。

分布式鎖一旦加了之后,對同一個商品的下單請求,會導致所有客戶端都必須對同一個商品的庫存鎖key進行加鎖。

比如,對iphone這個商品的下單,都必對“iphone_stock”這個鎖key來加鎖。這樣會導致對同一個商品的下單請求,就必須串行化,一個接一個的處理。假設加鎖之后,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,這個過程性能很高吧,算他全過程20毫秒,這應該不錯了。那么1秒是1000毫秒,只能容納50個對這個商品的請求依次串行完成處理。

缺陷:同一個商品多用戶同時下單的時候,會基於分布式鎖串行化處理,導致沒法同時處理同一個商品的大量下單的請求。

這種方案,要是應對那種低並發、無秒殺場景的普通小電商系統,可能還可以接受。

解決方案:

1.從粒度着手,鎖的粒度范圍越小越好,加鎖的代碼越少性能就越高,因為加鎖的代碼會串行執行,沒有必要加鎖的代碼肯定是讓他們並行執行這樣效率更高。

2.分段鎖。其實說出來也很簡單,看過java里的ConcurrentHashMap的源碼和底層原理,應該知道里面的 核心思路:分段加鎖

把數據分成很多個段,每個段是一個單獨的鎖,所以多個線程過來並發修改數據的時候,可以並發的修改不同段的數據。不至於說,同一時間只能有一個線程獨占修改ConcurrentHashMap中的數據。

另外,Java 8中新增了一個LongAdder類,也是針對Java 7以前的AtomicLong進行的優化,解決的是CAS類操作在高並發場景下,使用樂觀鎖思路,會導致大量線程長時間重復循環。

LongAdder中也是采用了類似的分段CAS操作,失敗則自動遷移到下一個分段進行CAS的思路。

其實分布式鎖的優化思路也是類似的,之前我們是在另外一個業務場景下落地了這個方案到生產中,不是在庫存超賣問題里用的。

但是庫存超賣這個業務場景不錯,很容易理解,所以我們就用這個場景來說一下。

分段加鎖思想。假如你現在iphone有1000個庫存,那么你完全可以給拆成20個庫存段,要是你願意,可以在數據庫的表里建20個庫存字段,每個庫存段是50件庫存,比如stock_01對應50件庫存,stock_02對應50件庫存。類似這樣的,也可以在redis之類的地方放20個庫存key。

接着,1000個/s 請求,用一個簡單的隨機算法,每個請求都是隨機在20個分段庫存里,選擇一個進行加鎖。

每個下單請求鎖了一個庫存分段,然后在業務邏輯里面,就對數據庫或者是Redis中的那個分段庫存進行操作即可,包括查庫存 -> 判斷庫存是否充足 -> 扣減庫存。

相當於一個20毫秒,可以並發處理掉20個下單請求,那么1秒,也就可以依次處理掉20 * 50 = 1000個對iphone的下單請求了。

一旦對某個數據做了分段處理之后,有一個坑一定要注意:就是如果某個下單請求,咔嚓加鎖,然后發現這個分段庫存里的庫存不足了,此時咋辦?
這時你得自動釋放鎖,然后立馬換下一個分段庫存,再次嘗試加鎖后嘗試處理。 這個過程一定要實現。

分布式鎖並發優化方案的不足:

最大的不足,很不方便,實現太復雜。

  • 首先,你得對一個數據分段存儲,一個庫存字段本來好好的,現在要分為20個分段庫存字段;
  • 其次,你在每次處理庫存的時候,還得自己寫隨機算法,隨機挑選一個分段來處理;
  • 最后,如果某個分段中的數據不足了,你還得自動切換到下一個分段數據去處理。

這個過程都是要手動寫代碼實現的,還是有點工作量,挺麻煩的。

不過我們確實在一些業務場景里,因為用到了分布式鎖,然后又必須要進行鎖並發的優化,又進一步用到了分段加鎖的技術方案,效果當然是很好的了,一下子並發性能可以增長幾十倍。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM