RedisTemplate詳解


SpringBoot集成redis使用starter是spring-boot-starter-data-redis。

一、關於spring-data-redis

spring-data-redis針對jedis提供了如下功能:

  • 連接池自動管理,提供了一個高度封裝的“RedisTemplate”類。
  • 針對jedis客戶端中大量api進行了歸類封裝,將同一類型操作封裝為operation接口。
    • ValueOperations:簡單K-V操作
    • SetOperations:set類型數據操作
    • ZSetOperations:zset類型數據操作
    • HashOperations:針對map類型的數據操作
    • ListOperations:針對list類型的數據操作
  • 提供了對key的“bound”(綁定)便捷化操作API,可以通過bound封裝指定的key,然后進行一系列的操作而無須“顯式”的再次指定Key,即BoundKeyOperations將事務操作封裝,由容器控制。
    • BoundValueOperations
    • BoundSetOperations
    • BoundListOperations
    • BoundSetOperations
    • BoundHashOperations
  • 針對數據的“序列化/反序列化”,提供了多種可選擇策略(RedisSerializer)
    • JdkSerializationRedisSerializer:POJO對象的存取場景,使用JDK本身序列化機制,將pojo類通過ObjectInputStream/ObjectOutputStream進行序列化操作,最終redis-server中將存儲字節序列。是目前最常用的序列化策略。
    • StringRedisSerializer:Key或者value為字符串的場景,根據指定的charset對數據的字節序列編碼成string,是“new String(bytes, charset)”和“string.getBytes(charset)”的直接封裝。是最輕量級和高效的策略。
    • JacksonJsonRedisSerializer:jackson-json工具提供了javabean與json之間的轉換能力,可以將pojo實例序列化成json格式存儲在redis中,也可以將json格式的數據轉換成pojo實例。因為jackson工具在序列化和反序列化時,需要明確指定Class類型,因此此策略封裝起來稍微復雜。【需要jackson-mapper-asl工具支持】
    • OxmSerializer:提供了將javabean與xml之間的轉換能力,目前可用的三方支持包括jaxb,apache-xmlbeans;redis存儲的數據將是xml工具。不過使用此策略,編程將會有些難度,而且效率最低;不建議使用。【需要spring-oxm模塊的支持】

如果你的數據需要被第三方工具解析,那么數據應該使用StringRedisSerializer而不是JdkSerializationRedisSerializer。

二、關於key的設計

1. key的存活時間

無論什么時候,只要有可能就利用key超時的優勢。一個很好的例子就是儲存一些諸如臨時認證key之類的東西。這樣在設置key的時候,設成同樣的超時時間,Redis就會自動為你清除。

2. 關系數據庫的redis

  • 把表名轉換為key前綴 如, tag:
  • 第2段放置用於區分區key的字段--對應mysql中的主鍵的列名,如userid
  • 第3段放置主鍵值,如2,3,4...., a , b ,c
  • 第4段,寫要存儲的列名

例:user:userid:9:username

三、Redis的數據類型

1. String字符串

  • string是redis最基本的類型,一個key對應一個value。
  • string類型是二進制安全的。意思是redis的string可以包含任何數據。比如jpg圖片或者序列化的對象 。
  • string類型是Redis最基本的數據類型,一個鍵最大能存儲512MB。

2. 鏈表

  • redis列表是簡單的字符串列表,排序為插入的順序。列表的最大長度為2^32-1。
  • redis的列表是使用鏈表實現的,這意味着,即使列表中有上百萬個元素,增加一個元素到列表的頭部或尾部的操作都是在常量的時間完成。
  • 可以用列表獲取最新的內容(像帖子,微博等),用ltrim很容易就會獲取最新的內容,並移除舊的內容。
  • 用列表可以實現生產者消費者模式,生產者調用lpush添加項到列表中,消費者調用rpop從列表中提取,如果沒有元素,則輪詢去獲取,或者使用brpop等待生產者添加項到列表中。

3. 集合

  • redis集合是無序的字符串集合,集合中的值是唯一的,無序的。可以對集合執行很多操作,例如,測試元素是否存在,對多個集合執行交集、並集和差集等等。
  • 我們通常可以用集合存儲一些無關順序的,表達對象間關系的數據,例如用戶的角色,可以用sismember很容易就判斷用戶是否擁有某個角色。
  • 在一些用到隨機值的場合是非常適合的,可以用 srandmember/spop 獲取/彈出一個隨機元素。
    同時,使用@EnableCaching開啟聲明式緩存支持,這樣就可以使用基於注解的緩存技術。注解緩存是一個對緩存使用的抽象,通過在代碼中添加下面的一些注解,達到緩存的效果。

4. ZSet有序集合

  • 有序集合由唯一的,不重復的字符串元素組成。有序集合中的每個元素都關聯了一個浮點值,稱為分數。可以把有序看成hash和集合的混合體,分數即為hash的key。
  • 有序集合中的元素是按序存儲的,不是請求時才排序的。

5. Hash-哈希

  • redis的哈希值是字符串字段和字符串之間的映射,是表示對象的完美數據類型。
  • 哈希中的字段數量沒有限制,所以可以在你的應用程序以不同的方式來使用哈希。

四、SpringBoot與redis的整合

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

1. application.properties(或.yml)中的redis配置,可以是單機或集群

2. RedisTemplate的實例化

@Configuration
public class RedisConfig {

    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(
            RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String,Object>();
        //使用fastjson序列化
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        // value值的序列化采用fastJsonRedisSerializer
        template.setValueSerializer(fastJsonRedisSerializer);
        template.setHashValueSerializer(fastJsonRedisSerializer);
        // key的序列化采用StringRedisSerializer
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(
            RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

}

3. 封裝的通用操作工具

@Component
public class RedisUtil {

    /**
     * 默認過期時長,單位:秒
     **/
    public static final long DEFAULT_EXPIRE = 60 * 60 * 24;
    /**
     * 不設置過期時長
     **/
    public static final long NOT_EXPIRE = -1;

    /**
     * 表示 SET IF NOT EXIST
     **/
    private static final String NX = "NX";
    /**
     * 表示 SET WITH EXPIRE_TIME
     **/
    private static final String EX = "EX";
    /**
     * 加鎖成功
     **/
    private static final String LOCK_OK = "OK";
    /**
     * 解鎖成功
     **/
    private static final Long UNLOCK_OK = 1L;
    /**
     * 解鎖的腳本
     **/
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    /**
     * 請求標識
     **/
    private static ThreadLocal<String> LOCK_VALUE = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return UUID.randomUUID().toString();
        }
    };

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * @MethodName lock
     * @Description 加鎖
     * @param key
     * @param expireSeconds 過期時間
     * @return
     */
    public boolean lock(String key, int expireSeconds) {
        return redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                if (nativeConnection instanceof JedisCluster) {
                    JedisCluster jedisCluster = (JedisCluster) nativeConnection;
                    String result = jedisCluster.set(key, LOCK_VALUE.get(), NX, EX, expireSeconds);
                    return LOCK_OK.equals(result);
                }
                if (nativeConnection instanceof Jedis) {
                    Jedis jedis = (Jedis) nativeConnection;
                    String result = jedis.set(key, LOCK_VALUE.get(), NX, EX, expireSeconds);
                    return LOCK_OK.equals(result);
                }
                return false;
            }
        });
    }

    /**
     * @MethodName unlock
     * @Description 解鎖
     * @param key
     * @Return boolean
     */
    public boolean unlock(String key) {
        return redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                if (nativeConnection instanceof JedisCluster) {
                    JedisCluster jedisCluster = (JedisCluster) nativeConnection;
                    Object unlock = jedisCluster.eval(UNLOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(LOCK_VALUE.get()));
                    return UNLOCK_OK.equals(unlock);
                }
                if (nativeConnection instanceof Jedis) {
                    Jedis jedis = (Jedis) nativeConnection;
                    Object unlock = jedis.eval(UNLOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(LOCK_VALUE.get()));
                    return UNLOCK_OK.equals(unlock);
                }
                return false;
            }
        });
    }

    /**
     * 刪除key
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * 指定緩存失效時間
     * @param key  鍵
     * @param time 時間(秒)
     * @return
     */
    public boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根據key獲取過期時間
     * @param key 鍵 不能為null
     * @return 時間(秒) 返回0代表為永久有效
     */
    public long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

    /**
     * 判斷key是否存在
     * @param key 鍵
     * @return true 存在 false不存在
     */
    public boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 刪除緩存
     * @param key 可以傳一個值 或多個
     */
    public void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }

    /**
     * 獲取緩存
     * @param key 鍵
     * @return*/
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }

    /**
     * 添加緩存
     * @param key   鍵
     * @param value 值
     * @return true成功 false失敗
     */
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 添加緩存並設置過期時間
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒) time要大於0 如果time小於等於0 將設置無限期
     * @return true成功 false 失敗
     */
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 遞增
     * @param key 鍵
     * @return
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("遞增因子必須大於0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 遞減
     * @param key 鍵
     * @return
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("遞減因子必須大於0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * 設置一組Map的鍵值對
     * @param key  鍵 不能為null
     * @param item 項 不能為null
     * @return*/
    public Object hGet(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    /**
     * 獲取hashKey對應的所有鍵值
     * @param key 鍵
     * @return 對應的多個鍵值
     */
    public Map<Object, Object> hmGet(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * 添加一個Map類型值
     * @param key 鍵
     * @param map 對應多個鍵值
     * @return true 成功 false 失敗
     */
    public boolean hmSet(String key, Map<String, Object> map) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 添加一個Map類型值並設置過期時間
     * @param key  鍵
     * @param map  對應多個鍵值
     * @param time 時間(秒)
     * @return true成功 false失敗
     */
    public boolean hmSet(String key, Map<String, Object> map, long time) {
        try {
            redisTemplate.opsForHash().putAll(key, map);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一張hash表中放入數據,如果不存在將創建
     * @param key   鍵
     * @param item  項
     * @param value 值
     * @return true 成功 false失敗
     */
    public boolean hSet(String key, String item, Object value) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 向一張hash表中放入數據,如果不存在將創建
     * @param key   鍵
     * @param item  項
     * @param value 值
     * @param time  時間(秒)  注意:如果已存在的hash表有時間,這里將會替換原有的時間
     * @return true 成功 false失敗
     */
    public boolean hSet(String key, String item, Object value, long time) {
        try {
            redisTemplate.opsForHash().put(key, item, value);
            if (time > 0) {
                expire(key, time);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 刪除hash表中的值
     * @param key  鍵 不能為null
     * @param item 項 可以使多個 不能為null
     */
    public void hDel(String key, Object... item) {
        redisTemplate.opsForHash().delete(key, item);
    }

    /**
     * 判斷hash表中是否有該項的值
     * @param key  鍵 不能為null
     * @param item 項 不能為null
     * @return true 存在 false不存在
     */
    public boolean hHasKey(String key, String item) {
        return redisTemplate.opsForHash().hasKey(key, item);
    }

    /**
     * hash遞增 如果不存在,就會創建一個 並把新增后的值返回
     * @param key  鍵
     * @param item 項
     * @param by   要增加幾(大於0)
     * @return
     */
    public double hIncr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, by);
    }

    /**
     * hash遞減
     * @param key  鍵
     * @param item 項
     * @param by   要減少記(小於0)
     * @return
     */
    public double hDecr(String key, String item, double by) {
        return redisTemplate.opsForHash().increment(key, item, -by);
    }

    /**
     * 根據key獲取Set中的所有值
     * @param key 鍵
     * @return
     */
    public Set<Object> sGet(String key) {
        try {
            return redisTemplate.opsForSet().members(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 根據value從一個set中查詢,是否存在
     * @param key   鍵
     * @param value 值
     * @return true 存在 false不存在
     */
    public boolean sHasKey(String key, Object value) {
        try {
            return redisTemplate.opsForSet().isMember(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 將數據放入set緩存
     * @param key    鍵
     * @param values 值 可以是多個
     * @return 成功個數
     */
    public long sSet(String key, Object... values) {
        try {
            return redisTemplate.opsForSet().add(key, values);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 將set數據放入緩存
     * @param key    鍵
     * @param time   時間(秒)
     * @param values 值 可以是多個
     * @return 成功個數
     */
    public long sSetAndTime(String key, long time, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().add(key, values);
            if (time > 0) expire(key, time);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 獲取set緩存的長度
     * @param key 鍵
     * @return
     */
    public long sGetSetSize(String key) {
        try {
            return redisTemplate.opsForSet().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 移除值為value的
     * @param key    鍵
     * @param values 值 可以是多個
     * @return 移除的個數
     */
    public long setRemove(String key, Object... values) {
        try {
            Long count = redisTemplate.opsForSet().remove(key, values);
            return count;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 獲取list緩存的內容
     * @param key   鍵
     * @param start 開始
     * @param end   結束  0 到 -1代表所有值
     * @return
     */
    public List<Object> lGet(String key, long start, long end) {
        try {
            return redisTemplate.opsForList().range(key, start, end);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 獲取list緩存的長度
     * @param key 鍵
     * @return
     */
    public long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }

    /**
     * 通過索引 獲取list中的值
     * @param key   鍵
     * @param index 索引  index>=0時, 0 表頭,1 第二個元素,依次類推;index<0時,-1,表尾,-2倒數第二個元素,依次類推
     * @return
     */
    public Object lGetIndex(String key, long index) {
        try {
            return redisTemplate.opsForList().index(key, index);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 將list放入緩存
     * @param key   鍵
     * @param value 值
     * @return
     */
    public boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 將list放入緩存
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒)
     * @return
     */
    public boolean lSet(String key, Object value, long time) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 將list放入緩存
     * @param key   鍵
     * @param value 值
     * @return
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 將list放入緩存
     * @param key   鍵
     * @param value 值
     * @param time  時間(秒)
     * @return
     */
    public boolean lSet(String key, List<Object> value, long time) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            if (time > 0) expire(key, time);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 根據索引修改list中的某條數據
     * @param key   鍵
     * @param index 索引
     * @param value 值
     * @return
     */
    public boolean lUpdateIndex(String key, long index, Object value) {
        try {
            redisTemplate.opsForList().set(key, index, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 移除N個值為value
     * @param key   鍵
     * @param count 移除多少個
     * @param value 值
     * @return 移除的個數
     */
    public long lRemove(String key, long count, Object value) {
        try {
            Long remove = redisTemplate.opsForList().remove(key, count, value);
            return remove;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        }
    }
}

4. 分布式鎖的解讀

分布式鎖一般有三種實現方式:

  • 數據庫樂觀鎖;
  • 基於Redis的分布式鎖;
  • 基於ZooKeeper的分布式鎖。

本小節着重講解第二種。

(1) 鎖的可靠性具備的四個方面

  • 互斥性:在任意時刻,只有一個客戶端能持有鎖。
  • 不會發生死鎖:即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  • 具有容錯性:只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。
  • 解鈴還須系鈴人:加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

(2) 加鎖

可以看到,我們加鎖就一行代碼:jedis.set(String key, String value, String nxxx, String expx, int time),這個set()方法一共有五個形參:

  • 第一個為key,我們使用key來當鎖,因為key是唯一的。
  • 第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
  • 第四個為expx,這個參數我們傳的是EX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。
  • 第五個為time,與第四個參數相呼應,代表key的過期時間。

總的來說,執行上面的set()方法就只會導致兩種結果:

  • 當前沒有鎖(key不存在),那么就進行加鎖操作,並對鎖設置個有效期,同時value表示加鎖的客戶端。
  • 已有鎖存在,不做任何操作。

我們的加鎖代碼滿足我們可靠性里描述的三個條件。首先,set()加入了NX參數,可以保證如果已有key存在,則函數不會調用成功,也就是只有一個客戶端能持有鎖,滿足互斥性。其次,由於我們對鎖設置了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被刪除),不會發生死鎖。最后,因為我們將value賦值為requestId,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端。至於容錯性可以用集群實現。

錯誤示例一:比較常見的錯誤示例就是使用jedis.setnx()jedis.expire()組合實現加鎖,代碼如下:

public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {
 
    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        // 若在這里程序突然崩潰,則無法設置過期時間,將發生死鎖
        jedis.expire(lockKey, expireTime);
    }
 
}

setnx()方法作用就是SET IF NOT EXIST,expire()方法就是給鎖加一個過期時間。乍一看好像和前面的set()方法結果一樣,然而由於這是兩條Redis命令,不具有原子性,如果程序在執行完setnx()之后突然崩潰,導致鎖沒有設置過期時間。那么將會發生死鎖。網上之所以有人這樣實現,是因為低版本的jedis並不支持多參數的set()方法。

錯誤示例二:這一種錯誤示例就比較難以發現問題,而且實現也比較復雜。實現思路:使用jedis.setnx()命令實現加鎖,其中key是鎖,value是鎖的過期時間。執行過程:1. 通過setnx()方法嘗試加鎖,如果當前鎖不存在,返回加鎖成功。2. 如果鎖已經存在則獲取鎖的過期時間,和當前時間比較,如果鎖已經過期,則設置新的過期時間,返回加鎖成功。代碼如下:

public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {
 
    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);
 
    // 如果當前鎖不存在,返回加鎖成功
    if (jedis.setnx(lockKey, expiresStr) == 1) {
        return true;
    }
 
    // 如果鎖存在,獲取鎖的過期時間
    String currentValueStr = jedis.get(lockKey);
    if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
        // 鎖已過期,獲取上一個鎖的過期時間,並設置現在鎖的過期時間
        String oldValueStr = jedis.getSet(lockKey, expiresStr);
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
            // 考慮多線程並發的情況,只有一個線程的設置值和當前值相同,它才有權利加鎖
            return true;
        }
    }
        
    // 其他情況,一律返回加鎖失敗
    return false;
 
}

那么這段代碼問題在哪里?

a. 由於是客戶端自己生成過期時間,所以需要強制要求分布式下每個客戶端的時間必須同步。 

b. 當鎖過期的時候,如果多個客戶端同時執行jedis.getSet()方法,那么雖然最終只有一個客戶端可以加鎖,但是這個客戶端的鎖的過期時間可能被其他客戶端覆蓋。

c. 鎖不具備擁有者標識,即任何客戶端都可以解鎖。

(3) 解鎖

解鎖只需要兩行代碼就搞定了!

第一行代碼,我們寫了一個簡單的Lua腳本代碼。第二行代碼,我們將Lua代碼傳到jedis.eval()方法里,並使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。

那么這段Lua代碼的功能是什么呢?其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那么為什么要使用Lua語言來實現呢?因為要確保上述操作是原子性的。

錯誤示例一:最常見的解鎖代碼就是直接使用jedis.del()方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的。

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}

錯誤示例二:分成兩條命令去執行,不具備原子性

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
        
    // 判斷加鎖與解鎖是不是同一個客戶端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
        jedis.del(lockKey);
    }
 
}

如代碼注釋,問題在於如果調用jedis.del()方法的時候,這把鎖已經不屬於當前客戶端的時候會解除他人加的鎖。那么是否真的有這種場景?答案是肯定的,比如客戶端A加鎖,一段時間之后客戶端A解鎖,在執行jedis.del()之前,鎖突然過期了,此時客戶端B嘗試加鎖成功,然后客戶端A再執行del()方法,則將客戶端B的鎖給解除了。

注:如果你的項目中Redis是多機部署的,那么可以嘗試使用Redisson實現分布式鎖,這是Redis官方提供的Java組件。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM