Redis分布式鎖和RedissionLock可重入分布式鎖源碼解讀


Redis分布式鎖和RedissionLock可重入分布式鎖源碼解讀

本文主要講三個部分

1,分布式鎖的基本特性
2,設計一個可用的redis分布式鎖及會遇到的重要問題和解決辦法
3,RedissionLock的lock和unlock的源碼分析

分布式鎖

在分布式模式下,對一份臨界資源需要跨主機跨進程跨線程互斥訪問的時候,需要用分布式鎖,來保證多進程有序操作

分布式特點

1,互斥性:只能有一個線程擁有該鎖
2,鎖超時避免死鎖:當該線程發生異常,能讓其他線程獲取
3,容錯性,高可用
4,高性能
5,可重入:即當前獲取該鎖的線程,可以繼續獲取執行lock獲取該鎖,每次lock可重入計數+1,unlock可重入計數-1,直到這個值為0,視為徹底釋放該鎖,參照JUC下ReentrantLock
6,具備阻塞和非阻塞性:當獲取不到的線程,能夠被及時喚醒

分布式類型:

1,基於數據庫的悲觀鎖:X鎖
2,基於數據庫的樂觀鎖:基於版本號
3,基於redis的分布式鎖
4,基於zookeeper的分布式鎖

注:這篇文章主要講redis分布式鎖,以及具體實現RedissionLock可重入分布式鎖源碼解讀

可重入分布式鎖的設計

1,要保證互斥性和可重入性:需要 記錄獲取該鎖的線程信息,例如線程id,以及記錄可重入計數(即lock的次數)
2,鎖超時避免死鎖:需要對redis key設置超時淘汰,但是多長時間淘汰,是個問題,后面討論
3,容錯性,高可用,高性能:基於redis集群,則暫時不用考慮
4,獲取不到鎖的阻塞線程,需要被及時喚醒:可以基於redis發布訂閱,用來通知等待鎖的線程

如上設計需要解決的兩個問題

1,用記錄獲取鎖的線程id,來實現互斥以及判斷可重入,但是在linux下線程id,是不保證唯一的,更何況還要跨主機進程,這個也是需要好好思考的問題
2,避免死鎖,需要設置鎖超時,那設置多長時間合適?以及設置超時並獲取鎖之后,在java語言下,還存在因為gc,工作線程會暫定運行,即會存在STW (stop the world)問題導致多線程同時獲取鎖的異常情況的解決辦法

java gc STW (stop the word)導致的鎖過期問題

1,工作線程1,獲取鎖,並設置了超時淘汰時長
2,jvm gc垃圾回收時,會暫停工作線程,即STW
3,當工作線程1恢復工作的時候,由於STW的時長稍長,可能鎖已經超時淘汰了,但是該線程還不知道,此時工作線程2去獲取,也是能獲取到的,導致出現多個線程獲取同一個鎖的異常問題,如下圖所示

大概的解決方案,有:

1: 模擬CAS樂觀鎖的方式,增加版本號,如下圖
2: watch dog自動延期機制,在后面介紹RedissionLock時會介紹

注意:單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。因為單機版,仍然受gc STW影響。這個問題特別容易被忽略,盡管gc STW時間通常不會太長,至少理論上需要考慮。 鎖刪除之后, 取消看門狗服務的對應的key記錄, 當然,這就使得系統變得復雜, 還要保證看門狗服務的高並發、高可用、數據一致性的問題。

RedissionLock 大致框架邏輯圖

鎖實現的主要數據結構

1,基於hash數據結構,hash key=lockName,保存了具有唯一性的threadId(例如uuid:threadId結構)和可重入計數,該hash下有且只有一個子元素,hincrby lockName uniqueThreadId,1
2,對lockName設置超時淘汰,pexpire lockName expireTime ,一般30秒超時淘汰,或者自己unlock及時釋放
3,基於發布訂閱實現watch dog:pulish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE

RedissionLock.lock

1,鎖不存在,則設置鎖,線程id,和超時淘汰
2,鎖存在,可重入,則可重入計數增1
3,獲取了鎖,則添加watch dog定時線程定時刷新超時淘汰時間,默認每10秒執行一次
4,沒獲取到鎖,則訂閱鎖釋放通知,等待通知

   private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {//是當前線程獲取到鎖了,並且會重新設置超時淘汰時間和自增可重入次數
            return;
        }
        //加鎖操作失敗,訂閱消息,利用 redis 的 pubsub 提供一個通知機制來減少不斷的重試,避免發生活鎖。
        //活鎖:是指線程 1 可以使用資源,但它很禮貌,讓其他線程先使用資源,線程 2 也可以使用資源,但它很紳士,也讓其他線程先使用資源。這樣你讓我,我讓你,最后兩個線程都無法使用資源

        //在lock中執行訂閱,unlock-> unlockInnerAsync 則發布publish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE=0 通知等待的線程去獲取鎖

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                //當前線程沒有拿到鎖,先在上面訂閱了channel key=redisson_lock__channel:{lockName}
                //在如下則,等待在拿到鎖的線程unlock之后可重入計數=0時,
                //會pulish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE 一個信息,則喚醒等待獲取鎖的線程去tryAcquire

                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

     private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,//key超時淘汰 30秒=30*1000
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);//添加watch dog
                }
            }
        });
        return ttlRemainingFuture;
    } 

加鎖lua腳本

1,邏輯調用鏈:lock->tryAcquire->tryLockInnerAsync

/**
	 * 1,如果key(lockName)不存在,(key=getLock中參數name),則 hincrby key hash-key,hash-value
	 * 2,設置key毫秒超時淘汰
	 *
	 * 3,如果key存在,並且包含元素hash-key,執行如上1和2,hash-key=commandExecutor.getConnectionManager().getId()+":"+threadId
	 *
	 * 4,如果key存在,並且不包含元素hash-key,則獲取超時淘汰時間長
	 * @param waitTime
	 * @param leaseTime
	 * @param unit
	 * @param threadId
	 * @param command
	 * @param <T>
	 * @return
	 */
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }


lua腳本保證原子性,lua字段解釋

1,KEYS[1]:表示你加鎖的那個key,比如說RLock lock = redisson.getLock(“myLock”);這里你自己設置了加鎖的那個鎖key就是“myLock”。
2,ARGV[1]:表示鎖的有效期,默認30s
3,ARGV[2]:表示表示加鎖的線程ID,類似於:8743c9c0-0795-4907-87fd-6c719a6b4586:1,大致是uuid:threadId,用來避免threadId重復,又可以實現互斥和可重入判斷

RedissionLock.unlock

1,可重入計數-1,當可重入計數=0時,則發布一條消息用於通知等待的線程
2,刪除watch dog

  public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
  public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);//刪除watch dog

            if (e != null) {
                result.tryFailure(e);
                return;
            }

            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            result.trySuccess(null);
        });

        return result;
    }

lua解鎖腳本

/**
	 * 1,如果不存在key 和hash-key,則直接退出
	 * 2,存在key和hash-key,則可重入計數減一
	 * 3,當可重入計數還大於0,則更新一下淘汰時長,還需要繼續被同一個線程解鎖,因為同一個線程下可重入鎖被多次lock
	 * 4,如果可重入計數等於0,則可重入可以徹底釋放了,則刪除這個key,並發布訂閱,讓之前等待該鎖的線程進入
	 * @param threadId
	 * @return
	 */
	protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " + ////在lock中有訂閱這個可以,unlock 則publish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE=0
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

watch dog 邏輯

1,watchDog添加邏輯:lock->tryAcquire->tryAcquireAsync->獲取鎖之后執行scheduleExpirationRenewal(創建一個定時任務,並把這個定時任務加入EXPIRATION_RENEWAL_MAP)->{lua腳本:存在,則重試淘汰時長,默認30秒}
2,創建HashedWheelTimer.HashedWheelTimeout,internalLockLeaseTime / 3間隔循環執行,默認是30/3=10秒,去刷新超時淘汰時間
3,watchDog刪除邏輯:unlock->unlockAsync-> cancelExpirationRenewal(threadId){EXPIRATION_RENEWAL_MAP.remove}

注意:單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。因為單機版,仍然受gc STW影響。這個問題特別容易被忽略,盡管gc STW時間通常不會太長,至少理論上需要考慮

 protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }


 private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    } else {
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }


protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

設計結構上最大的問題:

1,異步復制,master和slave不一致,master宕機,slave則沒有該鎖記錄,導致多端獲取該鎖:就是如果你對某個redis master實例,寫入了myLock這種鎖key的value,此時會 異步復制 給對應的master slave實例。但是這個過程中一旦發生redis master宕機,主備切換,redis slave變為了redis master。接着就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖。此時就會導致多個客戶端對一個分布式鎖完成了加鎖。這時系統在業務上一定會出現問題,導致臟數據的產生。所以這個就是redis cluster,或者是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:在redis master實例宕機的時候,可能導致多個客戶端同時完成加鎖
2,需要設計分布式的watch dog機制以避免GC STW問題,這個問題,在redission中,目前待后面閱讀補充

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM