在使用redis做緩存中間件時會存在以下幾個大問題:
- 緩存粒度問題:未合適的控制好數據緩存的粒度(全部數據和部分數據的粒度);
- 緩存擊穿問題:數據剛好失效或者所需的數據根本不存在緩存中,此時來了大量的並發請求;
- 緩存穿透問題:數據庫和緩存中都沒有這條數據;
- 緩存雪崩問題:由於前兩個問題及時未處理導致請求全部被直接打到數據庫中,導致機器宕機;
1.緩存粒度控制
通俗來講,緩存粒度問題就是我們在使用緩存時,是將所有數據緩存還是緩存部分數據?

緩存粒度問題是一個容易被忽視的問題,如果使用不當,可能會造成很多無用空間的浪費,可能會造成網絡帶寬的浪費,可能會造成代碼通用性較差等情況,必須學會綜合數據通用性、空間占用比、代碼維護性 三點評估取舍因素權衡使用。
2.緩存穿透問題
緩存穿透是指查詢一個一定不存在的數據,由於緩存不命中,並且出於容錯考慮, 如果從存儲層查不到數據則不寫入緩存,這將導致這個不存在的數據每次請求都要到存儲層去查詢,失去了緩存的意義。
可能造成原因:
- 業務代碼自身問題
- 惡意攻擊。爬蟲等等
危害
對底層數據源壓力過大,有些底層數據源不具備高並發性。 例如mysql一般來說單台能夠扛1000-QPS就已經很不錯了
解決方案
1)緩存空對象
public class NullValueResultDO implements Serializable{
private static final long serialVersionUID = -6550539547145486005L;
}
public class UserManager {
UserDAO userDAO;
LocalCache localCache;
public UserDO getUser(String userNick) {
Object object = localCache.get(userNick);
if(object != null) {
if(object instanceof NullValueResultDO) {
return null;
}
return (UserDO)object;
} else {
User user = userDAO.getUser(userNick);
if(user != null) {
localCache.put(userNick,user);
} else {
localCache.put(userNick, new NullValueResultDO());
}
return user;
}
}
}
2)布隆過濾器
(1)Google布隆過濾器的缺點
基於JVM內存的一種布隆過濾器
重啟即失效
本地內存無法用在分布式場景
不支持大數據量存儲
(2)Redis布隆過濾器
可擴展性Bloom過濾器:一旦Bloom過濾器達到容量,就會在其上創建一個新的過濾器
不存在重啟即失效或者定時任務維護的成本:基於Google實現的布隆過濾器需要啟動之后初始化布隆過濾器
缺點:需要網絡IO,性能比Google布隆過濾器低
3.緩存擊穿.熱點key重建緩存問題
緩存擊穿是指緩存中沒有但數據庫中有的數據(一般是緩存時間到期),這時由於並發用戶特別多,同時讀緩存沒讀到數據,又同時去數據庫去取數據,引起數據庫壓力瞬間增大,造成過大壓力
我們知道,使用緩存,如果獲取不到,才會去數據庫里獲取。但是如果是熱點 key,訪問量非常的大,數據庫在重建緩存的時候,會出現很多線程同時重建的情況。因為高並發導致的大量熱點的 key 在重建還沒完成的時候,不斷被重建緩存的過程,由於大量線程都去做重建緩存工作,導致服務器拖慢的情況。
解決方案
1.互斥鎖
第一次獲取緩存的時候,加一個鎖,然后查詢數據庫,接着是重建緩存。這個時候,另外一個請求又過來獲取緩存,發現有個鎖,這個時候就去等待,之后都是一次等待的過程,直到重建完成以后,鎖解除后再次獲取緩存命中。
public String getKey(String key){
String value = redis.get(key);
if(value == null){
String mutexKey = "mutex:key:"+key; //設置互斥鎖的key
if(redis.set(mutexKey,"1","ex 180","nx")){ //給這個key上一把鎖,ex表示只有一個線程能執行,過期時間為180秒
value = db.get(key);
redis.set(key,value);
redis.delete(mutexKety);
}else{
// 其他的線程休息100毫秒后重試
Thread.sleep(100);
getKey(key);
}
}
return value;
}
互斥鎖的優點是思路非常簡單,具有一致性,但是互斥鎖也有一定的問題,就是大量線程在等待的問題。存在死鎖的可能性。
2.使用setnx
setnx key value 只有在 key 不存在時設置 key 的值。
因為可能存在獲取到鎖的業務突然掛掉了,那么nx鎖永遠無法釋放,那么很容易造成死鎖,
故需要設置一個過期時間,時間到了,就會自動釋放鎖;而且nx和ex不能分開寫,保證原子性。
@Test
public void testRedis(){
Jedis jedis=new Jedis("111.111.111.94",6379);
jedis.auth("123456");
SetParams setParams=new SetParams();
setParams.ex(6); //setex 設置值的同時設置過期時間
setParams.nx(); //
String s = UUID.randomUUID().toString();
String lock = jedis.set("lock", s,setParams);
// Long setnx = jedis.setnx("lock", "value2");
// if(setnx==1){
// jedis.expire("lock",10);
// }
System.out.println(lock);
}
3.分布式鎖redisson
4.自己實現普通的分布式鎖
@Component
public class RedisLock implements Lock {
@Autowired
private JedisPool jedisPool;
private static final String key="lock";
private ThreadLocal<String> threadLocal=new ThreadLocal<>();
private static AtomicBoolean isHappened = new AtomicBoolean(true);
//加鎖
@Override
public void lock() {
boolean b = tryLock(); //嘗試加鎖
if(b){
//拿到了鎖
return;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
//嘗試加鎖
@Override
public boolean tryLock() {
SetParams setParams=new SetParams();
setParams.ex(2); //2s
setParams.nx();
String s = UUID.randomUUID().toString();
Jedis resource = jedisPool.getResource();
String lock = resource.set(key, s,setParams);
// String lock = resource.set(key,s,"NX","PX",5000);
resource.close();
if("OK".equals(lock)){
//拿到了鎖
threadLocal.set(s);
if(isHappened.get()){
ThreadUtil.newThread(new MyRUnble(jedisPool)).start();
isHappened.set(false);
}
return true;
}
return false;
}
static class MyRUnble implements Runnable{
private JedisPool jedisPool;
public MyRUnble(JedisPool jedisPool){
this.jedisPool=jedisPool;
}
@Override
public void run() {
Jedis jedis = jedisPool.getResource();
while (true){
Long ttl = jedis.ttl(key);
if(ttl!=null && ttl>0){
jedis.expire(key, (int) (ttl+1));
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
//第一步判斷設置時候的value 和 此時redis的value是否相同
//解鎖
@Override
public void unlock() throws Exception{
String script="if redis.call(\"get\",KEYS[1])==ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
Jedis resource = jedisPool.getResource();
resource.del(key);
// Object eval = resource.eval(script, Arrays.asList(key), Arrays.asList(threadLocal.get()));
// if(Integer.valueOf(eval.toString())==0){
// resource.close();
// throw new Exception("解鎖失敗");
// }
resource.close();
}
@Override
public Condition newCondition() {
return null;
}
}
4.緩存雪崩問題
緩存雪崩是指機器宕機或在我們設置緩存時采用了相同的過期時間,導致緩存在某一時刻同時失效,請求全部轉發到DB,DB瞬時壓力過重雪崩。
解決辦法
1:在緩存失效后,通過加鎖或者隊列來控制讀數據庫寫緩存的線程數量。比如對某個key只允許一個線程查詢數據和寫緩存,其他線程等待。
2:做二級緩存,A1為原始緩存,A2為拷貝緩存,A1失效時,可以訪問A2,A1緩存失效時間設置為短期,A2設置為長期
3:不同的key,設置不同的過期時間,讓緩存失效的時間點盡量均勻。
4:如果緩存數據庫是分布式部署,將熱點數據均勻分布在不同搞得緩存數據庫中。
5:搭建高可用集群
