Redis-分布式鎖(解決緩存擊穿問題)


一. 簡介

分布式鎖在很多場景中都非常的有用,分布式鎖是一個概念,實現他的方式有很多,本篇文章是基於Redis實現的單機分布式鎖。

主要解決多並發編程中由於鎖競爭而帶來的數據不一致的問題。

二. 應用場景

在本篇文章中主要解決Redis中緩存擊穿問題。

並發的訪問一條數據,數據庫有,但是緩存中不存在(沒人訪問這條數據或者Redis中數據剛好過期),導致一瞬間多個請求訪問數據庫,數據庫壓力增大,這類數據通常為熱點數據。

三. 模擬緩存擊穿

以下程序模擬100個線程同時去訪問一條沒有緩存的數據。

1. 業務代碼(service層)

    @Override
    public Object listByRedis(String id) {
        HashMap<Object, Object> result = new HashMap<>();
        //通過布隆過濾器 解決緩存穿透問題. 會有誤判 但是沒有關系 不會有太多誤判。
        if (!bloomFilter.isExist(id)){
            result.put("status", 400);
            result.put("msg", "非法訪問");
            return result;
        }
        //查詢緩存
        Object redisData = redisTemplate.opsForValue().get(id);
        //是否命中
        if(redisData != null){
            //返回結果
            result.put("status", 200);
            result.put("msg", "緩存命中");
            result.put("data", redisData);
            return result;
        }
        try {
            UserInfo userInfo = userInfoMapper.selectById(id);
            if (userInfo != null){
                redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
                result.put("status", 200);
                result.put("msg", "查詢數據庫");
                result.put("data", userInfo);
                return result;
            }else{
                result.put("status", 200);
                result.put("msg", "沒有數據");
                return result;
            }
        }finally {

        }
    }

2. 並發模擬

並發訪問id=1096這條數據

public class ReadTest {

    private static CountDownLatch countDownLatch = new CountDownLatch(99);

    @Test
    public void test() throws InterruptedException {
        TicketsRunBle ticketsRunBle = new TicketsRunBle();
        for (int i = 0;i<=99;i++){
            new Thread(ticketsRunBle, "窗口"+i).start();
            countDownLatch.countDown();
        }
        Thread.currentThread().join();
    }

    public class TicketsRunBle implements Runnable{

        @Override
        public void run() {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RestTemplate restTemplate = new RestTemplate();
            R forObject = restTemplate.getForObject("http://localhost:8082/user?id=1096", R.class);
            System.out.println("結果:" + forObject);
        }
    }

}

 3. 執行結果

截取部分,都是數據庫查詢出來的,日志打印也有Mybatis的記錄。

四. 單機Redis分布式鎖的實現

Redis分布式鎖原理上是使用Setnx命令實現:

SET resource_name my_random_value NX PX 30000。

這個命令僅在不存在key的時候才能被執行成功(NX選項),並且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是“my_random_value”(一個隨機值),這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。
當client嘗試獲取鎖時,我們將事先定義的key設置一個值,之后的client再設置時則會不成功。

釋放鎖時實現:

為什么value要使用一個唯一的值,主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在並且存儲的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua腳本實現:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

使用這種方式可以避免刪除別的Client獲得的鎖。舉個栗子:
客戶端A取得資源鎖,但是緊接着被一個其他操作阻塞了,當客戶端A運行完畢其他操作后要釋放鎖時,原來的鎖早已超時並且被Redis自動釋放,並且在這期間資源鎖又被客戶端B再次獲取到。如果僅使用DEL命令將key刪除,那么這種情況就會把客戶端B的鎖給刪除掉。使用Lua腳本就不會存在這種情況,因為腳本僅會刪除value等於客戶端A的value的key(value相當於客戶端的一個簽名)。

本篇文章獲取鎖有一點不同,上面說的是設置一個固定的key,而本篇文章解決的問題是基於單條數據的一個並發查詢。

所以需要對單條數據的ID作為key進行加鎖,防止查詢同一條數據多次訪問數據庫。

1. 鎖實現:

/**
 * 自定義分布式鎖
 * 這里主要實現對單條數據進行加鎖,通過id進行加鎖
 * 多個線程同時訪問該數據會阻塞
 * @author
 * @Date 2022/1/6
 */
@Component
public class RedisLock {

    private static JedisPool jedisPool;

    @Autowired
    public void setJedisPool(JedisPool jedisPool){
        RedisLock.jedisPool = jedisPool;
    }

    /**
     * 鎖健
     */
    private final static String KEY = "lock_key_";

    /**
     * 鎖過期時間
     */
    private final static long LOCK_EXPIRED = 30000;

    /**
     * 鎖競爭超時時間
     */
    private final static long LOCK_WAIT_TIME_OUT = 999999;

    /**
     * SET命令參數
     */
    static SetParams params = SetParams.setParams().nx().px(LOCK_EXPIRED);

    /**
     * ThreadLocal用於保存某個線程共享變量:對於同一個static ThreadLocal
     * 不同的線程只能從中get,set到自己線程的副本
     */
    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    /**
     * 嘗試獲取鎖
     * @param key
     * @return
     */
    public Boolean tryLock(String key){
        String value = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        long startTime = System.currentTimeMillis();
        try {
            for(;;){
                //SET命令返回OK,獲取鎖成功
                String set = resource.set(KEY.concat(key), value, params);
                if ("OK".equals(set)){
                    threadLocal.set(value);
                    return true;
                }
                //增加一個超時時間判斷
                if(System.currentTimeMillis() - startTime > LOCK_WAIT_TIME_OUT){
                    return false;
                }
                //休眠一段時間 遞歸調用
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            resource.close();
        }
    }

    /**
     * 釋放鎖 通過lua腳本實現
     * @param key
     * @return
     */
    public boolean unLock(String key){
        Jedis resource = null;
        try {
            resource = jedisPool.getResource();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then" +
                    " return redis.call('del', KEYS[1]) " +
                    "else" +
                    " return 0 " +
                    "end";
            Object eval = resource.eval(script, Collections.singletonList(KEY.concat(key)), Collections.singletonList(threadLocal.get()));
            if ("1".equals(eval.toString())) {
                return true;
            }
            return false;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }finally {
            if (resource != null){
                resource.close();
            }
        }
    }
}

2. 業務代碼(service層)

    @Override
    public Object listByRedis(String id) {
        HashMap<Object, Object> result = new HashMap<>();
        //通過布隆過濾器 解決緩存穿透問題. 會有誤判 但是沒有關系 不會有太多誤判。
        if (!bloomFilter.isExist(id)){
            result.put("status", 400);
            result.put("msg", "非法訪問");
            return result;
        }
        //查詢緩存
        Object redisData = redisTemplate.opsForValue().get(id);
        //是否命中
        if(redisData != null){
            //返回結果
            result.put("status", 200);
            result.put("msg", "緩存命中");
            result.put("data", redisData);
            return result;
        }
        try {
            //添加分布式鎖,進來后在查詢一次緩存,如果上一個線程已經查詢並且存入緩存
            Boolean lock = redisLock.tryLock(id);
            if (!lock){
                result.put("status", 500);
                result.put("msg", "訪問超時,稍后再試");
                return result;
            }
            //查詢緩存
            redisData = redisTemplate.opsForValue().get(id);
            //是否命中
            if(redisData != null){
                //返回結果
                result.put("status", 200);
                result.put("msg", "緩存命中");
                result.put("data", redisData);
                return result;
            }
            UserInfo userInfo = userInfoMapper.selectById(id);
            if (userInfo != null){
                redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
                result.put("status", 200);
                result.put("msg", "查詢數據庫");
                result.put("data", userInfo);
                return result;
            }else{
                result.put("status", 200);
                result.put("msg", "沒有數據");
                return result;
            }
        }finally {
            redisLock.unLock(id);
        }
    }

3. 測試結果

打印日志顯示,只會有一次數據庫查詢。

返回的結果除了第一次是查詢數據庫,后面的都是緩存命中


五. 總結

該鎖的實現有很多不足之處,不斷了解學習的一個過程,可使用Redisson中實現的分布式鎖可用性更高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM