redis-RedisLockRegistry分析(分布式鎖)-單機版redis


// redis 鎖的優勢

對於分布式服務的情況下,當只使用java原生相關鎖(ReentrantLock)操作時,只能保證一個jvm進程中的操作受到鎖的保護,但對於多個jvm進程就無法進行有效鎖保護控制;

因此為了滿足分布式場景, 需要使用一個統一管理位置,因此通過redis 來做作為鎖控制

 

spring 提供的redis支持

https://docs.spring.io/spring-integration/reference/html/redis.html#redis-lock-registry

其利用java 本地鎖和 redis SET相關指令 雙重保證

  • 本地鎖是使用的java.util.concurrent.locks.ReentrantLock#ReentrantLock() 來實現可重入的特性
  • 使用redis相關命令 ,從以下代碼中可以看出實際其利用 redis SET 中的指令
    // 來源於 spring-integration-redis-4.3.11.RELEASE 版本中  org.springframework.integration.redis.util.RedisLockRegistry

    private boolean obtainLock() {
    Thread currentThread = Thread.currentThread();
    if (currentThread.equals(this.thread)) {
    this.reLock++;
    return true;
    }

    toHardThreadStorage(this);

    /*
    * Set these now so they will be persisted if successful.
    */
    this.lockedAt = System.currentTimeMillis();
    this.threadName = currentThread.getName();

    Boolean success = false;
    try {

    success = RedisLockRegistry.this.redisTemplate.execute(new RedisCallback<Boolean>() {

    @Override
    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {

    /* 核心代碼點
    Perform Redis command 'SET resource-name anystring NX EX max-lock-time' directly.
    As it is recommended by Redis: http://redis.io/commands/set.
    This command isn't supported directly by RedisTemplate.
    */
    long expireAfter = TimeoutUtils.toSeconds(RedisLockRegistry.this.expireAfter,
    TimeUnit.MILLISECONDS);
    RedisSerializer<String> serializer = RedisLockRegistry.this.redisTemplate.getStringSerializer();
    byte[][] actualArgs = new byte[][] {
    // 指定存儲到redis 中key的名稱為 "創建RedisLockRegistry對象時傳遞的registryKey名稱" + ":" + "在RedisLockRegistry對象調用obtain方法時傳遞的lockKey"
    serializer.serialize(constructLockKey()),
    // 指定存儲到redis中的值為 當前新建的RedisLock對象序列化 ; 對於當前序列化操作做了一定的自定義性能提升
    RedisLockRegistry.this.lockSerializer.serialize(RedisLock.this),
    // 對於redis SET NX 命令的含義為 如果key不存在則會執行新增操作
    // 官方解釋 : NX -- Only set the key if it does not already exist.
    serializer.serialize("NX"),
    // 對於 redis SET EX 命令代表的是, 指定當前key過期時間
    // EX seconds -- Set the specified expire time, in seconds.
    serializer.serialize("EX"),
    // 指定當前key 的過期時間 默認單位為 秒
    // EX seconds -- Set the specified expire time, in seconds.
    // 對於"RedisLockRegistry#expireAfter"表示當前key過期時間,默認為60s
    serializer.serialize(String.valueOf(expireAfter))
    };

    return connection.execute("SET", actualArgs) != null;
    }

    });
    }
    finally {

    if (!success) {
    this.lockedAt = 0;
    this.threadName = null;
    toWeakThreadStorage(this);
    }
    else {
    this.thread = currentThread;
    if (logger.isDebugEnabled()) {
    logger.debug("New lock; " + this);
    }
    }

    }

    return success;
    }

     

  •  關於redis中存儲的value使用到的 LockSerializer 序列化和反序列化進行分析

     private class LockSerializer implements RedisSerializer<RedisLockRegistry.RedisLock> {
    
            /**
             * 其序列化操作並未直接將 {@code RedisLock} 對象進行序列化操作, 而主要通過自定義字節數組長度 根據 redisLock中的必需屬性值的字節長度來定義;
             * 其主要存儲了以下屬性
             * {@code RedisLock#localHost} 當前請求redis的服務器ip
             * {@code RedisLock#lockKey} 當前redis 存儲的數據 key
             * {@code RedisLock#threadName} 記錄當前執行加鎖操作的線程名
             * {@code RedisLock#lockedAt} 記錄當前加鎖時間
             */
            @Override
            public byte[] serialize(RedisLockRegistry.RedisLock t) throws SerializationException {
                int hostLength = t.lockHost.length;
                int keyLength = t.lockKey.length();
                int threadNameLength = t.threadName.length();
                byte[] value = new byte[1 + hostLength +
                        1 + keyLength +
                        1 + threadNameLength + 8];
                // 使用 nio中的 最新io操作 API {@code ByteBuffer} 來快速操作字節數組 {@code value}
                ByteBuffer buff = ByteBuffer.wrap(value);
                buff.put((byte) hostLength)
                        .put(t.lockHost)
                        .put((byte) keyLength)
                        .put(t.lockKey.getBytes())
                        .put((byte) threadNameLength)
                        .put(t.threadName.getBytes())
                        .putLong(t.lockedAt);
                return value;
            }
    
            /**
             * 對於反序列化操作其通過在序列化過程中存儲了 每個保存的屬性值的字節長度,通過獲取到每個屬性的長度,利用 {@link java.nio.ByteBuffer#get(byte[])} 實現快速讀取操作
             */
            @Override
            public RedisLockRegistry.RedisLock deserialize(byte[] bytes) throws SerializationException {
                if (bytes == null) {
                    return null;
                }
                ByteBuffer buff = ByteBuffer.wrap(bytes);
                byte[] host = new byte[buff.get()];
                buff.get(host);
                byte[] lockKey = new byte[buff.get()];
                buff.get(lockKey);
                byte[] threadName = new byte[buff.get()];
                buff.get(threadName);
                long lockedAt = buff.getLong();
                RedisLockRegistry.RedisLock lock = new RedisLockRegistry.RedisLock(new String(lockKey));
                lock.lockedAt = lockedAt;
                lock.lockHost = host;
                lock.threadName = new String(threadName);
                return lock;
            }
    
        }

     

關於當前 redis分布式鎖的缺點(不適合redis集群操作)

由於在redis數據寫入過程中,其實際只在一台redis機器上進行了數據保存,對於redis主從或集群模式下由於存在多台機器,這就會導致 多個相同服務的java機器會請求到不同的redis機器從而導致 相同意義的redis 鎖數據被保存到了不同的redis機器上,使不同來源的客戶端都加鎖成功;

舉例而言: 存在 S1 和 S2 兩台分布式 java服務, 存在 R1(master) 和 R2 HA模式的redis機器, 此時 S1請求加鎖操作,會在R1中存儲相關redis數據,而此時S2也准備請求相同key的加鎖操作,而在此時R1機器宕機了,R2機器作為備份機器開始工作,但對於 R1中的數據在R2中是不會存在的,因此當S2請求到R2執行 SET NX 命令時,其會寫入成功,從而導致 從結果體現而言,同一個redis鎖變量同時被兩個不同的客戶端獲取,這實際就沒有達到分布式鎖排他性的目的了

 

//TODO : 因此這里引入我們下一章討論的 redis集群模式下采用RedLock操作來實現分布式鎖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM