redis-RedisLockRegistry分析(分布式锁)-单机版redis


// redis 锁的优势

对于分布式服务的情况下,当只使用java原生相关锁(ReentrantLock)操作时,只能保证一个jvm进程中的操作受到锁的保护,但对于多个jvm进程就无法进行有效锁保护控制;

因此为了满足分布式场景, 需要使用一个统一管理位置,因此通过redis 来做作为锁控制

 

spring 提供的redis支持

https://docs.spring.io/spring-integration/reference/html/redis.html#redis-lock-registry

其利用java 本地锁和 redis SET相关指令 双重保证

  • 本地锁是使用的java.util.concurrent.locks.ReentrantLock#ReentrantLock() 来实现可重入的特性
  • 使用redis相关命令 ,从以下代码中可以看出实际其利用 redis SET 中的指令
    // 来源于 spring-integration-redis-4.3.11.RELEASE 版本中  org.springframework.integration.redis.util.RedisLockRegistry

    private boolean obtainLock() {
    Thread currentThread = Thread.currentThread();
    if (currentThread.equals(this.thread)) {
    this.reLock++;
    return true;
    }

    toHardThreadStorage(this);

    /*
    * Set these now so they will be persisted if successful.
    */
    this.lockedAt = System.currentTimeMillis();
    this.threadName = currentThread.getName();

    Boolean success = false;
    try {

    success = RedisLockRegistry.this.redisTemplate.execute(new RedisCallback<Boolean>() {

    @Override
    public Boolean doInRedis(RedisConnection connection) throws DataAccessException {

    /* 核心代码点
    Perform Redis command 'SET resource-name anystring NX EX max-lock-time' directly.
    As it is recommended by Redis: http://redis.io/commands/set.
    This command isn't supported directly by RedisTemplate.
    */
    long expireAfter = TimeoutUtils.toSeconds(RedisLockRegistry.this.expireAfter,
    TimeUnit.MILLISECONDS);
    RedisSerializer<String> serializer = RedisLockRegistry.this.redisTemplate.getStringSerializer();
    byte[][] actualArgs = new byte[][] {
    // 指定存储到redis 中key的名称为 "创建RedisLockRegistry对象时传递的registryKey名称" + ":" + "在RedisLockRegistry对象调用obtain方法时传递的lockKey"
    serializer.serialize(constructLockKey()),
    // 指定存储到redis中的值为 当前新建的RedisLock对象序列化 ; 对于当前序列化操作做了一定的自定义性能提升
    RedisLockRegistry.this.lockSerializer.serialize(RedisLock.this),
    // 对于redis SET NX 命令的含义为 如果key不存在则会执行新增操作
    // 官方解释 : NX -- Only set the key if it does not already exist.
    serializer.serialize("NX"),
    // 对于 redis SET EX 命令代表的是, 指定当前key过期时间
    // EX seconds -- Set the specified expire time, in seconds.
    serializer.serialize("EX"),
    // 指定当前key 的过期时间 默认单位为 秒
    // EX seconds -- Set the specified expire time, in seconds.
    // 对于"RedisLockRegistry#expireAfter"表示当前key过期时间,默认为60s
    serializer.serialize(String.valueOf(expireAfter))
    };

    return connection.execute("SET", actualArgs) != null;
    }

    });
    }
    finally {

    if (!success) {
    this.lockedAt = 0;
    this.threadName = null;
    toWeakThreadStorage(this);
    }
    else {
    this.thread = currentThread;
    if (logger.isDebugEnabled()) {
    logger.debug("New lock; " + this);
    }
    }

    }

    return success;
    }

     

  •  关于redis中存储的value使用到的 LockSerializer 序列化和反序列化进行分析

     private class LockSerializer implements RedisSerializer<RedisLockRegistry.RedisLock> {
    
            /**
             * 其序列化操作并未直接将 {@code RedisLock} 对象进行序列化操作, 而主要通过自定义字节数组长度 根据 redisLock中的必需属性值的字节长度来定义;
             * 其主要存储了以下属性
             * {@code RedisLock#localHost} 当前请求redis的服务器ip
             * {@code RedisLock#lockKey} 当前redis 存储的数据 key
             * {@code RedisLock#threadName} 记录当前执行加锁操作的线程名
             * {@code RedisLock#lockedAt} 记录当前加锁时间
             */
            @Override
            public byte[] serialize(RedisLockRegistry.RedisLock t) throws SerializationException {
                int hostLength = t.lockHost.length;
                int keyLength = t.lockKey.length();
                int threadNameLength = t.threadName.length();
                byte[] value = new byte[1 + hostLength +
                        1 + keyLength +
                        1 + threadNameLength + 8];
                // 使用 nio中的 最新io操作 API {@code ByteBuffer} 来快速操作字节数组 {@code value}
                ByteBuffer buff = ByteBuffer.wrap(value);
                buff.put((byte) hostLength)
                        .put(t.lockHost)
                        .put((byte) keyLength)
                        .put(t.lockKey.getBytes())
                        .put((byte) threadNameLength)
                        .put(t.threadName.getBytes())
                        .putLong(t.lockedAt);
                return value;
            }
    
            /**
             * 对于反序列化操作其通过在序列化过程中存储了 每个保存的属性值的字节长度,通过获取到每个属性的长度,利用 {@link java.nio.ByteBuffer#get(byte[])} 实现快速读取操作
             */
            @Override
            public RedisLockRegistry.RedisLock deserialize(byte[] bytes) throws SerializationException {
                if (bytes == null) {
                    return null;
                }
                ByteBuffer buff = ByteBuffer.wrap(bytes);
                byte[] host = new byte[buff.get()];
                buff.get(host);
                byte[] lockKey = new byte[buff.get()];
                buff.get(lockKey);
                byte[] threadName = new byte[buff.get()];
                buff.get(threadName);
                long lockedAt = buff.getLong();
                RedisLockRegistry.RedisLock lock = new RedisLockRegistry.RedisLock(new String(lockKey));
                lock.lockedAt = lockedAt;
                lock.lockHost = host;
                lock.threadName = new String(threadName);
                return lock;
            }
    
        }

     

关于当前 redis分布式锁的缺点(不适合redis集群操作)

由于在redis数据写入过程中,其实际只在一台redis机器上进行了数据保存,对于redis主从或集群模式下由于存在多台机器,这就会导致 多个相同服务的java机器会请求到不同的redis机器从而导致 相同意义的redis 锁数据被保存到了不同的redis机器上,使不同来源的客户端都加锁成功;

举例而言: 存在 S1 和 S2 两台分布式 java服务, 存在 R1(master) 和 R2 HA模式的redis机器, 此时 S1请求加锁操作,会在R1中存储相关redis数据,而此时S2也准备请求相同key的加锁操作,而在此时R1机器宕机了,R2机器作为备份机器开始工作,但对于 R1中的数据在R2中是不会存在的,因此当S2请求到R2执行 SET NX 命令时,其会写入成功,从而导致 从结果体现而言,同一个redis锁变量同时被两个不同的客户端获取,这实际就没有达到分布式锁排他性的目的了

 

//TODO : 因此这里引入我们下一章讨论的 redis集群模式下采用RedLock操作来实现分布式锁


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM