分布式鎖結合SpringCache


1、高並發緩存失效問題:

緩存穿透:

指查詢一個一定不存在的數據,由於緩存不命中導致去查詢數據庫,但數據庫也無此記錄,我們沒有將此次查詢的null寫入緩存,導致這個不存在的數據每次請求都要到存儲層進行查詢,失去了緩存的意義;

風險:利用不存在的數據進行攻擊讓數據庫壓力增大最終崩潰;

解決:對不存在的數據進行緩存並加入短暫的過期時間;

 

緩存雪崩:

緩存雪崩是指我們在設置緩存時key采用相同的過期時間,導致緩存在某一個時刻同時失效,請求全部轉發到DB,DB瞬間壓力過重雪崩;

解決:原有的失效時間基礎上增加一個隨機值;

 

緩存擊穿:

對於一些設置過期時間的key,如果這些key會在某個時間被高並發地訪問,是一種非常“熱點”的數據;如果這個key在大量請求同時進來前正好失效,那么所有對這個key的數據查詢都落在db,我們稱之為緩存擊穿

解決:加鎖。大量並發情況下只讓一個人去查,其他人等到,查到數據后釋放鎖,其他人獲取到鎖后先查緩存,這樣就不會出現大量訪問DB的情況。

 2、加鎖解決擊穿問題

2.1、加本地鎖

“確認緩存”與“查詢數據庫”完成后才釋放鎖,圖示:

改進后(將“確認緩存”、“查數據庫”、“結果放入數據庫“都放入鎖中):

代碼部分:

    public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithLocalLock() {
        //加鎖,只要是同一把鎖就鎖柱所有線程,例如:100w個請求用同意一把鎖
        synchronized (this) {
            //得到鎖后再去緩沖中查看,如果緩存沒有再去查看,有則直接取緩存。但是使用“this”只能鎖住本地服務,所以要使用分布式鎖
            return getDataFromDb();
        }
    }
private Map<String, List<Catalog2Vo>> getDataFromDb() {
        //得到鎖后再去緩沖中查看,如果緩存沒有再去查看,有則直接取緩存。但是使用“this”只能鎖住本地服務,所以要使用分布式鎖
        String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
        if (!StringUtils.isEmpty(catalogJSON)) {
            //如果緩存不為空直接返回
            Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2Vo>>>() {
            });
            return result;
        }
        System.out.println("查了數據庫");
            /*        Map<String, List<Catalog2Vo>> catelogJson = (Map<String, List<Catalog2Vo>>) cache.get("catelogJson");

        if (cache.get("catelogJson") == null){

        }*/
        List<CategoryEntity> selectList = baseMapper.selectList(null);
        //1、先查出所有分類
        /**
         * 一級結構:
         * id:[
         * {二級內容}
         * {二級內容}
         * ]
         */
        //List<CategoryEntity> level1Categorys = getLevel1Categorys();
        List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);

        //2、封裝數據,二級結構
        /**
         * "catalog1Id":
         * "catalog3List":[三級內容]
         * "id": "",(二級id)
         * "name": ""(二級名字)
         * @return
         */
        Map<String, List<Catalog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
            //1、查到2級分類
            //List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", v.getCatId()));
            List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
            List<Catalog2Vo> catalog2Vos = null;
            if (categoryEntities != null) {
                catalog2Vos = categoryEntities.stream().map(l2 -> {
                    Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
                    /**
                     * 三級內容:
                     * {
                     *   "catalog2Id": "",(二級id)
                     *   "id": "",(三級id)
                     *   "name": "商務休閑鞋"(三級名字)
                     *  },
                     */
                    //List<CategoryEntity> level3Catalog = getParent_cid(l2);
                    List<CategoryEntity> level3Catalog = getParent_cid(selectList, l2.getCatId());
                    if (level3Catalog != null) {
                        List<Catalog2Vo.Catalog3Vo> collect = level3Catalog.stream().map(l3 -> {
                            Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
                            return catalog3Vo;
                        }).collect(Collectors.toList());
                        catalog2Vo.setCatalog3List(collect);
                    }
                    return catalog2Vo;
                }).collect(Collectors.toList());
            }
            return catalog2Vos;
        }));

        //3、放入緩存中
        String s = JSON.toJSONString(parent_cid); //1, TimeUnit.DAYS 空結果緩存:解決緩存穿透;設計過期時間:解決雪崩
        redisTemplate.opsForValue().set("catalogJSON", s, 1, TimeUnit.DAYS); return parent_cid;
    }

3、分布式鎖原理與使用

由於本地鎖只能管理本地服務的事務,所以在分布式微服務中引進了分布式鎖

1、分布式鎖演進:

1.1、分布式鎖階段一:

 1.2、分布式鎖階段二:

1.3、 分布式鎖階段三:

 1.4、分布式鎖階段四:

 1.5、演進代碼:

public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedisLock() {
        //1、占分布式鎖,去redis占坑同時加鎖成功(原子性),設置過期時間(30秒)。假設過程出問題直接過期不會發生死鎖,改進第階段1和階段2,使用setIfAbsent原子命令將獲取鎖和過期時間同時設置
        String uuid = UUID.randomUUID().toString();
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            System.out.println("獲取分布式鎖成功");
            Map<String, List<Catalog2Vo>> dataFromDb;
            try {
                //執行業務
                dataFromDb = getDataFromDb();
            } finally {
             //刪除鎖.redisTemplate.opsForValue().get("lock")獲取需要時間釋放鎖,存在“不可重復讀”,所以獲取后刪除過程也要要求原子性;
/*            String lockValue = redisTemplate.opsForValue().get("lock");
            if (uuid.equals(lockValue)){
                redisTemplate.delete("lock");
            }*/
                //使用腳本,改進階段4
                String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
                //   public <T> T execute(RedisScript<T> script, List<K> keys, Object... args)
          // 采用隨機的uuid值,改進階段3
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return getDataFromDb(); } else { System.out.println("獲取分布式鎖失敗"); try { Thread.sleep(200); } catch (Exception e) { } //加鎖失敗,設定時間進行重試 return getCatalogJsonFromDbWithRedisLock();//自旋方式 } }

4、Redisson實踐:

1、Redisson官方文檔:

https://github.com/redisson/redisson/wiki/Table-of-Content

2、Redisson可重用鎖:

 @ResponseBody
    @GetMapping("/hello")
    public String hello(){
        RLock lock= redisson.getLock("my-lock");
        lock.lock(10, TimeUnit.SECONDS);//這種方法不會自動續期
     //Redisson功能:
//1、可以鎖的自動續期,如果業務超長,因此不用擔心業務時間過長自動刪鎖 //2、枷鎖業務只要運行完成,就不會給當業務續期。默認是30s后自動刪鎖 try{ //模擬業務超長執行 System.out.println("加鎖成功"+Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return "hello"; }

3、緩存數據與數據庫保持一致性:

/**
     * @return 緩存一致性
     * 1、雙寫模式:數據庫修改時修改緩存數據,高並發情況下對數據庫進行修改容易出現臟數據 * 2、失效模式:數據庫更新時刪除緩存數據,等待下次查詢進行更新緩存 */
    public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedissonLock() {
        //1、占分布式鎖,去redis占坑,鎖的粒度越大,越細越快,鎖的名字一樣
        RLock lock = redisson.getLock("CatalogJson-lock");
        lock.lock();

        Map<String, List<Catalog2Vo>> dataFromDb;
        try {
            //運行代碼
            dataFromDb = getDataFromDb();
        } finally {
            lock.unlock();
        }

        return getDataFromDb();

    }

3.1、雙寫模式:

 3.2、失效模式(采用):

5、SpringCache的使用

1、主啟動類以及基本注解詳解:

/**
 *
 * @Cacheable:觸發​​保存緩存。
 * @CacheEvict:觸發​​刪除緩存。
 * @CachePut:更新緩存,而不會干擾方法的執行。
 * @Caching:重新組合多個緩存操作。
 * @CacheConfig:在類級別共享一些與緩存相關的常見設置。
 *
 * 開啟緩存@EnableCaching
 *
 * 原理:CacheAutoConfiguration->RedisCacheConfiguration->自動配置了RedisCacheManager->初始化所有緩存->每個緩存決定用什么配置
 * ->進行判斷,如果有自定義配置則按自定義配置,如果沒有則按默認->想改緩存配置,只需要給容器中放一個RedisCacheCpnfiguration
 */
@EnableRedisHttpSession
@EnableCaching
@EnableFeignClients(basePackages = "com.atguigu.gulimall.product.feign")
@EnableDiscoveryClient
@MapperScan(basePackages = "com.atguigu.gulimall.product.dao")
@SpringBootApplication
public class GulimallProductApplication {

    public static void main(String[] args) {
        SpringApplication.run(GulimallProductApplication.class, args);
    }

}

 2、注解應用:

    /**
     * 級聯更新所有關聯的數據
     * @CacheEvict:失效模式
     * 1、同時進行多種緩存操作
     * 2、指定刪除某個分區下所有數據:@CacheEvict(value = "category", allEntries = true)
     * 3、存儲同一類型數據、都可以指定同一個分區
     * 
     */
    
/*方法一:@Caching(evict = {
            @CacheEvict(value = "category", key = "'level1Categorys'"),
            @CacheEvict(value = "category", key = "'getCatalogJson'")
    })*/
//@CacheEvict一旦更改數據就清除緩存
    @CacheEvict(value = "category", allEntries = true)//方法二
    @Transactional
    @Override
    public void updateCascade(CategoryEntity category) {
        this.updateById(category);
        categoryBrandRelationService.updateCategory(category.getCatId(), category.getName());
    }

    /**
     * (1指定生成的緩存使用的key:key屬性指定
     * (2指定緩存的數據存活時間:
     * (3將數據保存為json
     */
    //每一個緩存的數據都要制定要放到那個名字的緩存【緩存分區】
    //緩存過期之后,如果多個線程同時請求對某個數據的訪問,會同時去到數據庫,導致數據庫瞬間負荷增高。
    //屬性 sync 可以指示底層將緩存鎖住,使只有一個線程可以進入計算,而其他線程堵塞,直到返回結果更新到緩存中。
    @Cacheable(cacheNames = "category", key = "'level1Categorys'", sync = true) //代表當前方法結果需要緩存,如果緩存中結果,有方法就不調用,如果沒有則調用方法並返回緩存結果
    @Override
    public List<CategoryEntity> getLevel1Categorys() {
        System.out.println("----getLevel1Categorys----");
        List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
        return categoryEntities;
    }

3、將redis緩存數據數據保存為json格式

//可以讀取Properties文件
@EnableConfigurationProperties(CacheProperties.class)
@EnableCaching
@Configuration
public class MyCacheConfig {
    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {

        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();

        //Key和Value JSON序列化方式
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));

        //使用默認才會讀出properties文件中的配置,自定義時roperties文件中的配置不生效
        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }
}

 4、SpringCache能解決的事以及不足:

(1)讀模式

緩存擊穿:查詢一個不存在的數據。解決:緩存空數據( ache-null-values=true)

緩存雪崩:大量key剛好過期。解決:加隨機時間和過期時間(spring.cache.redis.time-to-live=3600000)

緩存擊穿:大量並發同時查詢一個正好過期的數據。解決:加鎖(@Cacheable(cacheNames = "category", key = "'level1Categorys'", sync = true)

(2)寫模式

SpringCache並未做太多處理

總結:讀多寫少,即時性、、一致性要求不搞的數據使用SpringCache

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM