Redission加鎖解鎖流程


redission分布式鎖的使用

RLock lock = redissonClient.getLock("myLock");
lock.lock();
try {
    System.out.println("aaa");
} catch (Exception e) {
    System.out.println("bbb");
} finally {
    lock.unlock();
}

獲取鎖的流程圖

加鎖代碼流程(org.redisson.RedissonLock)

public void lock() {
    try {
    //參數意義 -1 代表不自動釋放鎖,null時間單位,false加鎖期間線程被中斷將拋出異常
        lock(-1, null, false);
    } catch (InterruptedException e) {
        throw new IllegalStateException();
    }
}

 

  /**
     * 加鎖
     * @param leaseTime 鎖的使用時間,超過該時間,鎖便自動釋放
     * @param unit 時間單位
     * @param interruptibly 加鎖期間是否可以被中斷
     * @return void
     * @author liekkas 2021-02-27 15:46
     */
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //獲取當前線程的Id
        long threadId = Thread.currentThread().getId();
        //加鎖,如果ttl為null,則說明ttl(time to live)為空,加鎖成功,否則加鎖不成功。
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        //加鎖成功,直接返回
        if (ttl == null) {
            return;
        }
        //加鎖失敗,訂閱該線程,釋放鎖的時候會發布一個消息,鎖沒有釋放的時候則會等待,直到鎖釋放的時候會執行下面的while循環,重新競爭鎖。此處是用了異步的模式。
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
            //鎖被釋放,重新競爭鎖
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // 競爭獲取鎖成功,退出循環,不再競爭。
                if (ttl == null) {
                    break;
                }

                // 競爭獲取鎖失敗,則排隊等待所釋放,重新競爭鎖。
                if (ttl >= 0) {
                    try {
                        //利用信號量機制阻塞當前線程ttl時間,之后再重新獲取鎖,如果當前線程被中斷,則拋出                                             InterruptedException異常
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
        //競爭鎖成功后,取消訂閱該線程Id事件
            unsubscribe(future, threadId);
        }
    }

 

/**
 * 利用future模式異步返回鎖的剩余生存時間
 * @param waitTime 等待時間
 * @param leaseTime 鎖的使用時間,超過改時間,鎖便自動釋放
 * @param unit 時間單位
 * @param threadId 當前線程Id
 * @return 鎖的剩余生存時間
 * @author liekkas 2021-02-27 15:46
 */
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

 

/**
 * 
 * @param waitTime 等待時間
 * @param leaseTime 鎖的使用時間,超過改時間,鎖便自動釋放
 * @param unit 時間單位
 * @param threadId 當前線程Id
 * @return 鎖的剩余生存時間
 * @author liekkas 2021-02-27 15:46
 */
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //如果鎖的租用時間不為-1,則直接獲取鎖,到時自動釋放鎖。
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
     //如果鎖的租用時間為-1,則利用默認的租用時間30s,獲取鎖
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // 獲取鎖成功,啟用watchDog,自動為鎖延長壽命,保證該鎖被其持有者釋放。
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

lua腳本加鎖流程圖

 

 

 

/**
 * 使用lua腳本加鎖
 * @param waitTime 等待時間
 * @param leaseTime 鎖的使用時間,超過改時間,鎖便自動釋放
 * @param unit 時間單位
 * @param threadId 當前線程Id
 * @param command 執行lua腳本命令
 * @return 鎖的剩余生存時間
 * @author liekkas 2021-02-27 15:46
 */
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
   /**
     * nil相當於java的null,后面描述為了方便就用null代替nil
     * KEYS[1]:鎖的名稱,ARGV[1]:鎖的過期時間,ARGV[2]:線程Id標識名稱
     * lua腳本的邏輯:
     * 首先判斷KEYS[1]是否存在,
     *    如果不存在,則利用hincrby命令把KEYS[1]的ARGV[2]的值加一,並且設置KEYS[1]的
     *       的過期時間為ARGV[1],返回null.
     *    如果KEYS[1]存在,則利用hexists判斷KEYS[1]的ARGV[2]是否存在,
     *    如果存在,則利用hincrby命令把KEYS[1]的ARGV[2]的值加一,並且設置KEYS[1]的
     *       的過期時間為ARGV[1],返回KEYS[1]的存活時間。
     */
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

鎖的釋放流程

lua腳本釋放鎖的流程圖

 

 

 

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }

 

public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    //釋放鎖
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        //取消鎖的延續時間
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

 

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
   /**
        * nil相當於java的null,后面描述為了方便就用null代替nil
     * KEYS[1]:鎖的名稱,KEYS[2]:發布訂閱消息的管道名稱,
     * ARGV[1]:發布的消息內容,ARGV[2]:鎖的過期時間,ARGV[3]:線程Id標識名稱
     * lua腳本的邏輯:
     * 首先判斷KEYS[1]的ARGV[3]是否存在,
     *    如果不存在,直接返回null,確保鎖只能被持有的釋放.
     *       如果存在,則KEYS[1]的ARGV[3]的值減一,
     *    如果ARGV[3]仍然大於0,由於是可重入鎖,則說明該線程仍然持有該鎖,
     *    重新設置過期時間,返回0結束。
     *    如果ARGV[3]小於等於於0,則刪除KEYS[1],並發布KEYS[2],ARGV[1]消息,該鎖已釋放。返回1結束。
     *    如果存在,則利用hincrby命令把KEYS[1]的ARGV[2]的值加一,並且設置KEYS[1]的
     *       的過期時間為ARGV[1],返回KEYS[1]的存活時間。
     */
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

總結

以上就是redission的加鎖和釋放鎖的流程,從閱讀源碼的過程中也學到了很多設計思想,對以后的編碼大有裨益!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM