關於Redis分布式鎖這一篇應該是講的最好的了,先收藏起來再看!


前言

在Java並發編程中,我們通常使用到synchronized 、Lock這兩個線程鎖,Java中的鎖,只能保證對同一個JVM中的線程有效。而在分布式集群環境,這個時候我們就需要使用到分布式鎖。

實現分布式鎖的方案

  • 基於數據庫實現分布式鎖
  • 基於緩存Redis實現分布式鎖
  • 基於Zookeeper的臨時序列化節點實現分布式鎖

Redis實現分布式鎖

場景:在高並發的情況下,可能有大量請求來到數據庫查詢三級分類數據,而這種數據不會經常改變,可以引入緩存來存儲第一次從數據庫查詢出來的數據,其他線程就可以去緩存中獲取數據,來減少數據庫的查詢壓力。

在集群的環境下,就可以使用分布式鎖來控制去查詢數據庫的次數。

階段一

    private  Map<String, List<Catelog2Vo>>  getCatalogJsonDBWithRedisLock() {
        // 去Redis中搶占位置
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
        if (lock){
            // 搶到鎖了 執行業務
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            // 刪除鎖
            stringRedisTemplate.delete("lock");
            return dataFromDb;
        }else {
            // 自旋獲取鎖
            // 休眠100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDBWithRedisLock();
        }
    }

得到鎖以后,我們應該再去緩存中確定一次,如果沒有才需要繼續查詢,從數據庫查到數據以后,應該先把數據放入緩存中,再將數據返回。

  private Map<String, List<Catelog2Vo>> getDataFromDb() {
        // 得到鎖以后,我們應該再去緩存中確定一次,如果沒有才需要繼續查詢
        String catalogJson = stringRedisTemplate.opsForValue().get("catalogJson");
        if (!StringUtils.isEmpty(catalogJson)) {
            // 反序列化 轉換為指定對象
            Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJson, new 
            TypeReference<Map<String, List<Catelog2Vo>>>() {});
            return result;
        }
        System.out.println("查詢數據庫了......");
        // 查詢所有分類數據在進行刷選
        List<CategoryEntity> categoryEntityList = baseMapper.selectList(null);
        // 查詢一級分類
        List<CategoryEntity> leave1Categorys = getParent_cid(categoryEntityList, 0L);
        Map<String, List<Catelog2Vo>> listMap = leave1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), l1 -> {

            List<CategoryEntity> categoryL2List = getParent_cid(categoryEntityList, l1.getCatId());
            List<Catelog2Vo> catelog2Vos = null;
            if (categoryL2List != null) {
                catelog2Vos = categoryL2List.stream().map(l2 -> {
                    Catelog2Vo catelog2Vo = new Catelog2Vo(l2.getParentCid().toString(), null, l2.getCatId().toString(), l2.getName());

                    List<CategoryEntity> categoryL3List = getParent_cid(categoryEntityList, 
                                                                        l2.getCatId());
                    if (categoryL3List != null) {
                        List<Catelog2Vo.Catelog3Vo> catelog3Vos =  
                            categoryL3List.stream().map(l3 -> {
                            Catelog2Vo.Catelog3Vo catelog3Vo = new 
                               Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), 
                                                      l3.getCatId().toString(), 
                                                      l3.getName());
                            return catelog3Vo;
                        }).collect(Collectors.toList());
                        catelog2Vo.setCatalog3List(catelog3Vos);
                    }
                    return catelog2Vo;
                }).collect(Collectors.toList());
            }
            return catelog2Vos;
        }));
        // 最后需將數據加入的緩存中
        String jsonString = JSON.toJSONString(listMap);
        stringRedisTemplate.opsForValue().set("catalogJson", jsonString, 1L, 
                                              TimeUnit.DAYS);

        return listMap;
    }

階段二

    private  Map<String, List<Catelog2Vo>>  getCatalogJsonDBWithRedisLock() {
        // 去Redis中搶占位置
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
        if (lock){
            // 搶到鎖了 執行業務
            // 設置過期時間
            stringRedisTemplate.expire("lock",3,TimeUnit.SECONDS);
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            // 刪除鎖
            stringRedisTemplate.delete("lock");
            return dataFromDb;
        }else {
            // 自旋獲取鎖
            // 休眠100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDBWithRedisLock();
        }
    }

階段三

  private  Map<String, List<Catelog2Vo>>  getCatalogJsonDBWithRedisLock() {
        // 去Redis中搶占位置  保證原子性
      Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", 
      "1111",300,TimeUnit.SECONDS);
        if (lock){
            // 搶到鎖了 執行業務
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            // 刪除鎖
            stringRedisTemplate.delete("lock");
            return dataFromDb;
        }else {
            // 自旋獲取鎖
            // 休眠100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDBWithRedisLock();
        }
    }

階段四

private  Map<String, List<Catelog2Vo>>  getCatalogJsonDBWithRedisLock() {
        String uuid = UUID.randomUUID().toString();
        // 去Redis中搶占位置  保證原子性
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
        if (lock){
            // 搶到鎖了 執行業務
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            String s = stringRedisTemplate.opsForValue().get("lock");
            if (uuid.equals(s)){
                // 刪除鎖
                stringRedisTemplate.delete("lock");
            }
            return dataFromDb;
        }else {
            // 自旋獲取鎖
            // 休眠100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDBWithRedisLock();
        }
    }

階段五

  private  Map<String, List<Catelog2Vo>>  getCatalogJsonDBWithRedisLock() {
        String uuid = UUID.randomUUID().toString();
        // 去Redis中搶占位置  保證原子性
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
        if (lock){
            // 搶到鎖了 執行業務
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            String s = stringRedisTemplate.opsForValue().get("lock");
            // 獲取值對比+對比成功刪除=原子操作 Lua腳本解鎖
            String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                    "    return redis.call(\"del\",KEYS[1])\n" +
                    "else\n" +
                    "    return 0\n" +
                    "end";
            Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class) , Arrays.asList("lock"), uuid);
            return dataFromDb;
        }else {
            // 自旋獲取鎖
            // 休眠100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonDBWithRedisLock();
        }
    }

小總結
  1. stringRedisTemplate.opsForValue().setIfAbsent(“lock”, uuid,300,TimeUnit.SECONDS);
  2. stringRedisTemplate.execute(new DefaultRedisScript(script, Long.class) , Arrays.asList(“lock”), uuid);
  3. 使用Redis來實現分布式鎖需保證加鎖【占位+過期時間】和刪除鎖【判斷+刪除】操作的原子性。
  4. Redis鎖的過期時間小於業務的執行時間該如何自動續期?
    • 設置一個比業務耗時更長的過期時間
    • Redisson的看門狗機制

Redisson實現分布式鎖

Redisson 簡介

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSetSetMultimapSortedSetMapListQueueBlockingQueueDequeBlockingDequeSemaphoreLockAtomicLongCountDownLatchPublish / SubscribeBloom filterRemote serviceSpring cacheExecutor serviceLive Object serviceScheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

原理機制

集成Spring Boot 項目

  1. 引入依賴 【可引入Spring Boot 封裝好的starter】

      <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.12.0</version>
            </dependency>
    
    
  2. 添加配置類

    @Configuration
    public class MyRedissonConfig {
    
        @Bean(destroyMethod = "shutdown")
        public RedissonClient redissonClient(){
            // 創建配置  記得加redis://
            Config config = new Config();
            config.useSingleServer().setAddress("redis://192.168.26.104:6379");
            // 根據配置創建RedissClient客戶端
            RedissonClient redissonClient = Redisson.create(config);
            return redissonClient;
        }
    }
    
    
可重入鎖 Reentrant Lock
  1. 獲取一把鎖 redissonClient.getLock(“my-lock”);
  2. 給業務代碼加鎖 lock.lock();
  3. 解鎖 lock.unlock();
  4. 看門狗機制 鎖會自動續期
@ResponseBody
    @GetMapping("/hello")
    public String hello(){
        // 1、獲取一把鎖,只要鎖的名字一樣,就是同一把鎖
        RLock lock = redissonClient.getLock("my-lock");
        // 加鎖
        // 阻塞式等待,默認加的鎖都是【看門狗時間】30s時間
        //1)、鎖的自動續期,如果業務超長,運行期間自動給鎖續上新的30s,不用擔心業務時間長,鎖自動過期被刪掉
        //2)、加鎖的業務只要運行完成,就不會給當前鎖續期,即使不手動解鎖,鎖默認在30s以后自動刪除
        lock.lock();
        try {
            System.out.println("加鎖成功......."+Thread.currentThread().getId());
            Thread.sleep(30000);
        } catch (InterruptedException e) {

        }finally {
            // 釋放鎖   不會出現死鎖狀態 如果沒有執行解鎖,鎖有過期時間,過期了會將鎖刪除
            lock.unlock();
            System.out.println("解鎖成功......"+Thread.currentThread().getId());
        }
        return "hello";
    }

lock方法有一個重載方法 lock(long leaseTime, TimeUnit unit)

  public void lock(long leaseTime, TimeUnit unit) {
        try {
            this.lock(leaseTime, unit, false);
        } catch (InterruptedException var5) {
            throw new IllegalStateException();
        }
    }

注意:指定了過期時間后,不會進行自動續期,此時如果有多個線程,即便業務還然在執行,過期時間到了之后,鎖就會被釋放,其他線程就會爭搶到鎖。

二個方法對比

  1. 如果設置了過期時間,就會發生執行腳本給Redis,進行占鎖,設置過期時間為我們指定的時間。

  2. 未設置過期時間,就會使用看門狗的默認時間LockWatchdogTimeout 30*1000

  3. 只有沒有指定過期的時間的方法才有自動續期功能

  4. 自動續期實現機制 :只要占鎖成功,就會自動啟動一個定時任務【重新給鎖設置過期時間,新的過期時間就是看門狗的默認時間】,每隔10s【( internalLockLeasTime)/3】都會自動續期。

  5. 持有鎖的機器宕機問題,因為來不及續期,所以鎖自動被釋放,當該機再次恢復時,因為其后台守護線程是ScheduleTask,所以恢復后會馬上執行一次watchDog續期邏輯,執行過程中,它會感知到自己已經丟失了鎖,所以不存在共同持有的問題。

讀寫鎖 ReadWriteLock

保證一定能讀到最新數據,修改期間,寫鎖是一個互斥鎖,讀鎖是一個共享鎖。

  1. 寫+讀 寫鎖沒有釋放,讀鎖就得等待
  2. 寫+寫 阻塞方式
  3. 讀+寫 寫鎖等待讀鎖釋放才能加鎖
  4. 讀+讀 相當於無鎖,並發讀
 @ResponseBody
    @GetMapping("/write")
    public String writeLock(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
        RLock rLock = readWriteLock.writeLock();
        String s = "";
        try {
            rLock.lock();
            System.out.println("寫鎖加鎖成功......"+Thread.currentThread().getId());
            s = UUID.randomUUID().toString();
            stringRedisTemplate.opsForValue().set("writeLock",s);
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            rLock.unlock();
            System.out.println("寫鎖釋放成功......"+Thread.currentThread().getId());
        }
       return s;
    }

    @ResponseBody
    @GetMapping("/read")
    public String readLock(){
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
        RLock rLock = readWriteLock.readLock();
        rLock.lock();
        String s = "";
        try{
            System.out.println("讀鎖加鎖成功......"+Thread.currentThread().getId());
            s = stringRedisTemplate.opsForValue().get("writeLock");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            System.out.println("讀鎖釋放成功......"+Thread.currentThread().getId());
            rLock.unlock();
        }
        return s;
    }

信號量 Semaphore

使用信號量來做分布式限流

    @ResponseBody
    @GetMapping("/park")
    public String park() throws InterruptedException {
        RSemaphore park = redissonClient.getSemaphore("park");
        // 搶占一個車位
        boolean b = park.tryAcquire();
        // 如果還可以搶占到 就執行業務代碼
        if (b){
            // 執行業務代碼
        }else {
            return "error";
        }
        return "ok=>"+b;
    }

    @ResponseBody
    @GetMapping("/go")
    public String go() {
        RSemaphore park = redissonClient.getSemaphore("park");
        //  釋放一個車位
        park.release();
        return "ok";
    }

閉鎖 CountDownLatch

模擬場景:等待班級放學走了,保安關校門。

    @ResponseBody
    @GetMapping("/lockdoor")
    public String lockDoor() throws InterruptedException {
        RCountDownLatch door = redissonClient.getCountDownLatch("door");
        door.trySetCount(5);
        // 等待閉鎖完成
        door.await();
        return "放假了.....";
    }

    @GetMapping("/gogogo/{id}")
    @ResponseBody
    public String gogogo(@PathVariable("id")Long id){
        RCountDownLatch door = redissonClient.getCountDownLatch("door");
        // 計數減一
        door.countDown();
        return id+"班級走了....";
    }

Redisson解決上面Redis查詢問題
  /**
     * 使用Redisson分布式鎖來實現多個服務共享同一緩存中的數據
     * @return
     */
    private Map<String, List<Catelog2Vo>> getCatalogJsonDBWithRedissonLock() {
        RLock lock = redissonClient.getLock("catalogJson-lock");
         // 該方法會阻塞其他線程向下執行,只有釋放鎖之后才會接着向下執行
        lock.lock();
        Map<String, List<Catelog2Vo>> dataFromDb = null;
        try {
            dataFromDb = getDataFromDb();
        }finally {
            lock.unlock();
        }
        return dataFromDb;
    }

最后

歡迎關注公眾號:前程有光,領取一線大廠Java面試題總結+各知識點學習思維導+一份300頁pdf文檔的Java核心知識點總結!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM