Redisson源碼學習之RedissonFairLock


博客待整理,先只是把源碼看了....
后面不再備注redis中的命令含義了,這樣備注寫太多了不好閱讀.

package org.redisson;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.LockPubSub;

/**
 * Distributed implementation of {@link java.util.concurrent.locks.Lock}
 * Implements reentrant lock.<br>
 * Lock will be removed automatically if client disconnects.
 * <p>
 * Implements a <b>fair</b> locking so it guarantees an acquire order by threads.
 *
 * @author Nikita Koksharov
 *
 */
/** vergilyn mark: <br/>
 * remove stale threads: 為什么不寫成公共的? <br/>
 * redisson是如何解決服務器之間時間不同步的: 因為{Redisson.UUID}, 不同服務器生成UUID不同, 所以其時間戳的值肯定會來至原有的服務器. <br/>
 * "redisson_lock_queue:{lock_name}": 這LIST的順序就是獲取鎖的順序, 即FairLock實現原理. <br/>
 * "redisson_lock_timeout:{xxx}": 這其中的score並不表示獲取鎖的順序, 而是表示線程競爭鎖的失效時間點. <br/>
 */
public class RedissonFairLock extends RedissonLock implements RLock {

    private final long threadWaitTime = 5000;
    private final CommandExecutor commandExecutor;

    /** vergilyn mark: <br/>
     *  構造函數, 被protected修飾, 說明不能在外部通過new獲取; 一般的構建是:{@link Redisson#getFairLock(String)}. <br/>
     * @param name lock的'鎖名', 實際定義在{@link RedissonObject#name}
     * @param id
     */
    protected RedissonFairLock(CommandExecutor commandExecutor, String name, UUID id) {
        super(commandExecutor, name, id);
        this.commandExecutor = commandExecutor;
    }

    /** vergilyn mark: <br/>
     * 未被修飾符修飾, 默認即friendly: 在同一個包中的類可以訪問, 其他包中的類不能訪問. <br/>
     * type : LIST <br/>
     * key  : "redisson_lock_queue:{lock_name}" <br/>
     * value: "{Redisson.UUID}:{threadId}" <br/>
     * @return ex, redisson_lock_queue:{lock_name}
     */
    String getThreadsQueueName() {
        return prefixName("redisson_lock_queue", getName());
    }

    /** vergilyn mark: <br/>
     * type : SORT-SET <br/>
     * key  : "redisson_lock_timeout:{lock_name}" <br/>
     * value: "{Redisson.UUID}:{threadId}" <br/>
     * score: 時間戳 <br/>
     * @return  ex, redisson_lock_timeout:{lock_name}
     */
    String getTimeoutSetName() {
        return prefixName("redisson_lock_timeout", getName());
    }
    
    @Override
    protected RedissonLockEntry getEntry(long threadId) {
        return PUBSUB.getEntry(getEntryName() + ":" + threadId);
    }

    /** vergilyn mark: <br/>
     * 訂閱, 重寫: {@link RedissonLock#subscribe(long)}
     */
    @Override
    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        return PUBSUB.subscribe(getEntryName() + ":" + threadId, 
                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
    }

    /** vergilyn mark: <br/>
     * 取消訂閱, 重寫: {@link RedissonLock#unsubscribe(RFuture, long)}
     */
    @Override
    protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
        PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId, 
                getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
    }

    @Override
    protected RFuture<Void> acquireFailedAsync(long threadId) {
        /* vergilyn mark: lua中數組下標是從1開始
         * lindex key index                   : LIST, 返回列表中下標為指定索引值的元素. 如果指定索引值不在列表的區間范圍內, 返回 nil.
         * zrange key start stop [WITHSCORES] : SORT-SET, 通過索引區間返回有序集合成指定區間內的成員.
         * zincrby key increment member       : SORT-SET, 有序集合中對指定成員的分數加上增量 increment.
         * zrem key member [member ...]       : SORT-SET, 移除有序集合中的一個或多個成員
         * lrem key count value               : LIST, 移除列表元素(count=0, 表示全部)
         */
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
                    // KEYS[1]: "redisson_lock_queue:{xxx}",   ARGV[1]: "{Redisson.UUID}:{threadId}"
                    // KEYS[2]: "redisson_lock_timeout:{xxx}", ARGV[2]: "{threadWaitTime}"(默認: 5000ms)
                    "local firstThreadId = redis.call('lindex', KEYS[1], 0); " +
                    "if firstThreadId == ARGV[1] then " +
                        "local keys = redis.call('zrange', KEYS[2], 0, -1); " + 
                        "for i = 1, #keys, 1 do " + 
                            "redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" + 
                        "end;" + 
                    "end;" +
                    "redis.call('zrem', KEYS[2], ARGV[1]); " +
                    "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                    Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()),  // KEYS
                    getLockName(threadId), threadWaitTime);                        // PARAMS
    }

    /** vergilyn mark: <br/>
     * FairLock核心方法, 嘗試獲取鎖(內部異步獲取);
     * remark: 因為{Redisson.UUID}, 解決了服務器之間的時間不同步
     * @param leaseTime 鎖自動釋放的時長
     * @param unit leaseTime的時間單位
     * @param threadId 線程
     * @return
     */
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        long currentTime = System.currentTimeMillis();
        if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads: 移除陳舊的線程
                    // KEYS[1]: "lock_name",                    ARGV[1]: "{leaseTime}"
                    // KEYS[2]: "redisson_lock_queue:{xxx}",    ARGV[2]: "{Redisson.UUID}:{threadId}"
                    // KEYS[3]: "redisson_lock_timeout:{xxx}",  ARGV[3]: "{currentTime}" (部署服務器的時間ms, 不是redis-server的服務器時間)
                    "while true do "
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "
                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[3]) then "
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"
                    + 
                    
                    "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                            + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
                            "redis.call('lpop', KEYS[2]); " +
                            "redis.call('zrem', KEYS[3], ARGV[2]); " +
                            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        "return 1;", 
                    Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),  // KEYS
                    internalLockLeaseTime, getLockName(threadId), currentTime);               // PARAMS
        }
        
        if (command == RedisCommands.EVAL_LONG) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads: 移除無效的競爭鎖的線程
                    // KEYS[1]: "lock_name",                    ARGV[1]: "{leaseTime}"
                    // KEYS[2]: "redisson_lock_queue:{xxx}",    ARGV[2]: "{Redisson.UUID}:{threadId}"
                    // KEYS[3]: "redisson_lock_timeout:{xxx}",  ARGV[3]: "{currentTime + threadWaitTime}" (部署服務器的時間ms, 不是redis-server的服務器時間)
                    //                                          ARGV[4]: "{currentTime}"

                    // 移除競爭lock_name中無效的線程
                    "while true do "
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "
                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[4]) then " // 如果存在且已失效, 則相應的從"redisson_lock_queue:{xxx}"、"redisson_lock_timeout:{xxx}"中移除
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"

                      // exists: 若 key 存在返回 1 ,否則返回 0 。
                        // 1. KEYS[1]: 記錄獲取到lock_name的線程, 及記錄線程獲取鎖的次數. (此HASH只會存在一個field)
                        //    type: HASH, key: "lock_name", field: "{Redisson.UUID}:{threadId}", value: 1 (value表示線程獲取鎖的次數, 釋放鎖時必須釋放相同的次數, 才會釋放鎖lock_name)
                        // 2. 當不存在"redisson_lock_queue:{lock_name}"時, 表示沒有競爭對手存在; 或, 競爭隊列的第一個值為當前線程 (因為是FairLock, 所以要滿足 redis.call('lindex', KEYS[2], 0) == ARGV[2])
                      + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                            + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
                            // 當前線程獲取到鎖, 從queue、timeout中移除
                            "redis.call('lpop', KEYS[2]); " +
                            "redis.call('zrem', KEYS[3], ARGV[2]); " +
                            "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 標記當前鎖已被某個線程獲取
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置標記的失效時常, 默認是 30 * 1000 ms
                            "return nil; " +
                        "end; " +

                        // (重復獲取鎖) 當前線程即持有鎖的線程, 即可重入鎖(Reentrant Lock)
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 記錄線程獲取鎖的次數
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +    // 重置失效時長
                            "return nil; " +
                        "end; " +
                            
                        "local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
                        "local ttl; " + 
                        "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + // 隊列存在, 且隊列的第一個值 ≠ 當前線程
                            "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
                        "else "
                          + "ttl = redis.call('pttl', KEYS[1]);" + 
                        "end; " + 
                            
                        "local timeout = ttl + tonumber(ARGV[3]);" + // 計算競爭線程的失效時間點
                        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + // 設置當前線程競爭鎖的失效時間點
                            "redis.call('rpush', KEYS[2], ARGV[2]);" + // 將當前線程加入競爭鎖的隊列中
                        "end; " +
                        "return ttl;", // 返回持有鎖的剩余時長,
                        Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()), 
                                    internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
        }
        
        throw new IllegalArgumentException();
    }

    /** vergilyn mark: <br/>
     * 釋放鎖(內部異步); 只有持有鎖的線程才能釋放鎖, 且線程獲取多少次鎖, 就要釋放多少次, 否則不會釋放(除非鎖達到失效時長).
     * lua執行返回: 1, 成功釋放鎖; nil/0, 未釋放任何鎖.
     * @see RedissonFairLock#forceUnlockAsync()
     */
    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // remove stale threads: 移除無效的獲取鎖的線程
                "while true do "
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                + "if firstThreadId2 == false then "
                    + "break;"
                + "end; "
                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                + "if timeout <= tonumber(ARGV[4]) then "
                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
                    + "redis.call('lpop', KEYS[2]); "
                + "else "
                    + "break;"
                + "end; "
              + "end;"

                // 鎖已釋放(KEYS[1]已自動失效), 通知下一個獲取鎖的線程
              + "if (redis.call('exists', KEYS[1]) == 0) then " + 
                    "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                    "if nextThreadId ~= false then " +
                        "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                    "end; " +
                    "return 1; " + // 成功釋放鎖
                "end;" +

                // 當前線程不是持有鎖的線程, 不允許釋放鎖
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +

                // 當前線程是持有鎖的線程, value: 遞減
                // (ReentrantLock概念) 當同一線程中多次獲取鎖, 必須釋放相同多的次數, 才會最終釋放鎖
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 每次遞減后重置失效時長
                    "return 0; " + // 釋放鎖失敗
                "end; " +

                // 當前線程所有獲取鎖的地方都釋放了鎖, 則正確的釋放鎖
                "redis.call('del', KEYS[1]); " +

                // 通知下一個競爭相同鎖的線程.
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                "if nextThreadId ~= false then " +
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; ", // 成功釋放鎖
                Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), 
                LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
    }
    
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    @Override
    public RFuture<Boolean> deleteAsync() {
        return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName());
    }

    /** vergilyn mark: <br/>
     * 強制釋放鎖
     * @see RedissonLock#unlockInnerAsync
     */
    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal();
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // remove stale threads
                "while true do "
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                + "if firstThreadId2 == false then "
                    + "break;"
                + "end; "
                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                + "if timeout <= tonumber(ARGV[2]) then "
                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
                    + "redis.call('lpop', KEYS[2]); "
                + "else "
                    + "break;"
                + "end; "
              + "end;"
                + 

                // 強制釋放鎖, 即使當前線程不是持有鎖的線程, 並通知下一個競爭相同鎖的線程
                "if (redis.call('del', KEYS[1]) == 1) then " + 
                    "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                    "if nextThreadId ~= false then " +
                        "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                    "end; " + 
                    "return 1; " + // 強制釋放鎖成功
                "end; " + 
                "return 0;", // 強制釋放鎖失敗
                Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()), 
                LockPubSub.unlockMessage, System.currentTimeMillis());
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM