Java面試必問之-緩存


緩存問題

緩存穿透

​ 緩存穿透是指緩存和數據庫中都沒有的數據,而用戶不斷發起請求,如發起為id為“-1”的數據或id為特別大不存在的數據。這時的用戶很可能是攻擊者,攻擊會導致數據庫壓力過大。

解決方案:

  1. 接口層增加校驗,如用戶鑒權校驗,id做基礎校驗,id<=0的直接攔截;
  2. 從緩存取不到的數據,在數據庫中也沒有取到,這時也可以將key-value對寫為key-null,緩存有效時間可以設置短點,如30秒(設置太長會導致正常情況也沒法使用)。這樣可以防止攻擊用戶反復用同一個id暴力攻擊

緩存擊穿

​ 緩存擊穿是指緩存中沒有但數據庫中有的數據(一般是緩存時間到期),這時由於並發用戶特別多,同時讀緩存沒讀到數據,又同時去數據庫去取數據,引起數據庫壓力瞬間增大,造成過大壓力

解決方案:

  1. 設置熱點數據永遠不過期。
  2. 加互斥鎖,互斥鎖參考代碼如下:

image-20200916132808397

緩存雪崩

​ 緩存雪崩是指緩存中數據大批量到過期時間,而查詢數據量巨大,引起數據庫壓力過大甚至down機。和緩存擊穿不同的是,緩存擊穿指並發查同一條數據,緩存雪崩是不同數據都過期了,很多數據都查不到從而查數據庫。

解決方案

  1. 緩存數據的過期時間設置隨機,防止同一時間大量數據過期現象發生。
  2. 如果緩存數據庫是分布式部署,將熱點數據均勻分布在不同的緩存數據庫中。
  3. 設置熱點數據永遠不過期。

緩存策略

1、存儲集合的選擇

實現本地緩存,存儲容器肯定是 key/value 形式的數據結構,在 Java 中,也就是我們常用的 Map 集合。Map 中有 HashMap、Hashtable、ConcurrentHashMap 幾種供我們選擇,如果不考慮高並發情況下數據安全問題,我們可以選擇HashMap,如果考慮高並發情況下數據安全問題,我們可以選擇 Hashtable、ConcurrentHashMap 中的一種集合,但是我們優先選擇 ConcurrentHashMap,因為 ConcurrentHashMap 的性能比 Hashtable 要好。

2、過期緩存處理

因為緩存直接存儲在內存中,如果我們不處理過期緩存,內存將被大量無效緩存占用,這不是我們想要的,所以我們需要清理這些失效的緩存。過期緩存處理可以參考 Redis 的策略來實現,Redis 采用的是定期刪除 + 懶惰淘汰策略。

定期刪除策略

定期刪除策略是每隔一段時間檢測已過期的緩存,並且降之刪除。這個策略的優點是能夠確保過期的緩存都會被刪除。同時也存在着缺點,過期的緩存不一定能夠及時的被刪除,這跟我們設置的定時頻率有關系,另一個缺點是如果緩存數據較多時,每次檢測也會給 cup 帶來不小的壓力。

懶惰淘汰策略

懶惰淘汰策略是在使用緩存時,先判斷緩存是否過期,如果過期將它刪除,並且返回空。這個策略的優點是只有在查找的時候,才判斷是否過期,對 CUP 影響較小。同時這種策略有致命的缺點,當存入了大量的緩存,這些緩存都沒有被使用並且已過期,都將成為無效緩存,這些無效的緩存將占用你大量的內存空間,最后導致服務器內存溢出。

我們簡單的了解了一下 Redis 的兩種過期緩存處理策略,每種策略都存在自己的優缺點。所以我們在使用過程中,可以將兩種策略組合起來,結合效果還是非常理想的。

3、緩存淘汰策略

緩存淘汰跟過期緩存處理要區別開來,緩存淘汰是指當我們的緩存個數達到我們指定的緩存個數時,畢竟我們的內存不是無限的。如果我們需要繼續添加緩存的話,我們就需要在現有的緩存中根據某種策略淘汰一些緩存,給新添加的緩存騰出位置,下面一起來認識幾種常用的緩存淘汰策略。

先進先出策略

最先進入緩存的數據在緩存空間不夠的情況下會被優先被清除掉,以騰出新的空間接受新的數據。該策略主要比較緩存元素的創建時間。在一些對數據實效性要求比較高的場景下,可考慮選擇該類策略,優先保障最新數據可用。

最少使用策略

無論是否過期,根據元素的被使用次數判斷,清除使用次數較少的元素釋放空間。該策略主要比較元素的hitCount(命中次數),在保證高頻數據有效性場景下,可選擇這類策略。

最近最少使用策略

無論是否過期,根據元素最后一次被使用的時間戳,清除最遠使用時間戳的元素釋放空間。該策略主要比較緩存最近一次被get使用時間。在熱點數據場景下較適用,優先保證熱點數據的有效性。

隨機淘汰策略

無論是否過期,隨機淘汰某個緩存,如果對緩存數據沒有任何要求,可以考慮使用該策略。

不淘汰策略

當緩存達到指定值之后,不淘汰任何緩存,而是不能新增緩存,直到有緩存淘汰時,才能繼續添加緩存。

本地緩存

ConcurrentHashMap

可以采用 ConcurrentHashMap 作為存儲集合,這樣即使在高並發的情況下,我們也能夠保證緩存的安全。過期緩存處理在這里我只使用了定時刪除策略,並沒有使用定時刪除 + 懶惰淘汰策略,你可以自己動手嘗試一下使用這兩種策略進行過期緩存處理。在緩存淘汰方面,我在這里采用了最少使用策略。好了,技術選型都知道了,我們一起來看看代碼實現。

緩存對象類

public class Cache implements Comparable<Cache>{
    // 鍵
    private Object key;
    // 緩存值
    private Object value;
    // 最后一次訪問時間
    private long accessTime;
    // 創建時間
    private long writeTime;
    // 存活時間
    private long expireTime;
    // 命中次數
    private Integer hitCount;

添加緩存

/**
 * 添加緩存
 *
 * @param key
 * @param value
 */
public void put(K key, V value,long expire) {
    checkNotNull(key);
    checkNotNull(value);
    // 當緩存存在時,更新緩存
    if (concurrentHashMap.containsKey(key)){
        Cache cache = concurrentHashMap.get(key);
        cache.setHitCount(cache.getHitCount()+1);
        cache.setWriteTime(System.currentTimeMillis());
        cache.setAccessTime(System.currentTimeMillis());
        cache.setExpireTime(expire);
        cache.setValue(value);
        return;
    }
    // 已經達到最大緩存
    if (isFull()) {
        Object kickedKey = getKickedKey();
        if (kickedKey !=null){
            // 移除最少使用的緩存
            concurrentHashMap.remove(kickedKey);
        }else {
            return;
        }
    }
    Cache cache = new Cache();
    cache.setKey(key);
    cache.setValue(value);
    cache.setWriteTime(System.currentTimeMillis());
    cache.setAccessTime(System.currentTimeMillis());
    cache.setHitCount(1);
    cache.setExpireTime(expire);
    concurrentHashMap.put(key, cache);
}

獲取緩存

/**
 * 獲取緩存
 * @param key
 * @return
 */
public Object get(K key) {
    checkNotNull(key);
    if (concurrentHashMap.isEmpty()) return null;
    if (!concurrentHashMap.containsKey(key)) return null;
    Cache cache = concurrentHashMap.get(key);
    if (cache == null) return null;
    cache.setHitCount(cache.getHitCount()+1);
    cache.setAccessTime(System.currentTimeMillis());
    return cache.getValue();
}

獲取最少使用的緩存

    /**
     * 獲取最少使用的緩存
     * @return
     */
    private Object getKickedKey() {
        Cache min = Collections.min(concurrentHashMap.values());
        return min.getKey();
    }

過期緩存檢測方法

/**
 * 處理過期緩存
 */
class TimeoutTimerThread implements Runnable {
    public void run() {
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(60);
                expireCache();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 創建多久后,緩存失效
     *
     * @throws Exception
     */
    private void expireCache() throws Exception {
        System.out.println("檢測緩存是否過期緩存");
        for (Object key : concurrentHashMap.keySet()) {
            Cache cache = concurrentHashMap.get(key);
            long timoutTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()
                    - cache.getWriteTime());
            if (cache.getExpireTime() > timoutTime) {
                continue;
            }
            System.out.println(" 清除過期緩存 : " + key);
            //清除過期緩存
            concurrentHashMap.remove(key);
        }
    }
}

guava cache組件

某些熱點數據在短時間內可能會被成千上萬次訪問,所以除了放在redis之外,還可以放在本地內存,也就是JVM的內存中。

我們可以使用google的guava cache組件實現本地緩存,之所以選擇guava是因為它可以控制key和value的大小和超時時間,可以配置LRU策略且guava是線程安全的。

首先引入guava cache

<dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
</dependency>

編寫CacheService接口

public interface CacheService {
    //存方法
    void setCommonCache(String key,Object value);
    //取方法
    Object getFromCommonCache(String key);
}

實現CacheService(@PostConstruct注解的方法將會在依賴注入完成后被自動調用。)

@Service
public class CacheServiceImpl implements CacheService {

    private Cache<String,Object> commonCache=null;


    @PostConstruct
    public void init(){
        commonCache= CacheBuilder.newBuilder()
                //緩存初始容量10
                .initialCapacity(10)
                //最多100個key,超過按LRU策略移除
                .maximumSize(100)
                //寫入后多少秒過期
                .expireAfterWrite(60, TimeUnit.SECONDS).build();
    }
    @Override
    public void setCommonCache(String key, Object value) {
        commonCache.put(key,value);
    }

    @Override
    public Object getFromCommonCache(String key) {
        return commonCache.getIfPresent(key);
    }
}

使用CacheService

@RequestMapping(value = "/get",method = {RequestMethod.GET})
    @ResponseBody
    public CommonReturnType getItem(@RequestParam(name = "id")Integer id){
        //在本地緩存中查找
        ItemModel itemModel= (ItemModel) cacheService.getFromCommonCache("item_"+id);

        if(itemModel==null){
            //本地緩存沒有則到redis緩存中查找
            itemModel= (ItemModel) redisTemplate.opsForValue().get("item_"+id);
            if(itemModel==null){
                //都沒有則到數據庫查找,找到后放入redis中
                itemModel = itemService.getItemById(id);
                redisTemplate.opsForValue().set("item_"+id,itemModel);
                redisTemplate.expire("item_"+id,10, TimeUnit.MINUTES);
            }
            //本地緩存沒有時,在redis或數據庫找到后再放入本地緩存
            cacheService.setCommonCache("item_"+id,itemModel);
        }


        ItemVO itemVO = convertVOFromModel(itemModel);

        return CommonReturnType.create(itemVO);

    }

面試

緩存雪崩、緩存穿透、緩存預熱、緩存更新、緩存降級等問題

一、緩存雪崩

我們可以簡單的理解為:由於原有緩存失效,新緩存未到期間 (例如:我們設置緩存時采用了相同的過期時間,在同一時刻出現大面積的緩存過期),所有原本應該訪問緩存的請求都去查詢數據庫了,而對數據庫CPU和內存造成巨大壓力,嚴重的會造成數據庫宕機。從而形成一系列連鎖反應,造成整個系統崩潰。
解決辦法
大多數系統設計者考慮用加鎖( 最多的解決方案)或者隊列的方式保證來保證不會有大量的線程對數據庫一次性進行讀寫,從而避免失效時大量的並發請求落到底層存儲系統上。還有一個簡單方案就是將緩存失效時間分散開,例如設置隨機緩存時間。

二、緩存穿透

緩存穿透是指用戶查詢數據,在數據庫沒有,自然在緩存中也不會有。這樣就導致用戶查詢的時候,在緩存中找不到,每次都要去數據庫再查詢一遍,然后返回空(相當於進行了兩次無用的查詢)。這樣請求就繞過緩存直接查數據庫,這也是經常提的緩存命中率問題。
解決辦法;
最常見的則是采用布隆過濾器,將所有可能存在的數據哈希到一個足夠大的bitmap中,一個一定不存在的數據會被這個bitmap攔截掉,從而避免了對底層存儲系統的查詢壓力。
另外也有一個更為簡單粗暴的方法,如果一個查詢返回的數據為空(不管是數據不存在,還是系統故障),我們仍然把這個空結果進行緩存,但它的過期時間會很短,最長不超過五分鍾。通過這個直接設置的默認值存放到緩存,這樣第二次到緩沖中獲取就有值了,而不會繼續訪問數據庫,這種辦法最簡單粗暴。

5TB的硬盤上放滿了數據,請寫一個算法將這些數據進行排重。如果這些數據是一些32bit大小的數據該如何解決?如果是64bit的呢?

對於空間的利用到達了一種極致,那就是Bitmap和布隆過濾器(Bloom Filter)。
Bitmap: 典型的就是哈希表
缺點是,Bitmap對於每個元素只能記錄1bit信息,如果還想完成額外的功能,恐怕只能靠犧牲更多的空間、時間來完成了。

Bloom-Filter布隆過濾器(推薦)

就是引入了k(k>1相互獨立的哈希函數,保證在給定的空間、誤判率下,完成元素判重的過程。
它的優點是空間效率和查詢時間都遠遠超過一般的算法,缺點是有一定的誤識別率和刪除困難。
Bloom-Filter算法的核心思想就是利用多個不同的Hash函數來解決“沖突”。
Hash存在一個沖突(碰撞)的問題,用同一個Hash得到的兩個URL的值有可能相同。為了減少沖突,我們可以多引入幾個Hash,如果通過其中的一個Hash值我們得出某元素不在集合中,那么該元素肯定不在集合中。只有在所有的Hash函數告訴我們該元素在集合中時,才能確定該元素存在於集合中。這便是Bloom-Filter的基本思想。

Bloom-Filter一般用於在大數據量的集合中判定某元素是否存在。

緩存穿透與緩存擊穿的區別

緩存擊穿:指一個key非常熱點,大並發集中對這個key進行訪問,當這個key在失效的瞬間,仍然持續的大並發訪問就穿破緩存,轉而直接請求數據庫。
解決方案;在訪問key之前,采用SETNX(set if not exists)來設置另一個短期key來鎖住當前key的訪問,訪問結束再刪除該短期key。

三、緩存預熱

緩存預熱就是系統上線后,將相關的緩存數據直接加載到緩存系統。這樣就可以避免在用戶請求的時候,先查詢數據庫,然后再將數據緩存的問題!用戶直接查詢事先被預熱的緩存數據!
解決思路:
1、直接寫個緩存刷新頁面,上線時手工操作下;
2、數據量不大,可以在項目啟動的時候自動進行加載;
3、定時刷新緩存;

四、緩存更新

除了緩存服務器自帶的緩存失效策略之外(Redis默認的有6種策略可供選擇),我們還可以根據具體的業務需求進行自定義的緩存淘汰,常見的策略有兩種:
(1)定時去清理過期的緩存;
(2)當有用戶請求過來時,再判斷這個請求所用到的緩存是否過期,過期的話就去底層系統得到新數據並更新緩存。
兩者各有優劣,第一種的缺點是維護大量緩存的key是比較麻煩的,第二種的缺點就是每次用戶請求過來都要判斷緩存失效,邏輯相對比較復雜!具體用哪種方案,大家可以根據自己的應用場景來權衡。

五、緩存降級

當訪問量劇增、服務出現問題(如響應時間慢或不響應)或非核心服務影響到核心流程的性能時,仍然需要保證服務還是可用的,即使是有損服務。系統可以根據一些關鍵數據進行自動降級,也可以配置開關實現人工降級。
降級的最終目的是保證核心服務可用,即使是有損的。而且有些服務是無法降級的(如加入購物車、結算)。
以參考日志級別設置預案:
(1)一般:比如有些服務偶爾因為網絡抖動或者服務正在上線而超時,可以自動降級;
(2)警告:有些服務在一段時間內成功率有波動(如在95~100%之間),可以自動降級或人工降級,並發送告警;
(3)錯誤:比如可用率低於90%,或者數據庫連接池被打爆了,或者訪問量突然猛增到系統能承受的最大閥值,此時可以根據情況自動降級或者人工降級;
(4)嚴重錯誤:比如因為特殊原因數據錯誤了,此時需要緊急人工降級。

服務降級的目的,是為了防止Redis服務故障,導致數據庫跟着一起發生雪崩問題。因此,對於不重要的緩存數據,可以采取服務降級策略,例如一個比較常見的做法就是,Redis出現問題,不去數據庫查詢,而是直接返回默認值給用戶。

熱點數據和冷數據是什么

熱點數據,緩存才有價值
對於冷數據而言,大部分數據可能還沒有再次訪問到就已經被擠出內存,不僅占用內存,而且價值不大。頻繁修改的數據,看情況考慮使用緩存
對於上面兩個例子,壽星列表、導航信息都存在一個特點,就是信息修改頻率不高,讀取通常非常高的場景。
對於熱點數據,比如我們的某IM產品,生日祝福模塊,當天的壽星列表,緩存以后可能讀取數十萬次。再舉個例子,某導航產品,我們將導航信息,緩存以后可能讀取數百萬次。
數據更新前至少讀取兩次,緩存才有意義。這個是最基本的策略,如果緩存還沒有起作用就失效了,那就沒有太大價值了。
那存不存在,修改頻率很高,但是又不得不考慮緩存的場景呢?有!比如,這個讀取接口對數據庫的壓力很大,但是又是熱點數據,這個時候就需要考慮通過緩存手段,減少數據庫的壓力,比如我們的某助手產品的,點贊數,收藏數,分享數等是非常典型的熱點數據,但是又不斷變化,此時就需要將數據同步保存到Redis緩存,減少數據庫壓力。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM