現在Redis基本上沒家公司都在使用,只是各自使用的場景不以,但Redis最出名的還是做為緩存服務器,提搞服務器的的吞吐量,下面我們來圍繞這個作為緩存做一個總結
今天的目標其實是Redis的分布式鎖,但索性全部理一理吧,正好最近在找工作
SpringBoot2.0后默認使用lettuce作為底層操作Redis的客戶端,它使用Netty進行網絡 通信,lettuce的bug會導致netty堆外內存的溢出,netty默認使用的內存為-Xms300m,但是我們不可能去無限的加大這個內存來防止這個異常的發生
解決方案:1:升級lettuce客戶端,2:切換使用jedis
當我們整合Redis時,這里給大家解釋一下,無論lettuce還是jedis都是RedisTemplate底層操作Redis的依賴,SpringBoot自動配置的,我們只需要加入任一依賴,RedisTemplate就會被裝配成功
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
緩存穿透即是查詢一個數據庫不存在的數據,我們並沒有將null存入緩存,導致所有請求都會去查詢數據庫,一般網站攻擊會采用的一種方式擊垮對方的數據庫,導致服務癱瘓
方案:即使數據庫沒有數據,我們也將數據存入Redis,設置較短的過期時間即可,二三分鍾
緩存雪崩指的是,大量的緩存同時到期失效,全部請求連接數據庫,壓垮數據庫
方案:雪崩的原因在於緩存中大量的緩存短時間內全部到期,我們只需要不讓他們同時到期即可防止雪崩的發生,比如我們可以在原有失效時間的基礎上增加一個隨機的值,比如1到5分鍾隨機,這樣每一個緩存的生命周期就會不一樣,同時到期的幾率就很難發生
緩存擊穿指的是,單個緩存在高並發請求下(熱點數據),突然緩存到期失效,全部請求壓垮數據庫,
解決方案:當前場景下,我們只有加鎖,只讓一個線程去連接數據庫查詢數據,其余線程全部處於等待狀態,連接數據庫的線程查詢到數據庫后,並將結果緩存到Reids之中,其余等待的線程全部去Redis中獲取數據,不讓其全部連接數據庫
我們都知道SpringBoot的Bean都是單列的,所以這里我們使用這一特點進行上鎖
@Service public class CourseServiceImpl implements CourseService { @Autowired private CourseMapper courseMapper; //springboot自動配置的,直接注入到類中即可使用 @Autowired private RedisTemplate<Object, Object> redisTemplate; /**指定key的序列化方式,字符串的方式*/ private RedisSerializer keyRedisSerializer = new StringRedisSerializer(); /**指定value的序列化方式,字符串的方式*/ private RedisSerializer valueRedisSerializer = new Jackson2JsonRedisSerializer<Course>(Course.class); /** * 查詢所有課程信息,帶有緩存 * * @return */ public List<Course> getAllCourse() { //KEY 是按照字符串方式序列化,可讀性較好 redisTemplate.setKeySerializer(keyRedisSerializer); //獲取緩存數據 List<Course> courseList = (List<Course>)redisTemplate.opsForValue().get("allCourse"); //如果緩存數據為空,此時並發請求 if (null == courseList) { //1000個請求, 999個等,1個進入 synchronized (this) { //雙重就在這兒雙重,再次去緩存中取數據,999個等待的線程都會在這里拿到數據 courseList = (List<course>)redisTemplate.opsForValue().get("allCourse"); //只有第獲得鎖的請求才會走下面查詢數據庫的操作 if (null == courseList) { courseList = coursetMapper.getAllCourse(); redisTemplate.opsForValue().set("allCourse", courseList); System.out.println("請求的數據庫后將數據存入緩存"); //如果查到數據后放入緩存后直接return,這樣999個線程獲得鎖就會從緩存中得到數據 return courseList; } else { System.out.println("請求的緩存"); } } } else { System.out.println("請求的緩存"); } return courseList; } }
在單體應用下,上面的代碼已經可以防止緩存擊穿問題,但是如果我們的服務是分布式的,這樣就會有多個Spring容器,就會有多個Bean實例,明顯上述的本地鎖就會造成一個服務鎖自己的請求,顯得不合理
原理:這里我們使用redis作為分布式鎖的中間件,就利用setnx的不可重復性即可實現,設想,加入我的課程服務一共安排了10台機器,為了避免緩存擊穿,我們需要這10台機器上的線程使用同一把鎖,我們就使用Redis的setnx即可實現,setnx里面的數據時不可重復的,一個線程setnx進去后,其他線程就stnx不進去相同的值了,我們以誰setnx返回的布爾值為確定誰獲得鎖,
可能面對的問題:
死鎖問題:鎖是搶到了,如果在執行業務代碼中拋出異常或者在釋放鎖之前網絡等問題,造成沒有刪掉我們設置的鎖值,導致鎖一直未被釋放,形成死鎖,還有就是我們在獲取鎖、已經設置鎖的過期時間應該是原子性,不然獲取到鎖,再去設置過期時間期間,發生問題,那也會造成死鎖問題,這個redis的api已經解決了
解決方案:搶占鎖時設置自動過期時間,原子操作
Boolean lock = redisTemplaet.opsForValue().setIfAbsent("lock","hello",300,TimeUnit.SECONDS);
服務超時問題:加入我們的程序代碼比較耗時,耗時已經超過我們上鎖的時間閥值,導致鎖自動釋放,更多請求獲取到鎖,然后耗時業務執行完畢,刪除我們約定好的鎖,相當於釋放當前所有鎖,此時鎖已經失效,無法鎖住
解決方案:之前我們都setIfAbsent("lock","hello",300,TimeUnit.SECONDS);lock的value固定為hello,是有問題的,就會造成上面那種情況,此時我們不用固定值,采用UUID生成隨機值,在釋放鎖時,先查詢當前lock的值,如果為當前我們預設的UUID,則刪除,如果不是,說明鎖已經過期,自動釋放,解決方案如下所示
第一個就是加長鎖的生命周期,起碼得保證,生命時長超過業務耗時時長吧,但分布式中,服務調用之間耗時的不穩定性,導致我們這個時長的設置也不穩定,太長了吧,影響吞吐,太斷了又自動釋放,所以這不是一個好解決方案
第二個方案就是鎖的自動續命策略,下面我們再詳細說明,主要是Redisson的看門狗策略
刪鎖時機問題:和我們設置值且原子操作值的有效時間一個道理,當我們准備業務執行完畢准備刪除鎖的時候,先獲取lock的值UUID,對比和我們的預先設置的UUID是否相同,如果相同再刪除,如果再比較的過程中,該lock生命到期,自動刪除,我們刪除的lock其實是別人的UUID,這就要求,我們獲取lock的值判斷和刪除lock的操作是原子性的
解決方案:Lua腳本解鎖
Redis 使用單個 Lua 解釋器去運行所有腳本,並且, Redis 也保證腳本會以原子性(atomic)的方式執行: 當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執行。
下面這個語句我做一個解釋吧,
execute函數接受三個參數,一個lua腳本,一個刪除key集合,一個刪除key對應value的值
Lua腳本的唯一實現類DefaultRedisScript,泛型指定為刪除成功或者失敗返回的類型
腳本字符串,執行腳本之后的返回數據類型,刪除的話,成功是1,失敗為0
這個得根據命令執行的返回值確定,Integer會包類型轉換異常,切換時Long正常
調用execute函數時,會將key:lock,寫入到腳本中KEYS[1],會將value:uuid,寫入到腳本ARGV[1]
//官方給的lua腳本
String script = "if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end" //刪除鎖 Long lock = redisTemplate.execute(new DefaultRedisScript<Long>(script,Long.class),Araays.asList("lock"),uuid);
SpringBoot集成更多詳細信息: Go Redisson
<!--以后使用redisson作為所有分布式鎖,分布式對象等功能的框架--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.3.0</version> </dependency>
/** * @Description redisson的程序化配置 * @Author Ninja * @Date 2020/7/25 **/ @Configuration public class RedissonConfig { @Bean(destroyMethod="shutdown") public RedissonClient redisson(){ Config config = new Config(); // config.useClusterServers().addNodeAddress("192.168.29.130:6379","192.168.29.140:6379") //當前為單機模式 config.useSingleServer().setAddress("redis://192.168.29.130:6379"); config.useSingleServer().setPassword("123456"); return Redisson.create(config); } }
@Autowired private RedissonClient redissonClient;
基於Redis的Redisson分布式可重入鎖RLoc,Java對象實現了java.util.concurrent.locks.Lock
接口
RLock lock = redisson.getLock("my_lock"); // 上鎖,阻塞式等待 lock.lock();
上述代碼中,我們上鎖,並沒有指定鎖的失效時間,默認是30秒,Redisson內部提供了一個監控鎖的看門狗,當我們的業務耗時較長超過鎖的生命周期時,看門狗會我們的鎖續命,這一點可以在redis的客戶端中刷新看生命周期觀察得到,所以這就不會出現:因為業務耗時過長,導致鎖的自動釋放帶來的問題,而且就算我們不主動解鎖,Redisson也會在我們線程執行之后30S內自動釋放該鎖,這里就不會出現死鎖問題
如果負責儲存這個分布式鎖的Redisson節點宕機以后,而且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson實例被關閉前,不斷的延長鎖的有效期。默認情況下,看門狗的檢查鎖的超時時間是30秒鍾,也可以通過修改Config.lockWatchdogTimeout來另行指定。
另外Redisson還通過加鎖的方法提供了leaseTime
的參數來指定加鎖的時間。超過這個時間后鎖便自動解開了。
// 加鎖以后10秒鍾自動解鎖 // 無需調用unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { //解鎖 lock.unlock(); } }
Redisson同時還為分布式鎖提供了異步執行的相關方法:
RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock對象完全符合Java的Lock規范。也就是說只有擁有鎖的進程才能解鎖,其他進程解鎖則會拋出IllegalMonitorStateException
錯誤。但是如果遇到需要其他進程也能解鎖的情況,請使用分布式信號量Semaphore
對象.
分布式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處於加鎖狀態。
@GetMapping("/read") @ResponseBody public String redValue(){ String uuid = null; RReadWriteLock readLock = redissonClient.getReadWriteLock("rw-loc"); RLock lock = readLock.readLock(); lock.lock(); try { log.info("讀鎖加鎖成功"); uuid = (String) redisTemplate.opsForValue().get("writeValue"); Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); log.info("讀鎖釋放"); } return uuid; } @GetMapping("/write") @ResponseBody public String writeValue(){ RReadWriteLock writeLock = redissonClient.getReadWriteLock("rw-loc"); String uuid = null; RLock lock = writeLock.writeLock(); lock.lock(); try { log.info("寫鎖加鎖成功"); uuid = UUID.randomUUID().toString(); redisTemplate.opsForValue().set("writeValue",uuid); Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); log.info("寫鎖釋放"); } return uuid; }
可以發現
寫鎖是一個排他鎖(互斥鎖),二讀鎖是一個共享鎖
只要有寫鎖的存在,就會發生互斥
寫 + 寫 : 阻塞方式
讀 + 寫 :寫阻塞,等待讀釋放鎖
寫 + 讀 :讀阻塞,等待寫鎖釋放
讀 + 讀 :相當於無鎖,不是排他鎖,全部上鎖成功
就比如小學關校門吧,加入有五個班,當五個班都走了之后,才能關校門,下面我們用代碼來掩飾一下
@GetMapping("/closedoor") @ResponseBody public String closeDoor() throws InterruptedException { RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.trySetCount(5);//代表有五個班級,我必須收到五個班級的回應 log.info("放學鈴聲響起,等待五個班級學生離開校園"); door.await(); log.info("確認五個班級已經全部走完"); return "開始關閉校門,學生禁止出入"; } //模擬學生回家,id為班級號 @GetMapping("/gohome/{id}") @ResponseBody public String goHome(@PathVariable Integer id){ RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.countDown(); //回應一下 log.info(id + "班的學生已經回家了"); return id + "班的學生已經回家了"; }
可以通過Redis的客戶端觀察到Reids內部有一個可以為door,有一個值Value被我們初始化為5,沒當調用door.countDown();函數的時候,那個Value值就會減1操作,直到變為0時,閉鎖釋放,執行
我們用代碼來模擬一個場景,一群乞丐等着小姑娘給他們做饅頭吃,做一個就可以吃一個,沒有做出來,就只有吃泥巴
@GetMapping("/build") @ResponseBody public String build(){ RSemaphore pack = redissonClient.getSemaphore("bread"); pack.release(); //做一個饅頭,只做一個 return "OK"; } @GetMapping("/eat") @ResponseBody public String eat() throws InterruptedException { RSemaphore pack = redissonClient.getSemaphore("bread"); //獲取一個饅頭,沒有饅頭一直等着餓死算球 //pack.acquire(); //嘗試獲取饅頭,不會堵塞,獲取不到饅頭就去挖泥巴吃 boolean flag = pack.tryAcquire(); return flag ? "饅頭好香!!" : "還是繼續吃泥巴吧!!"; }
我們可以先調用build接口,多做幾個饅頭,就會發現Redis中有個key為bread,值為我們訪問接口的次數,也就是有幾個饅頭備貨,然后調用下面的eat接口,可以發現redis中bread的值會遞減,當撿到為0的時候,根據我們的api,要么處於堵塞,等待饅頭做出來,要么獲得布爾值,直接響應false
可以訪問官網
先說兩種方案,然后我們再總結一下大致上的解決方案
數據更新,寫數據庫,寫緩存,也就是直接吧緩存讀出來更新后再保存回去
兩個請求模擬場景
A請求寫入數據庫,然后准備寫入緩存的時候發生了卡頓
B請求寫入數據庫,然后寫入緩存,此時卡頓的A請求繼續完成寫入緩存的操作,此時緩存中的數據為A請求帶來的數據,產生了臟數據
臟數據的話,只是暫時的臟數據問題,在緩存過期或者數據穩定的時候會自動解決掉這個問題,會得到正確的數據
我們數據更新,寫入數據庫,然后把緩存刪掉即可,下次查詢更新緩存
三個線程模擬場景
A線程,寫入數據庫,刪除緩存完成
B線程,寫入數據庫發生卡頓,
C線程,讀取緩存,發現緩存沒有,因為A線程寫入后刪掉了,然后C線程去連接數據庫查詢數據
如果運行比較順利,他會讀取A線程的老數據,並將其寫入到緩存中
如果運行卡頓一點,此時B線程寫入數據庫成功,並將最新數據加載到緩存,但是C線程會將讀取的老數據對B線程的最新緩存產生覆蓋問題
無論是雙寫模式,還是失效模式,緩存數據都會有不一致問題,
我們放入緩存的數據本來就應該對實時性的要求不能太嚴格,對實時性要求超高的,我們建議還是直接查詢數據庫,對於實時性要求不是很高的,我們應該設置過期時間,以確保隔斷時間能拿到最新的數據
通過加鎖的方式保證並發讀寫,寫寫的時候按照阻塞的方式排隊進行寫,讀取就無所謂啦,所以應該考慮考慮適當的運用上面我說過的分布式讀寫鎖來控制並發讀寫帶來的數據不一致問題,當然如果你的數據長期處於頻繁寫的狀態,我們建議還是不要加緩存,或者緩存的生命周期短一點,可能隨時都不是最新的數據,哈哈哈
相關依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency>
配置文件,其他配置,根據項目而定
spring
啟動類開啟注解功能,就可以使用注解完成緩存的操作了
//每一個需要緩存的數據,我們都來指定他存放的緩存區 //緩存的分區可以根據業務來划分,不是Redis的key哦 //默認key是自動生成的 //默認生命周期為-1,表示永遠不過期 //存儲的value值采用的是jdk的序列化機制,將序列化后的數據存入到redis //而且當我們訪問這個接口的時候,會優先訪問緩存。如果緩存中有數據,直接就返回了,而不是執行函數 @Cacheable("course") @GetMapping("/create") @ResponseBody public String cacheCreate() { System.out.println("模擬查詢數據庫"); return "Hello World"; }
該注解可以自動將函數的返回值,緩存起來,如果緩存中有值,是不會走方法的,直接返回
//自定義key key = "" :該函數的名字 // 自定義生命周期 在配置文件中配置,並將其寫入配置類中 // 自定義序列化為json,也是通過配置類 //如果想自定義key,可以為: "'keyName'",默認是表達式取值,直接"keyName"不行 //解決緩存擊穿問題,在Cacheable注解中,配置sync = true,獲取時加鎖,但只是本地鎖,不是分布式鎖 @Cacheable(value = "course" , key = "#root.method.name") @GetMapping("/custom") @ResponseBody public UserTest customKeyName() { System.out.println("模擬查詢數據庫"); UserTest user = new UserTest("張三", 23); return user; } 配置類如下所示 /** * @Description * @Author Ninja * @Date 2020/7/25 **/ @Configuration @EnableCaching //屬性讀入一個配置類,下面可以使用,讀取的spring cache下面的屬性 //我們只配置了生命周期,通過Redis客戶端發現已經生效了,更多配置,直接在配置文件中配置即可 @EnableConfigurationProperties(CacheProperties.class) public class MyCacheConfig { //配置redis @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixKeysWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }
該注解是失效模式,即當刪除數據時,會根據分區和key,將緩存一起干掉,詳細看下方代碼
//自定義key key = "" :該函數的名字 // 自定義生命周期 在配置文件中配置,並將其寫入配置類中 // 自定義序列化為json,也是通過配置類 //如果想自定義key,可以為: "'keyName'",默認是表達式取值,直接"keyName"不行 @Cacheable(value = "course" , key = "'userTest1'") @GetMapping("/custom1") @ResponseBody public UserTest userTest1() { System.out.println("模擬查詢數據庫"); UserTest user = new UserTest(1,"userTest1", 1); return user; } @Cacheable(value = "course" , key = "'userTest2'") @GetMapping("/custom2") @ResponseBody public UserTest userTest2() { System.out.println("模擬查詢數據庫"); UserTest user = new UserTest(2,"userTest2", 2); return user; } //CacheEvict 失效模式,修改時直接刪除緩存 //注意分區和key是用來定位緩存中的數據的,一定要匹配好 //@CacheEvict有個屬性 allentrise = false,該為true,表示刪除該分區下所有緩存 @CacheEvict(value = "course" , key = "'userTest1'") @PutMapping("/update") @ResponseBody public String updateUser1() { System.out.println("執行了刪除程序"); return "Ok"; } //加入一個操作會級聯刪除多個緩存 : @Caching注解 @Caching( evict = { @CacheEvict(value = "course" , key = "'userTest1'"), @CacheEvict(value = "course" , key = "'userTest2'") } ) @PutMapping("/updateAny") @ResponseBody public String updateAny() { System.out.println("模擬刪除 : userTest1"); System.out.println("模擬刪除 : userTest2"); return "Ok"; }
//CachePut 雙寫模式,當修改某個數據后,如果有返回(最新數據),將返回存入緩存 @CachePut(value = "course" , key = "'userTest2'") @PutMapping("/update2") @ResponseBody public UserTest updateUser2() { UserTest user = new UserTest(22,"userTest22", 22); return user; }
想要get到它的不足,首先就要看他是否解決我們面對的問題,我們的問題大部分區分為兩個問題,一個是讀面對的問題:緩存穿透、緩存擊穿、緩存雪崩,另一個就是寫的問題,面對的問題就是緩存數據和真實數據的不一致性,主要方案為讀寫鎖使用、Canal中間件實現數據庫一致性(有點像logstash)、如果讀多寫也多,那就只有直接拜訪數據庫了
-
緩存穿透:查詢一個並不存在的數據,緩存失效,壓垮數據庫
cache-null-values: true #是否儲存null值,開啟后解決緩存穿透問題
-
緩存擊穿:某個key失效時,高並發查詢請求,壓垮數據庫
加鎖?之前我們說到單體應用中的雙重檢測鎖可以預防單體應用的緩存擊穿問題,而分布式的緩存擊穿問題,我們則要運用上面講到過的分布式鎖,可是在Spring Cache中,當我們的緩存為Redis時,RedisCache底層默認是沒有任何加鎖操作的,通過我們的配置在Cacheable注解中,sync = true,獲取時加鎖,沒有其余任何的鎖操作,而這個鎖操作也是本地鎖,並不是分布式鎖,原理就是獲取時獲取加上一把本地鎖,一個線程去獲取數據,獲取后將數據緩存起來,其他堵塞線程去訪問緩存,相當於就是在本地配置了一個雙重檢測鎖
-
緩存雪崩:大量key失效,高並發壓垮數據庫
錯開緩存時間,只要保證不在短期集體失效即可
或者設置永不失效,但要做好數據一致性防護措施
常規數據:(讀多寫少,即時性、一致性要求不是很高的數據,Spring Cache完全可以勝任)
針對一些即時性和一致性要求高的,
-
要么配合緩存生命周期配置,
-
要么使用類似於logstash的中間件Canal
-
要么直接查詢數據庫