SpringBoot使用RedisTemplate+Lua腳本實現Redis分布式鎖


SpringBoot使用RedisTemplate+Lua腳本實現Redis分布式鎖

問題:定時任務部署在多台Tomcat上,因此到達指定的定時時間時,多台機器上的定時器可能會同時啟動,造成重復數據或者程序異常等問題。

//發送消息,不能重復發送
@Scheduled(cron = "0 0/15 * * * ? ")
public void sendMsg(String userId) {
 
}

項目部署在Tom 1 ,Tom 2

如何控制只有一個Tomcat在同一時刻執行任務

使用分布式鎖來控制,誰搶到了鎖就讓誰執行。

一、基於Redis實現分布式鎖

package cn.pconline.pcloud.base.util;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Description 使用RedisTemplate+Lua腳本實現Redis分布式鎖
 * @Author jie.zhao
 * @Date 2019/11/19 11:46
 */
@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate template;

    private static final Long RELEASE_SUCCESS = 1L;

    private static final long DEFAULT_TIMEOUT = 1000 * 10;

    private static final String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    /**
     * 嘗試獲取鎖 立即返回
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lock(String key, String value, long timeout) {
        return template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * 以阻塞方式的獲取鎖
     *
     * @param key
     * @param value
     * @param timeout
     * @return
     */
    public boolean lockBlock(String key, String value, long timeout) {
        long start = System.currentTimeMillis();
        while (true) {
            //檢測是否超時
            if (System.currentTimeMillis() - start > timeout) {
                return false;
            }
            //執行set命令
            //1
            Boolean absent = template.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS);
            //其實沒必要判NULL,這里是為了程序的嚴謹而加的邏輯
            if (absent == null) {
                return false;
            }
            //是否成功獲取鎖
            if (absent) {
                return true;
            }
        }
    }

    public boolean unlock(String key, String value) {
        // 使用Lua腳本:先判斷是否是自己設置的鎖,再執行刪除
        // 使用lua腳本刪除redis中匹配value的key,可以避免由於方法執行時間過長而redis鎖自動過期失效的時候誤刪其他線程的鎖
        // spring自帶的執行腳本方法中,集群模式直接拋出不支持執行腳本的異常EvalSha is not supported in cluster environment.,所以只能拿到原redis的connection來執行腳本

        List<String> keys = new ArrayList<>();
        keys.add(key);
        List<String> args = new ArrayList<>();
        args.add(value);

        Long result = template.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和單機模式雖然執行腳本的方法一樣,但是沒有共同的接口,所以只能分開執行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }

                // 單機模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                return 0L;
            }
        });

        //返回最終結果
        return RELEASE_SUCCESS.equals(result);
    }
}

使用方法:

@Scheduled(cron = "0 0/15 * * * ? ")
public void sendExamTemplateMsg() {
    if (redisLock.lock(RedisKey.REDIS_JOB_SEND_KEY, RedisKey.REDIS_JOB_SEND_VALUE, 1000 * 60)) {
        
        //....
        log.info("定時輪詢考試安排通知結束 \t" + new Date());
    } else {
        log.info("定時輪詢考試安排,未獲取到鎖其他應用正在執行 \t" + new Date());
    }
}

二、分布式鎖的要求

  • 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  • 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  • 具有容錯性。只要大部分的Redis節點正常運行,客戶端就可以加鎖和解鎖。
  • 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

三、存在的問題

注意!!!! 該加鎖方法僅針對單實例 Redis 可實現分布式加鎖,或者使用場景少的業務。

原因對於 Redis 集群會有一定幾率出現問題

例如:當進程1對master節點寫入了鎖,此時master節點宕機,slave節點提升為master而剛剛寫入master的鎖還未同步,此時進程2也將能夠獲取鎖成功,此時必然會導致數據不同步問題。還有另一個問題即: key 超時之后業務並沒有執行完畢但卻自動釋放鎖了,這樣就會導致並發問題。

如果需要更加健壯的Redis集群分布式鎖,推薦使用Redisson


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM