1、高並發緩存失效問題:
緩存穿透:
指查詢一個一定不存在的數據,由於緩存不命中導致去查詢數據庫,但數據庫也無此記錄,我們沒有將此次查詢的null寫入緩存,導致這個不存在的數據每次請求都要到存儲層進行查詢,失去了緩存的意義;
風險:利用不存在的數據進行攻擊讓數據庫壓力增大最終崩潰;
解決:對不存在的數據進行緩存並加入短暫的過期時間;
緩存雪崩:
緩存雪崩是指我們在設置緩存時key采用相同的過期時間,導致緩存在某一個時刻同時失效,請求全部轉發到DB,DB瞬間壓力過重雪崩;
解決:原有的失效時間基礎上增加一個隨機值;
緩存擊穿:
對於一些設置過期時間的key,如果這些key會在某個時間被高並發地訪問,是一種非常“熱點”的數據;如果這個key在大量請求同時進來前正好失效,那么所有對這個key的數據查詢都落在db,我們稱之為緩存擊穿
解決:加鎖。大量並發情況下只讓一個人去查,其他人等到,查到數據后釋放鎖,其他人獲取到鎖后先查緩存,這樣就不會出現大量訪問DB的情況。
2、加鎖解決擊穿問題
2.1、加本地鎖
“確認緩存”與“查詢數據庫”完成后才釋放鎖,圖示:
改進后(將“確認緩存”、“查數據庫”、“結果放入數據庫“都放入鎖中):
代碼部分:
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithLocalLock() { //加鎖,只要是同一把鎖就鎖柱所有線程,例如:100w個請求用同意一把鎖 synchronized (this) { //得到鎖后再去緩沖中查看,如果緩存沒有再去查看,有則直接取緩存。但是使用“this”只能鎖住本地服務,所以要使用分布式鎖 return getDataFromDb(); } }
private Map<String, List<Catalog2Vo>> getDataFromDb() { //得到鎖后再去緩沖中查看,如果緩存沒有再去查看,有則直接取緩存。但是使用“this”只能鎖住本地服務,所以要使用分布式鎖 String catalogJSON = redisTemplate.opsForValue().get("catalogJSON"); if (!StringUtils.isEmpty(catalogJSON)) { //如果緩存不為空直接返回 Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2Vo>>>() { }); return result; } System.out.println("查了數據庫"); /* Map<String, List<Catalog2Vo>> catelogJson = (Map<String, List<Catalog2Vo>>) cache.get("catelogJson"); if (cache.get("catelogJson") == null){ }*/ List<CategoryEntity> selectList = baseMapper.selectList(null); //1、先查出所有分類 /** * 一級結構: * id:[ * {二級內容} * {二級內容} * ] */ //List<CategoryEntity> level1Categorys = getLevel1Categorys(); List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L); //2、封裝數據,二級結構 /** * "catalog1Id": * "catalog3List":[三級內容] * "id": "",(二級id) * "name": ""(二級名字) * @return */ Map<String, List<Catalog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> { //1、查到2級分類 //List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", v.getCatId())); List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId()); List<Catalog2Vo> catalog2Vos = null; if (categoryEntities != null) { catalog2Vos = categoryEntities.stream().map(l2 -> { Catalog2Vo catalog2Vo = new Catalog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName()); /** * 三級內容: * { * "catalog2Id": "",(二級id) * "id": "",(三級id) * "name": "商務休閑鞋"(三級名字) * }, */ //List<CategoryEntity> level3Catalog = getParent_cid(l2); List<CategoryEntity> level3Catalog = getParent_cid(selectList, l2.getCatId()); if (level3Catalog != null) { List<Catalog2Vo.Catalog3Vo> collect = level3Catalog.stream().map(l3 -> { Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName()); return catalog3Vo; }).collect(Collectors.toList()); catalog2Vo.setCatalog3List(collect); } return catalog2Vo; }).collect(Collectors.toList()); } return catalog2Vos; })); //3、放入緩存中 String s = JSON.toJSONString(parent_cid); //1, TimeUnit.DAYS 空結果緩存:解決緩存穿透;設計過期時間:解決雪崩 redisTemplate.opsForValue().set("catalogJSON", s, 1, TimeUnit.DAYS); return parent_cid; }
3、分布式鎖原理與使用
由於本地鎖只能管理本地服務的事務,所以在分布式微服務中引進了分布式鎖
1、分布式鎖演進:
1.1、分布式鎖階段一:
1.2、分布式鎖階段二:
1.3、 分布式鎖階段三:
1.4、分布式鎖階段四:
1.5、演進代碼:
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedisLock() { //1、占分布式鎖,去redis占坑同時加鎖成功(原子性),設置過期時間(30秒)。假設過程出問題直接過期不會發生死鎖,改進第階段1和階段2,使用setIfAbsent原子命令將獲取鎖和過期時間同時設置 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if (lock) { System.out.println("獲取分布式鎖成功"); Map<String, List<Catalog2Vo>> dataFromDb; try { //執行業務 dataFromDb = getDataFromDb(); } finally { //刪除鎖.redisTemplate.opsForValue().get("lock")獲取需要時間釋放鎖,存在“不可重復讀”,所以獲取后刪除過程也要要求原子性; /* String lockValue = redisTemplate.opsForValue().get("lock"); if (uuid.equals(lockValue)){ redisTemplate.delete("lock"); }*/ //使用腳本,改進階段4 String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; // public <T> T execute(RedisScript<T> script, List<K> keys, Object... args)
// 采用隨機的uuid值,改進階段3 Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return getDataFromDb(); } else { System.out.println("獲取分布式鎖失敗"); try { Thread.sleep(200); } catch (Exception e) { } //加鎖失敗,設定時間進行重試 return getCatalogJsonFromDbWithRedisLock();//自旋方式 } }
4、Redisson實踐:
1、Redisson官方文檔:
https://github.com/redisson/redisson/wiki/Table-of-Content
2、Redisson可重用鎖:
@ResponseBody @GetMapping("/hello") public String hello(){ RLock lock= redisson.getLock("my-lock"); lock.lock(10, TimeUnit.SECONDS);//這種方法不會自動續期
//Redisson功能: //1、可以鎖的自動續期,如果業務超長,因此不用擔心業務時間過長自動刪鎖 //2、枷鎖業務只要運行完成,就不會給當業務續期。默認是30s后自動刪鎖 try{ //模擬業務超長執行 System.out.println("加鎖成功"+Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return "hello"; }
3、緩存數據與數據庫保持一致性:
/** * @return 緩存一致性 * 1、雙寫模式:數據庫修改時修改緩存數據,高並發情況下對數據庫進行修改容易出現臟數據 * 2、失效模式:數據庫更新時刪除緩存數據,等待下次查詢進行更新緩存 */ public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedissonLock() { //1、占分布式鎖,去redis占坑,鎖的粒度越大,越細越快,鎖的名字一樣 RLock lock = redisson.getLock("CatalogJson-lock"); lock.lock(); Map<String, List<Catalog2Vo>> dataFromDb; try { //運行代碼 dataFromDb = getDataFromDb(); } finally { lock.unlock(); } return getDataFromDb(); }
3.1、雙寫模式:
3.2、失效模式(采用):
5、SpringCache的使用
1、主啟動類以及基本注解詳解:
/** * * @Cacheable:觸發保存緩存。 * @CacheEvict:觸發刪除緩存。 * @CachePut:更新緩存,而不會干擾方法的執行。 * @Caching:重新組合多個緩存操作。 * @CacheConfig:在類級別共享一些與緩存相關的常見設置。 * * 開啟緩存@EnableCaching * * 原理:CacheAutoConfiguration->RedisCacheConfiguration->自動配置了RedisCacheManager->初始化所有緩存->每個緩存決定用什么配置 * ->進行判斷,如果有自定義配置則按自定義配置,如果沒有則按默認->想改緩存配置,只需要給容器中放一個RedisCacheCpnfiguration */ @EnableRedisHttpSession @EnableCaching @EnableFeignClients(basePackages = "com.atguigu.gulimall.product.feign") @EnableDiscoveryClient @MapperScan(basePackages = "com.atguigu.gulimall.product.dao") @SpringBootApplication public class GulimallProductApplication { public static void main(String[] args) { SpringApplication.run(GulimallProductApplication.class, args); } }
2、注解應用:
/** * 級聯更新所有關聯的數據 * @CacheEvict:失效模式 * 1、同時進行多種緩存操作 * 2、指定刪除某個分區下所有數據:@CacheEvict(value = "category", allEntries = true) * 3、存儲同一類型數據、都可以指定同一個分區 * */ /*方法一:@Caching(evict = { @CacheEvict(value = "category", key = "'level1Categorys'"), @CacheEvict(value = "category", key = "'getCatalogJson'") })*/ //@CacheEvict一旦更改數據就清除緩存 @CacheEvict(value = "category", allEntries = true)//方法二 @Transactional @Override public void updateCascade(CategoryEntity category) { this.updateById(category); categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); } /** * (1指定生成的緩存使用的key:key屬性指定 * (2指定緩存的數據存活時間: * (3將數據保存為json */ //每一個緩存的數據都要制定要放到那個名字的緩存【緩存分區】 //緩存過期之后,如果多個線程同時請求對某個數據的訪問,會同時去到數據庫,導致數據庫瞬間負荷增高。 //屬性 sync 可以指示底層將緩存鎖住,使只有一個線程可以進入計算,而其他線程堵塞,直到返回結果更新到緩存中。 @Cacheable(cacheNames = "category", key = "'level1Categorys'", sync = true) //代表當前方法結果需要緩存,如果緩存中結果,有方法就不調用,如果沒有則調用方法並返回緩存結果 @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("----getLevel1Categorys----"); List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); return categoryEntities; }
3、將redis緩存數據數據保存為json格式
//可以讀取Properties文件 @EnableConfigurationProperties(CacheProperties.class) @EnableCaching @Configuration public class MyCacheConfig { @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); //Key和Value JSON序列化方式 config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); //使用默認才會讀出properties文件中的配置,自定義時roperties文件中的配置不生效 CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }
4、SpringCache能解決的事以及不足:
(1)讀模式
緩存擊穿:查詢一個不存在的數據。解決:緩存空數據( ache-null-values=true)
緩存雪崩:大量key剛好過期。解決:加隨機時間和過期時間(spring.cache.redis.time-to-live=3600000)
緩存擊穿:大量並發同時查詢一個正好過期的數據。解決:加鎖(@Cacheable(cacheNames = "category", key = "'level1Categorys'", sync = true))
(2)寫模式
SpringCache並未做太多處理
總結:讀多寫少,即時性、、一致性要求不搞的數據使用SpringCache