redis專題之緩存存在的幾大問題(穿透、擊穿、雪崩)


在使用redis做緩存中間件時會存在以下幾個大問題:

  1. 緩存粒度問題:未合適的控制好數據緩存的粒度(全部數據和部分數據的粒度);
  2. 緩存擊穿問題:數據剛好失效或者所需的數據根本不存在緩存中,此時來了大量的並發請求;
  3. 緩存穿透問題:數據庫和緩存中都沒有這條數據;
  4. 緩存雪崩問題:由於前兩個問題及時未處理導致請求全部被直接打到數據庫中,導致機器宕機;

1.緩存粒度控制

通俗來講,緩存粒度問題就是我們在使用緩存時,是將所有數據緩存還是緩存部分數據?

 

緩存粒度問題是一個容易被忽視的問題,如果使用不當,可能會造成很多無用空間的浪費,可能會造成網絡帶寬的浪費,可能會造成代碼通用性較差等情況,必須學會綜合數據通用性、空間占用比、代碼維護性 三點評估取舍因素權衡使用。

2.緩存穿透問題

 緩存穿透是指查詢一個一定不存在的數據,由於緩存不命中,並且出於容錯考慮, 如果從存儲層查不到數據則不寫入緩存,這將導致這個不存在的數據每次請求都要到存儲層去查詢,失去了緩存的意義。

可能造成原因:

  • 業務代碼自身問題
  • 惡意攻擊。爬蟲等等

危害

對底層數據源壓力過大,有些底層數據源不具備高並發性。  例如mysql一般來說單台能夠扛1000-QPS就已經很不錯了

解決方案

1)緩存空對象

public class NullValueResultDO implements Serializable{
     private static final long serialVersionUID = -6550539547145486005L;
}
 
public class UserManager {
     UserDAO userDAO;
     LocalCache localCache;
 
     public UserDO getUser(String userNick) {
          Object object = localCache.get(userNick);
          if(object != null) {
               if(object instanceof NullValueResultDO) {
                    return null;
               }
               return (UserDO)object;
          } else {
               User user = userDAO.getUser(userNick);
               if(user != null) {
                    localCache.put(userNick,user);
               } else {
                    localCache.put(userNick, new NullValueResultDO());
               }
               return user;
          }
     }          
}

2)布隆過濾器

(1)Google布隆過濾器的缺點

基於JVM內存的一種布隆過濾器

重啟即失效

本地內存無法用在分布式場景

不支持大數據量存儲

(2)Redis布隆過濾器

可擴展性Bloom過濾器:一旦Bloom過濾器達到容量,就會在其上創建一個新的過濾器

不存在重啟即失效或者定時任務維護的成本:基於Google實現的布隆過濾器需要啟動之后初始化布隆過濾器

缺點:需要網絡IO,性能比Google布隆過濾器低

3.緩存擊穿.熱點key重建緩存問題

緩存擊穿是指緩存中沒有但數據庫中有的數據(一般是緩存時間到期),這時由於並發用戶特別多,同時讀緩存沒讀到數據,又同時去數據庫去取數據,引起數據庫壓力瞬間增大,造成過大壓力

我們知道,使用緩存,如果獲取不到,才會去數據庫里獲取。但是如果是熱點 key,訪問量非常的大,數據庫在重建緩存的時候,會出現很多線程同時重建的情況。因為高並發導致的大量熱點的 key 在重建還沒完成的時候,不斷被重建緩存的過程,由於大量線程都去做重建緩存工作,導致服務器拖慢的情況。

解決方案

1.互斥鎖

第一次獲取緩存的時候,加一個鎖,然后查詢數據庫,接着是重建緩存。這個時候,另外一個請求又過來獲取緩存,發現有個鎖,這個時候就去等待,之后都是一次等待的過程,直到重建完成以后,鎖解除后再次獲取緩存命中。

public String getKey(String key){
    String value = redis.get(key);
    if(value == null){
        String mutexKey = "mutex:key:"+key; //設置互斥鎖的key
        if(redis.set(mutexKey,"1","ex 180","nx")){ //給這個key上一把鎖,ex表示只有一個線程能執行,過期時間為180秒
          value = db.get(key);
          redis.set(key,value);
          redis.delete(mutexKety);
  }else{
        // 其他的線程休息100毫秒后重試
        Thread.sleep(100);
        getKey(key);
  }
 }
 return value;
}

互斥鎖的優點是思路非常簡單,具有一致性,但是互斥鎖也有一定的問題,就是大量線程在等待的問題。存在死鎖的可能性。

2.使用setnx

setnx key value 只有在 key 不存在時設置 key 的值。

因為可能存在獲取到鎖的業務突然掛掉了,那么nx鎖永遠無法釋放,那么很容易造成死鎖,
故需要設置一個過期時間,時間到了,就會自動釋放鎖;而且nx和ex不能分開寫,保證原子性。

@Test
    public void testRedis(){
        Jedis jedis=new Jedis("111.111.111.94",6379);
        jedis.auth("123456");
        SetParams setParams=new SetParams();
        setParams.ex(6);  //setex     設置值的同時設置過期時間
        setParams.nx();  //
        String s = UUID.randomUUID().toString();
        String lock = jedis.set("lock", s,setParams);
//        Long setnx = jedis.setnx("lock", "value2");
//        if(setnx==1){
//            jedis.expire("lock",10);
//        }

        System.out.println(lock);
    }

3.分布式鎖redisson

redisson分布式鎖

4.自己實現普通的分布式鎖

@Component
public class RedisLock implements Lock {


    @Autowired
    private JedisPool jedisPool;

    private static final String key="lock";

    private ThreadLocal<String> threadLocal=new ThreadLocal<>();

    private static AtomicBoolean isHappened = new AtomicBoolean(true);

    //加鎖
    @Override
    public void lock() {
        boolean b = tryLock();  //嘗試加鎖
        if(b){
            //拿到了鎖
            return;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    //嘗試加鎖
    @Override
    public boolean tryLock() {
        SetParams setParams=new SetParams();
        setParams.ex(2);  //2s
        setParams.nx();
        String s = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        String lock = resource.set(key, s,setParams);
//        String lock = resource.set(key,s,"NX","PX",5000);
        resource.close();
        if("OK".equals(lock)){
            //拿到了鎖
            threadLocal.set(s);
            if(isHappened.get()){
                ThreadUtil.newThread(new MyRUnble(jedisPool)).start();
                isHappened.set(false);
            }
            return true;
        }
        return false;
    }




    static class MyRUnble implements Runnable{

        private JedisPool jedisPool;
        public MyRUnble(JedisPool jedisPool){
            this.jedisPool=jedisPool;
        }
        @Override
        public void run() {
            Jedis jedis = jedisPool.getResource();
            while (true){
                Long ttl = jedis.ttl(key);
                if(ttl!=null && ttl>0){
                    jedis.expire(key, (int) (ttl+1));
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }


    //第一步判斷設置時候的value 和 此時redis的value是否相同
    //解鎖
    @Override
    public void unlock() throws Exception{
        String script="if redis.call(\"get\",KEYS[1])==ARGV[1] then\n" +
                "    return redis.call(\"del\",KEYS[1])\n" +
                "else\n" +
                "    return 0\n" +
                "end";

        Jedis resource = jedisPool.getResource();
        resource.del(key);
//        Object eval = resource.eval(script, Arrays.asList(key), Arrays.asList(threadLocal.get()));
//        if(Integer.valueOf(eval.toString())==0){
//            resource.close();
//            throw new Exception("解鎖失敗");
//        }
        resource.close();
    }

    @Override
    public Condition newCondition() {
        return null;
    }

}

4.緩存雪崩問題

緩存雪崩是指機器宕機或在我們設置緩存時采用了相同的過期時間,導致緩存在某一時刻同時失效,請求全部轉發到DB,DB瞬時壓力過重雪崩。

解決辦法  

1:在緩存失效后,通過加鎖或者隊列來控制讀數據庫寫緩存的線程數量。比如對某個key只允許一個線程查詢數據和寫緩存,其他線程等待。

2:做二級緩存,A1為原始緩存,A2為拷貝緩存,A1失效時,可以訪問A2,A1緩存失效時間設置為短期,A2設置為長期

3:不同的key,設置不同的過期時間,讓緩存失效的時間點盡量均勻。

4:如果緩存數據庫是分布式部署,將熱點數據均勻分布在不同搞得緩存數據庫中。

5:搭建高可用集群


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM