Redisson分布式鎖剖析


Redisson是具備多種內存數據網格特性的基於Java編寫的Redis客戶端框架(Redis Java Client with features of In-Memory Data Grid),基於Redis的基本數據類型擴展出很多種實現的高級數據結構,具體見其官方的簡介圖。本文要分析的R(ed)Lock實現,只是其中一個很小的模塊,其他高級特性可以按需選用。下面會從基本原理、源碼分析等內容進行展開。

基本原理

從Redis官網 文檔中,摘錄部分:在許多環境中不同進程必須以互斥方式使用共享資源進行操作時,分布式鎖是一個非常有用的原語。此試圖提供一種更規范的算法來實現Redis的分布式鎖。我們提出了一種稱為Redlock的算法,它實現了DLM(Distributed Lock Manager),我們認為它比普通的單實例方法更安全。算法的三個核心特征(三大最低保證):
  • Safety property(安全性):互斥。確保在任何給定時刻下,只有一個客戶端可以持有鎖
  • Liveness property A(活性A):無死鎖。即使存在曾經鎖定資源的客戶端崩潰或者出現網絡分區異常,確保鎖總是能夠成功獲取
  • Liveness property B(活性B):容錯性。只要大多數Redis節點處於正常運行狀態,客戶端就可以獲取和釋放鎖
文檔中還指出了目前算法對於故障轉移的實現還存在明顯的競態條件問題(描述的應該是Redis主從架構下的問題):
  • 客戶端A獲取Redis主節點中的鎖(假設鎖定的資源為X)
  • 在Redis主節點把KEY同步到Redis從節點之前,Redis主節點崩潰
  • Redis從節點因為故障晉升為主節點
  • 此時,客戶端B獲取資源X的鎖成功,問題是資源X的鎖在前面已經被客戶端A獲取過,這樣就出現了並發問題
算法的實現很簡單,單個Redis實例下加鎖命令如下:
SET $resource_name $random_value NX PX $ttl
這里的Nx和PX是SET命令的增強參數,自從Redis的2.6.12版本起,SET命令已經提供了可選的復合操作符:
  • EX:設置超時時間,單位是秒
  • PX:設置超時時間,單位是毫秒
  • NX:IF NOT EXIST的縮寫,只有KEY不存在的前提下才會設置K-V,設置成功返回1,否則返回0
  • XX:IF EXIST的縮寫,只有在KEY存在的前提下才會設置K-V,設置成功返回1,否則返回0
單個Redis實例下解鎖命令如下:
# KEYS[1] = $resource_name
# ARGV[1] = $random_value
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

使用Redisson中的RLock

使用RLock要先實例化Redisson,Redisson已經適配了Redis的哨兵、集群、普通主從和單機模式。這里使用單機模式配置進行演示。實例化RedissonClient:
static RedissonClient REDISSON;

@BeforeClass
public static void beforeClass() throws Exception {
    Config config = new Config();
    // 單機
    config.useSingleServer()
            .setTimeout(10000)
            .setAddress("redis://127.0.0.1:6379");
    REDISSON = Redisson.create(config);
//        // 主從
//        config.useMasterSlaveServers()
//                .setMasterAddress("主節點連接地址")
//                .setSlaveAddresses(Sets.newHashSet("從節點連接地址"));
//        REDISSON = Redisson.create(config);
//        // 哨兵
//        config.useSentinelServers()
//                .setMasterName("Master名稱")
//                .addSentinelAddress(new String[]{"哨兵連接地址"});
//        REDISSON = Redisson.create(config);
//        // 集群
//        config.useClusterServers()
//                .addNodeAddress(new String[]{"集群節點連接地址"});
//        REDISSON = Redisson.create(config);
}
加鎖和解鎖:
@Test
public void testLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    RLock lock = REDISSON.getLock(resourceName);
    Thread threadA = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
        }
    }, "threadA");
    Thread threadB = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
        }
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("線程%s獲取到資源%s的鎖", threadName, resourceName));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignore) {
    }
}

// 某次執行的輸出結果
線程threadB獲取到資源resource:x的鎖
線程threadB釋放資源resource:x的鎖
線程threadA獲取到資源resource:x的鎖
線程threadA釋放資源resource:x的鎖
更多的時候,會選用帶等待時間周期和鎖最大持有時間的API:
@Test
public void testTryLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    int waitTime = 500;
    int leaseTime = 1000;
    Thread threadA = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadA");
    Thread threadB = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName, int waitTime, int leaseTime) {
    RLock lock = REDISSON.getLock(resourceName);
    try {
        String threadName = Thread.currentThread().getName();
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                System.out.println(String.format("線程%s獲取到資源%s的鎖", threadName, resourceName));
                Thread.sleep(800);
            } finally {
                lock.unlock();
                System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
            }
        } else {
            System.out.println(String.format("線程%s獲取資源%s的鎖失敗,等待時間:%d ms", threadName, resourceName, waitTime));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// 某次執行的輸出結果
線程threadA獲取到資源resource:x的鎖
線程threadB獲取資源resource:x的鎖失敗,等待時間:500 ms
線程threadA釋放資源resource:x的鎖
為了使用的時候更加簡單,可以參考spring-tx中的編程式事務那樣進行輕度封裝:
@RequiredArgsConstructor
private static class RedissonLockProvider {

    private final RedissonClient redissonClient;

    public <T> T executeInLock(String resourceName, LockAction lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            return lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }

    public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                return lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
        return null;
    }

    public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    }

    public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }
}

@FunctionalInterface
interface LockAction {

    default void onAcquire(String resourceName) {

    }

    <T> T doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

@FunctionalInterface
interface LockActionWithoutResult {

    default void onAcquire(String resourceName) {

    }

    void doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}
使用RedissonLockProvider(僅供參考):
@Test
public void testRedissonLockProvider() throws Exception {
    RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
    String resourceName = "resource:x";
    Thread threadA = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("線程%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadA");
    Thread threadB = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("線程%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}
// 某次執行結果
線程threadA獲取到資源resource:x的鎖
線程threadA釋放資源resource:x的鎖
線程threadB獲取到資源resource:x的鎖
線程threadB釋放資源resource:x的鎖

Redisson中RLock的實現原理

Redisson中RLock的實現是基本參照了Redis的red lock算法進行實現,不過在原始的red lock算法下進行了改良,主要包括下面的特性:
  • 互斥
  • 無死鎖
  • 可重入,類似於ReentrantLock,同一個線程可以重復獲取同一個資源的鎖(一般使用計數器實現),鎖的重入特性一般情況下有利於提高資源的利用率
  • 續期,這個是一個比較前衛解決思路,也就是如果一個客戶端對資源X永久鎖定,那么並不是直接對KEY生存周期設置為-1,而是通過一個守護線程每隔固定周期延長KEY的過期時間,這樣就能實現在守護線程不被殺掉的前提下,避免客戶端崩潰導致鎖無法釋放長期占用資源的問題
  • 鎖狀態變更訂閱,依賴於org.redisson.pubsub.LockPubSub,用於訂閱和通知鎖釋放事件
  • 不是完全參考red lock算法的實現,數據類型選用了HASH,配合Lua腳本完成多個命令的原子性
續期或者說延長KEY的過期時間在Redisson使用watch dog實現,理解為用於續期的守護線程,底層依賴於Netty的時間輪HashedWheelTimer和任務io.netty.util.Timeout實現,俗稱看門狗,下面會詳細分析。先看RLock的類圖:
0
這里有一個疑惑點,RedissonRedLock(RedissonMultiLock的子類)的注釋中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但從直觀上看,RedissonLock才是整個鎖體系的核心,里面的實現思路也是遵從red lock算法的。
RedissonLock就是RLock的直接實現,也是分布式鎖實現的核心類,從源碼中看到Redisson#getLock()就是直接實例化RedissonLock。
public class Redisson implements RedissonClient {
    
    // ...... 省略其他代碼

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }

    // ...... 省略其他代碼
}
因此只需要圍繞RedissonLock的源碼進行分析即可。RedissonLock的類繼承圖如下:
0
這里需要有幾點認知:
  • RedissonLock實現了java.util.concurrent.locks.Lock接口中除了newCondition()方法外的所有方法,也就是可以基本無縫適配Lock接口,對於習慣Lock接口的API的使用者來說是一個福音。
  • RedissonLock基本所有同步API都依賴於異步API的實現,也就是RLock的實現依賴於RLockAsync的實現,底層依賴的是Netty的io.netty.util.concurrent.Promise,具體見RedissonPromise,如果用過JUC中的Future的開發者應該比較熟悉Future#get(),這里的做法類似。

右邊的幾個父類的簡單功能描述如下:

  • RObjectAsync:所有Redisson對象的基礎接口,提供一些內存測量、對象拷貝、移動等的異步方法
  • RObject:RObjectAsync的同步版本
  • RExpirableAsync:提供對象TTL相關的異步方法
  • RExpirable:RExpirableAsync的同步版本
  • RedissonObject:直接實現類RObject接口中的方法
  • RedissonExpirable:主要是實現了RExpirable接口中的方法
接着先看RedissonLock的構造函數和核心屬性:
// 存放entryName -> ExpirationEntry,用於獲取當前entryName的線程重入計數器和續期任務
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

// 內部的鎖持有的最大時間,來源於參數Config#lockWatchdogTimeout,用於控制續期的周期
protected long internalLockLeaseTime;

// ID,唯一標識,是一個UUID
final String id;

// 
final String entryName;

// 鎖釋放事件訂閱發布相關
protected final LockPubSub pubSub;

// 命令異步執行器實例
final CommandAsyncExecutor commandExecutor;

/**
 * CommandAsyncExecutor是命令的異步執行器,里面的方法是相對底層的面向通訊框架的方法,包括異步寫、異步讀和同步結果獲取等
 * name參數就是getLock()時候傳入的參數,其實就是最終同步到Redis中的KEY
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // 這里的ID為外部初始化的UUID實例,調用toString()
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // 這里的entryName = uuid值 + : + 外部傳進來的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
    this.entryName = id + ":" + name;
    // 初始化LockPubSub實例,用於訂閱和發布鎖釋放的事件
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

// RedissonLock內部類ExpirationEntry,存放着線程重入的計數器和續期的Timeout任務
public static class ExpirationEntry {
    
    // 線程ID -> 線程重入的次數
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
    private volatile Timeout timeout;
    
    public ExpirationEntry() {
        super();
    }
    
    // 這個方法主要記錄線程重入的計數
    public void addThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            counter = 1;
        } else {
            counter++;
        }
        threadIds.put(threadId, counter);
    }

    public boolean hasNoThreads() {
        return threadIds.isEmpty();
    }

    public Long getFirstThreadId() {
        if (threadIds.isEmpty()) {
            return null;
        }
        return threadIds.keySet().iterator().next();
    }

    public void removeThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            return;
        }
        counter--;
        if (counter == 0) {
            threadIds.remove(threadId);
        } else {
            threadIds.put(threadId, counter);
        }
    }
    
    public void setTimeout(Timeout timeout) {
        this.timeout = timeout;
    }
    public Timeout getTimeout() {
        return timeout;
    }
}
這里需要關注一下Config中的lockWatchdogTimeout參數:
0
翻譯一下大意:lockWatchdogTimeout參數只有在沒有使用leaseTimeout參數定義的成功獲取到鎖的場景(簡單來說就是不設置時限的加鎖)下生效,如果看門狗在下一個lockWatchdogTimeout周期內不進行續期,那么鎖就會過期釋放(從源碼上看,每三分之一lockWatchdogTimeout就會執行一次續期任務,每次通過pexpire把KEY的存活周期延長lockWatchdogTimeout),lockWatchdogTimeout的默認值為30000,也就是30秒。
這里先列舉一下RedissonLock中獲取名稱的方法,以便后面分析這些名稱作為K-V結構的KEY時候使用:
  • id:由配置實例化時候實例化的UUID實例生成,從源碼上分析每個連接方式的Redisson實例有唯一的UUID,ConnectionManager初始化的時候會調用UUID id = UUID.randomUUID(),可以理解為Redisson實例在某個應用程序進程中的唯一標識,畢竟一般情況下,一個應用程序應該只會應用一種Redisson的連接方式
  • getEntryName():返回的是UUID + : + $KEY,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
  • getName():返回的是$KEY,例如resource:x
  • getChannelName():返回的是redisson_lock__channel:{$KEY},例如redisson_lock__channel:{resource:x}
  • getLockName(long threadId):返回的是UUID + : + $threadId,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
接着看加鎖的方法,核心實現主要是:
  • private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException:lock方法體系
  • public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException:tryLock方法體系
先看只包含鎖最大持有時間的lock()方法體系:
/**
 * 獲取鎖,不指定等待時間,只指定鎖的最大持有時間
 * 通過interruptibly參數配置支持中斷
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // 嘗試獲取鎖,返回的ttl為空代表獲取鎖成功,返回的ttl代表已經存在的KEY的剩余存活時間
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // 訂閱redisson_lock__channel:{$KEY},其實本質的目的是為了客戶端通過Redis的訂閱發布,感知到解鎖的事件
    // 這個方法會在LockPubSub中注冊一個entryName -> RedissonLockEntry的哈希映射,
   // RedissonLockEntry實例中存放着RPromise<RedissonLockEntry>結果,一個信號量形式的鎖和訂閱方法重入計數器
// 下面的死循環中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是從這個映射中獲取的 RFuture<RedissonLockEntry> future = subscribe(threadId); // 同步訂閱執行,獲取注冊訂閱Channel的響應,區分是否支持中斷 if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 走到下面的for循環說明返回的ttl不為空,也就是Redis已經存在對應的KEY,
// 有其他客戶端已經獲取到鎖,此客戶端線程的調用需要阻塞等待獲取鎖
try { while (true) { // 死循環中嘗試獲取鎖,這個是后面會分析的方法 ttl = tryAcquire(leaseTime, unit, threadId); // 返回的ttl為空,說明獲取到鎖,跳出死循環,這個死循環或者拋出中斷異常,或者獲取到鎖成功break跳出,沒有其他方式 if (ttl == null) { break; } // 這個ttl來源於等待存在的鎖的KEY的存活時間,直接使用許可為0的信號量進行阻塞等待,下面的幾個分支判斷都是大同小異,只是有的支持超時時間,有的支持中斷 // 有的是永久阻塞直到鎖釋放事件訂閱LockPubSub的onMessage()方法回調激活getLatch().release()進行解鎖才會往下走 // 這里可以學到一個特殊的技巧,Semaphore(0),信號量的許可設置為0,首個調用acquire()的線程會被阻塞,
// 直到其他線程調用此信號量的release()方法才會解除阻塞,類似於一個CountDownLatch(1)的效果
if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},不再關注解鎖事件 unsubscribe(future, threadId); } } // 這是一個異步轉同步的方法,類似於FutureTask#get(),關鍵看調用的tryAcquireAsync()方法 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } /** * 通過傳入鎖持有的最大時間和線程ID異步獲取鎖 */ private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // 鎖持有最大時間不為-1,也就是明確鎖的持有時間,不是永久持有的場景 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 走到這里說明是leaseTime == -1,KEY不設置過期時間的分支,需要啟動看門狗機制。嘗試內部異步獲取鎖,
// 注意這里的lockWatchdogTimeout是從配置中獲取傳進去,不是內部的internalLockLeaseTime屬性,這里的默認值還是30000毫秒
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                   TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e)
-> { // 執行異常場景直接返回 if (e != null) { return; } // 成功獲取到鎖的場景,需要基於線程ID啟用看門狗,通過時間輪指定定時任務進行續期 if (ttlRemaining == null) { // 定時調度進行續期操作 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } /** * 轉換鎖持有最大時間,通過參數進行加鎖的LUA腳本調用 * getName()就是傳入的KEY,如resource:x getLockName()就是鎖的名稱,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 時間轉換為毫秒,注意一點這里的internalLockLeaseTime是類內的屬性,被重新賦值了 internalLockLeaseTime = unit.toMillis(leaseTime); // 底層向Redis服務執行LUA腳本 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
先留意一下屬性internalLockLeaseTime,它在tryLockInnerAsync()方法內被重新賦值,在leaseTime == -1L的前提下,它被賦值為lockWatchdogTimeout,這個細節很重要,決定了后面續期方法(看門狗)的調度頻率。另外,leaseTime != -1L不會進行續期,也就是不會啟動看門狗機制。接着需要仔細分析一下tryLockInnerAsync()中執行的LUA腳本,把它提取出來通過注釋進行描述:
-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一段代碼是判斷鎖定的資源KEY不存在的時候進行相應值的設置,代表資源沒有被鎖定,首次獲取鎖成功
if (redis.call('exists', KEYS[1]) == 0) then
    -- 這里是設置調用次數,可以理解為延長KEY過期時間的調用次數
    redis.call('hset', KEYS[1], ARGV[2], 1);
    -- 設置KEY的過期時間
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第二段代碼是判斷HASH的field是否存在,如果存在說明是同一個線程重入的情況,這個時候需要延長KEY的TTL,並且HASH的field對應的value加1,記錄延長ttl的次數
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- 這里是增加調用次數,可以理解為增加延長KEY過期時間的調用次數
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- 延長KEY的過期時間
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第三段代碼是兜底的,走到這里說明當前線程獲取鎖失敗,鎖已經被其他(進程中的)線程占有,返回當前KEY被占用資源的ttl,用來確定需要休眠的最大時間
return redis.call('pttl', KEYS[1]);
這里畫一個圖演示一下這個Lua腳本中三段代碼出現的邏輯:
0
剩下一個scheduleExpirationRenewal(threadId)方法還沒有分析,里面的邏輯就是看門狗的定期續期邏輯:
// 基於線程ID定時調度和續期
private void scheduleExpirationRenewal(long threadId) {
    // 如果需要的話新建一個ExpirationEntry記錄線程重入計數,同時把續期的任務Timeout對象保存在屬性中
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 當前進行的當前線程重入加鎖
        oldEntry.addThreadId(threadId);
    } else {
        // 當前進行的當前線程首次加鎖
        entry.addThreadId(threadId);
        // 首次新建ExpirationEntry需要觸發續期方法,記錄續期的任務句柄
        renewExpiration();
    }
}

// 處理續期
private void renewExpiration() {
    // 根據entryName獲取ExpirationEntry實例,如果為空,說明在cancelExpirationRenewal()方法已經被移除,一般是解鎖的時候觸發
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 新建一個定時任務,這個就是看門狗的實現,io.netty.util.Timeout是Netty結合時間輪使用的定時任務實例
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 這里是重復外面的那個邏輯,
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            // 獲取ExpirationEntry中首個線程ID,如果為空說明調用過cancelExpirationRenewal()方法清空持有的線程重入計數,一般是鎖已經釋放的場景
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 向Redis異步發送續期的命令
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                // 拋出異常,續期失敗,只打印日志和直接終止任務
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                // 返回true證明續期成功,則遞歸調用續期方法(重新調度自己),續期失敗說明對應的鎖已經不存在,直接返回,不再遞歸
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, 
    // 這里的執行頻率為leaseTime轉換為ms單位下的三分之一,由於leaseTime初始值為-1的情況下才會進入續期邏輯,那么這里的執行頻率為lockWatchdogTimeout的三分之一
    internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
    
    // ExpirationEntry實例持有調度任務實例
    ee.setTimeout(task);
}

// 調用Redis,執行Lua腳本,進行異步續期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        //  這里根據前面的分析,internalLockLeaseTime在leaseTime的值為-1的前提下,對應值為lockWatchdogTimeout
        internalLockLeaseTime, getLockName(threadId));  
}
基於源碼推斷出續期的機制由入參leaseTime決定:
  • 當leaseTime == -1的前提下(一般是lock()和lockInterruptibly()這類方法調用),續期任務的調度周期為lockWatchdogTimeout / 3,鎖的最大持有時間(KEY的過期時間)被刷新為lockWatchdogTimeout
  • 當leaseTime != -1的前提下(一般是lock(long leaseTime, TimeUnit unit)和lockInterruptibly(long leaseTime, TimeUnit unit)這類方法調用指定leaseTime不為-1),這種情況下會直接設置鎖的過期時間為輸入值轉換為ms單位的時間量,不會啟動續期機制
提取續期的Lua腳本如下:
-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;
到此為止,不帶waitTime參數的加鎖和續期邏輯基本分析完畢,而帶waitTime參數的tryLock(long waitTime, long leaseTime, TimeUnit unit)實現其實和只存在leaseTime參數的lock(long leaseTime, TimeUnit unit, boolean interruptibly)實現底層調用的方法是一致的,最大的區別是會在嘗試獲取鎖操作之后基於前后的System.currentTimeMillis()計算出時間差和waitTime做對比,決定需要阻塞等待還是直接超時獲取鎖失敗返回,處理阻塞等待的邏輯是客戶端本身的邏輯,這里就不做詳細展開,因為源碼實現也不是十分優雅(太多long currentTime = System.currentTimeMillis()的代碼段了)。接着花點功夫分析一下解鎖的實現,包括一般情況下的解鎖unlock()和強制解鎖forceUnlockAsync():
//  一般情況下的解鎖
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        // IllegalMonitorStateException一般是A線程加鎖,B線程解鎖,內部判斷線程狀態不一致拋出的
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync() {
    // 獲取當前調用解鎖操作的線程ID
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 構建一個結果RedissonPromise
    RPromise<Void> result = new RedissonPromise<Void>();
    // 返回的RFuture如果持有的結果為true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 這是內部的異常,說明解鎖異常,需要取消看門狗的續期任務
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        // 這種情況說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程,拋出IllegalMonitorStateException異常
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // 走到這里說明正常解鎖,取消看門狗的續期任務
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });
    return result;
}

// 真正的內部解鎖的方法,執行解鎖的Lua腳本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

// 取消續期任務
void cancelExpirationRenewal(Long threadId) {
    // 這里說明ExpirationEntry已經被移除,一般是基於同一個線程ID多次調用解鎖方法導致的(並發解鎖)
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    // 傳入的線程ID不為NULL,從ExpirationEntry中移除線程ID,如果持有的線程ID對應的線程重入計數不為0,會先遞減到0,等於0的前提下才會進行刪除
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    // 這里threadId == null的情況是為了滿足強制解鎖的場景,強制解鎖需要直接刪除鎖所在的KEY,不需要理會傳入的線程ID(傳入的線程ID直接為NULL)
    // 后者task.hasNoThreads()是為了說明當前的鎖沒有被任何線程持有,對於單線程也確定在移除線程ID之后重入計數器已經為0,
// 從ExpirationEntry中移除,這個時候獲取ExpirationEntry的任務實例進行取消即可
if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } // EntryName -> ExpirationEntry映射中移除當前鎖的相關實例ExpirationEntry EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } } // 強制解鎖 @Override public boolean forceUnlock() { return get(forceUnlockAsync()); } @Override public RFuture<Boolean> forceUnlockAsync() { // 線程ID傳入為NULL,取消當前的EntryName對應的續期任務 cancelExpirationRenewal(null); // 執行Lua腳本強制刪除鎖所在的KEY並且發布解鎖消息 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }
這里列出一般情況下解鎖和強制解鎖的Lua腳本,分析如下:
-- unlockInnerAsync方法的lua腳本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0
-- ARGV[2] == internalLockLeaseTime --> 30000或者具體的鎖最大持有時間
-- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一個IF分支判斷如果鎖所在的哈希的field不存在,說明當前線程ID未曾獲取過對應的鎖,返回NULL表示解鎖失敗
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- 走到這里通過hincrby進行線程重入計數-1,返回計數值
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 計數值大於0,說明線程重入加鎖,這個時候基於internalLockLeaseTime對鎖所在KEY進行續期
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- 計數值小於或等於0,說明可以解鎖,刪除鎖所在的KEY,並且向redisson_lock__channel:{$KEY}發布消息,內容是0(常量數值)
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
-- 最后的return nil;在IDEA中提示是不會到達的語句,估計這里是開發者筆誤寫上去的,前面的if-else都有返回語句,這里應該是不可達的
return nil;

-------------------------------------------------- 不怎么華麗的分割線 -------------------------------------------------

-- forceUnlockAsync方法的lua腳本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0
-- 強制刪除鎖所在的KEY,如果刪除成功向redisson_lock__channel:{$KEY}發布消息,內容是0(常量數值)
if (redis.call('del', KEYS[1]) == 1) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1
else
    return 0
end
其他輔助方法都相對簡單,這里弄個簡單的"流水賬"記錄一番:
  • isLocked():基於getName()調用Redis的EXISTS $KEY命令判斷是否加鎖
  • isHeldByThread(long threadId)和isHeldByCurrentThread():基於getName()和getLockName(threadId)調用Redis的HEXISTS $KEY $LOCK_NAME命令判斷HASH中對應的field-value是否存在,存在則說明鎖被對應線程ID的線程持有
  • getHoldCount():基於getName()和getLockName(threadId)調用Redis的HGET $KEY $LOCK_NAME命令,用於獲取線程對於某一個鎖的持有量(注釋叫holds,其實就是同一個線程對某一個鎖的KEY的續期次數)
訂閱和發布部分設計到大量Netty組件使用相關的源碼,這里不詳細展開,這部分的邏輯簡單附加到后面這個流程圖中。最后,通過一個比較詳細的圖分析一下Redisson的加鎖和解鎖流程。
  • 不帶waitTime參數的加鎖流程:
0
  • 帶有waitTime參數的加鎖流程(圖右邊的流程基本不變,主要是左邊的流程每一步都要計算時間間隔):
0
  • 解鎖流程:
0
假設不同進程的兩個不同的線程X和Y去競爭資源RESOURCE的鎖,那么可能的流程如下:
0
最后再概括一下Redisson中實現red lock算法使用的HASH數據類型:
  • KEY代表的就是資源或者鎖,創建、存在性判斷,延長生存周期和刪除操作總是針對KEY進行的
  • FIELD代表的是鎖名稱lockName(),但是其實它由Redisson連接管理器實例的初始化UUID拼接客戶端線程ID組成,嚴格來說應該是獲取鎖的客戶端線程唯一標識
  • VALUE代表的是客戶端線程對於鎖的持有量,從源碼上看應該是KEY被續期的次數

小結

Redisson中的red lock實現,應用到下面的核心技術:
  • 合理應用Redis的基本數據類型HASH
  • Redis的訂閱發布
  • Lua腳本的原子性
  • Netty中的Promise實現
  • Netty中的時間輪HashedWheelTimer和對應的定時任務(HashedWheel)Timeout
  • Semaphore進行帶期限、永久或者可中斷的阻塞以及喚醒,替代CountDownLatch中的無等待期限阻塞
上面的核心技術相對合理地應用,才能實現一個高效而且容錯能力相對比較高的分布式鎖方案,但是從目前來看,Redisson仍未解決red lock算法中的故障轉移缺陷,這個有可能是Redis實現分布式鎖方案的一個底層缺陷,此方案在Redis單實例中是相對完善,一旦應用在Redis集群(普通主從、哨兵或者Cluster),有幾率會出現前文提到的節點角色切換導致多個不同客戶端獲取到同一個資源對應的鎖的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM