構建高效且可伸縮的結果緩存


你好呀,我是歪歪。

我來填坑來啦。

上周發布了《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章。

文章的最后,我提到了《Java並發編程實戰》的第 5.6 節的內容,說大家可以去看看。

我不知道有幾個同學去看了,但是我知道絕大部分同學都沒去看的,所以這篇文章我也給大家安排一下,怎么去比較好的實現一個緩存功能。

感受一下大師的代碼方案演進的過程。

需求

這不都二月中旬了嘛,馬上就要出考研成績了,我就拿這個來舉個例子吧。

需求很簡單:從緩存中查詢,查不到則從數據庫獲取,並放到緩存中去,供下次使用。

核心代碼大概就是這樣的:

Integer score = map.get("why");
if(score == null){
   score = loadFormDB("why");
   map.put("why",score);
}

有了核心代碼,所以我把代碼補全之后應該是這樣的:

public class ScoreQueryService {

    private final Map<String, Integer> SCORE_CACHE = new HashMap<>();

    public Integer query(String userName) throws InterruptedException {
        Integer result = SCORE_CACHE.get(userName);
        if (result == null) {
            result = loadFormDB(userName);
            SCORE_CACHE.put(userName, result);
        }
        return result;
    }

    private Integer loadFormDB(String userName) throws InterruptedException {
        System.out.println("開始查詢userName=" + userName + "的分數");
        //模擬耗時
        TimeUnit.SECONDS.sleep(1);
        return ThreadLocalRandom.current().nextInt(380, 420);
    }
}

然后搞一個 main 方法測試一下:

public class MainTest {

    public static void main(String[] args) throws InterruptedException {
        ScoreQueryService scoreQueryService = new ScoreQueryService();
        Integer whyScore = scoreQueryService.query("why");
        System.out.println("whyScore = " + whyScore);
        whyScore = scoreQueryService.query("why");
        System.out.println("whyScore = " + whyScore);
    }
}

把代碼兒跑起來:

好家伙,第一把就跑了個 408 分,我考研要是真能考到這個分數,怕是做夢都得笑醒。

Demo 很簡單,但是請你注意,我要開始變形了。

首先把 main 方法修改為這樣:

public class MainTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ScoreQueryService scoreQueryService = new ScoreQueryService();
        for (int i = 0; i < 3; i++) {
            executorService.execute(()->{
                try {
                    Integer why = scoreQueryService.query("why");
                    System.out.println("why = " + why);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

利用線程池提交任務,模擬同一時間發起三次查詢請求,由於 loadFormDB 方法里面有模擬耗時的操作,那么這三個請求都不會從緩存中獲取到數據。

具體是不是這樣的呢?

看一下運行結果:

輸出三次,得到了三個不同的分數,說明確實執行了三次 loadFormDB 方法。

好,同學們,那么問題就來了。

很明顯,在這個場景下,我只想要一個線程執行 loadFormDB 方法就行了,那么應該怎么操作呢?

看到這個問題的瞬間,不知道你的腦袋里面有沒有電光火石般的想起緩存問題三連擊:緩存雪崩、緩存擊穿、緩存穿透。

畢竟應對緩存擊穿的解決方案之一就是只需要一個請求線程去做構建緩存,其他的線程就輪詢等着。

然后腦海里面自然而然的就浮現出了 Redis 分布式鎖的解決方案,甚至還想到了應該用 setNX 命令來保證只有一個線程放行成功。嘴角漏出一絲不易察覺的笑容,甚至想要關閉這篇文章。

不好意思,收起你的笑容,不能用 Redis,不能用第三方組件,只能用 JDK 的東西。

別問為什么,問就是沒有引入。

這個時候你怎么辦?

初始方案

聽說不能用第三方組件之后,你也一點不慌,大喊一聲:鍵來。

拿着鍵盤只需要啪啪啪三下就寫完了代碼:

加上一個 synchronized 關鍵字就算完事,甚至你還記得程序員的自我修養,完成了一波自測,發現確實沒有問題:

loadFromDB 方法只執行了一次。

但是,朋友,你有沒有想過你這個鎖的粒度有點太大了啊。

直接把整個方法給鎖了。

本來一個好好的並行方法,你給咔一下,搞成串行的了:

而且你這是無差別亂殺啊,比如上面這個示意圖,你要是說當第二次查詢 why 的成績的時候,把這個請求給攔下來,可以理解。

但是你同時也把第一次查詢 mx 的成績給攔截了。弄得 mx 同學一臉懵逼,搞不清啥情況。

注意,這個時候自然而然就會想到縮小鎖的粒度,把鎖的范圍從全局修改為局部,拿出比如用 why 對象作為鎖的這一類解決方案。

比如偽代碼改成這樣:

Integer score = map.get("why");
if(score == null){
   synchronized("why"){
       score = loadFormDB("why");
       map.put("why",score);
   }    
}

如果到這里你還沒反應過來,那么我再換個例子。

假設我這里的查詢條件變 Integer 類型的編號呢?

比如我的編號是 200,是不是偽代碼就變成了這樣:

Integer score = map.get(200);
if(score == null){
   synchronized(200){
       score = loadFormDB(200);
       map.put(200,score);
   }    
}

看到這里你要是還沒反應過來的話我只能大喊一聲:你個假讀者!之前發的文章肯定沒看吧?

之前的《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章不全篇都在說這個事兒嗎?

你要不知道問題是啥,你就去翻一下。

這篇文章肯定也不會往這個方向去寫。不能死盯着 synchronize 不放,不然思路打不開。

我們這里不能用 synchronized 這個玩意。

但是你仔細一看,如果不用 synchronized 的話,這個 map 也不行啊:

private final Map<String, Integer> SCORE_CACHE = new HashMap<>();

這是個 HashMap,不是線程安全的呀。

怎么辦?

演進唄。

演進一下

這一步非常簡單,和最開始的程序相比,只是把 HashMap 替換為 ConcurrentHashMap。

然后就啥也沒干了。

是不是感覺有點懵逼,甚至感覺有一定被耍了的感覺?

有就對了,因為這一步改變就是書里面的一個方案,我第一次看到的時候反正也是感覺有點懵逼:

我真沒騙你,不信我拍照給你看:

這個方案和初始方案比,唯一的優點就是並發度上來了,因為 ConcurrentHashMap 是線程安全的。

但是,整個方案作為緩存來說,從上面的示意圖也能看出,就是一句話:卵用沒有。

因為根本就不能滿足“相同的請求下,如果緩存中沒有,只有一個請求線程執行 loadFormDB 方法”這個需求,比如 why 的短時間內的兩次查詢操作就執行兩次 loadFormDB 方法。

它的毛病在哪兒呢?

如果多個線程都是查 why 這個人成績的前提下,如果一個線程去執行 loadFormDB 方法了,而另外的線程根本感知不到有線程在執行該方法,那么它們沖進來后一看:我去,緩存里面壓根沒有啊?那我也去執行 loadFormDB 方法吧。

完犢子了,重復執行了。

那么在 JDK 原生的方法里面有沒有一種機制來表示已經有一個請求查詢 why 成績的線程在執行 loadFormDB 方法了,那么其他的查詢 why 成績的線程就等這個結果就行了,沒必要自己也去執行一遍。

這個時候就考驗你的知識儲備了。

你想到了什么?

繼續演進

FutureTask 是異步編程里面的一個非常重要的組成部分。

比如線程池的應用中,當你使用 submit 的方式提交任務時,它的返回類型就是 Future:

反正基於 Future 這個東西,可以玩出花兒來。

比如我們的這個場景中,如果要用到 FutureTask,那么我們的 Map 就需要修改為這樣:

Map<String, Future > SCORE_CACHE = new ConcurrentHashMap<>();

通過維護姓名和 Future 的關系來達到我們的目的。

Future 本身就代表一個任務,對於緩存維護這個需求來說,這個任務到底是在執行中還是已經執行完成了它並不關心,這個“它”指的是 SCORE_CACHE 這個 Map。

對於 Map 來說,只要有個任務放進來就行了。

而任務到底執行完成沒有,應該是從 Map 里面 get 到對應 Future 的這個線程關心的。

它怎么關心?

通過調用 Future.get() 方法。

整個代碼寫出來就是這樣的:

public class ScoreQueryService {

    private final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();

    public Integer query(String userName) throws Exception {
        Future<Integer> future = SCORE_CACHE.get(userName);
        if (future == null) {
            Callable<Integer> callable = () -> loadFormDB(userName);
            FutureTask futureTask = new FutureTask<>(callable);
            future = futureTask;
            SCORE_CACHE.put(userName, futureTask);
            futureTask.run();
        }
        return future.get();
    }

    private Integer loadFormDB(String userName) throws InterruptedException {
        System.out.println("開始查詢userName=" + userName + "的分數");
        //模擬耗時
        TimeUnit.SECONDS.sleep(1);
        return ThreadLocalRandom.current().nextInt(380, 420);
    }
}

怕你不熟悉 futureTask ,所以簡單解釋一下關於 futureTask 的四行代碼,但是我還是強烈建議你把這個東西掌握了,畢竟說它是異步編程的基石之一也不為過。

基石還是得拿捏明白,否則就容易被面試官拿捏。

Callable<Integer> callable = () -> loadFormDB(userName);
FutureTask futureTask = new FutureTask<>(callable);
futureTask.run();
return future.get();

首先我構建了一個 Callable 作為 FutureTask 構造函數的入參。

構造函數上面的描述翻譯過來就是:創建一個 FutureTask,運行時將執行給定的 Callable。

“運行時”指的就是 futureTask.run() 這一行代碼,而“給定的 Callable ”就是 loadFormDB 任務。

也就是說調用了 futureTask.run() 之后,才有可能會執行到 loadFormDB 方法。

然后調用 future.get() 就是獲取 Callable 的結果 ,即獲取 loadFormDB 方法的結果。如果該方法還沒有運行結束,就死等。

對於這個方案,書上是這樣說的:

主要關注我划線的部分,我一句句的說

它只有一個缺陷,即仍然存在兩個線程計算出相同值的漏洞。

這句話其實很好理解,因為代碼里面始終有一個“①獲取-②判斷-③放置”的動作。

這個動作就不是原子性的,所以有一定的幾率兩個線程都沖進來,然后發現緩存中沒有,就都走到 if 分支里面去了。

但是標號為 ① 和 ② 的地方,從需求實現的角度來說,肯定是必不可少的。

能想辦法的地方也就只有標號為 ③ 的地方了。

到底啥辦法呢?

不着急,下一小節說,我先把后半句話給解釋了:

這個漏洞的發生概率要遠小於 Memoizer2 中發生的概率。

Memoizer2 就是指前面用 ConcurrentHashMap 替換 HashMap 后的方案。

那么為什么引入 Future 之后的這個方案,觸發剛剛說到的 bug 的概率比之前的方案小呢?

答案就藏在這兩行代碼里面:

之前是要把業務邏輯執行完成,拿到返回值之后才能維護到緩存里面。

現在是先維護緩存,然后再執行業務邏輯,節約了執行業務邏輯的時間。

而一般來說最耗時的地方就是業務邏輯的執行,所以這個“遠小於”就是這樣來的。

那怎么辦呢?

接着演進呀。

最終版

書里面,針對上面那個“若沒有則添加”這個非原子性的動作的時候,提到了 map 的一個方法:

Map 的 putIfAbsent,這個方法就厲害了。帶你看一下:

首先從標號為 ① 的地方我們可以知道,這個方法傳進來的 key 如果還沒有與一個值相關聯(或被映射為null),則將其與給定的值進行映射並返回 null ,否則返回當前值。

如果我們只關心返回值的話,那就是:如果有就返回對應的值,如果沒有就返回 null。

標號為 ② 的地方說的是啥呢?

它說默認的實現沒有對這個方法的同步性或原子性做出保證。如果你要提供原子性保證,那么就請覆蓋此方法,自己去寫。

所以,我們接着就要關注一下 ConcurrentHashMap 的這個方法是怎么搞得了:

還是通過 synchronized 方法來保證了原子性,當操作的是同一個 key 的時候保證只有一個線程去執行 put 的操作。

所以書中給出的最終實現,是這樣的:

public class ScoreQueryService {

    public static final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();

    public Integer query(String userName) throws Exception {
        while (true) {
            Future<Integer> future = SCORE_CACHE.get(userName);
            if (future == null) {
                Callable<Integer> callable = () -> loadFormDB(userName);
                FutureTask futureTask = new FutureTask<>(callable);
                future = SCORE_CACHE.putIfAbsent(userName, futureTask);
                //如果為空說明之前這個 key 在 map 里面不存在
                if (future == null) {
                    future = futureTask;
                    futureTask.run();
                }
            }
            try {
                return future.get();
            } catch (CancellationException e) {
                System.out.println("查詢userName=" + userName + "的任務被移除");
                SCORE_CACHE.remove(userName, future);
            } catch (Exception e) {
                throw e;
            }
        }
    }

    private Integer loadFormDB(String userName) throws InterruptedException {
        System.out.println("開始查詢userName=" + userName + "的分數");
        //模擬耗時
        TimeUnit.SECONDS.sleep(5);
        return ThreadLocalRandom.current().nextInt(380, 420);
    }
}

與前一個方案,有三個不一樣的地方。

  • 第一個是采用了 putIfAbsent 替換 put 方法。
  • 第二個是加入了 while(true) 循環。
  • 第三個是 future.get() 拋出 CancellationException 異常后執行了清除緩存的動作。

第一個沒啥說的,前面已經解釋了。

第二個和第三個,說實話當他們組合在一起用的時候,我沒看的太明白。

首先,從程序上講,這兩個是相輔相成的代碼,因為 while(true) 循環我理解只有 future.get() 拋出 CancellationException 異常的時候才會起到作用。

拋出 CancellationException 異常,說明當前的這個任務被其他地方調用了 cancel 方法,而由於 while(true) 的存在,且當前的這個任務被 remove 了,所以 if 條件成功,就會再次構建一個一樣的任務,然后繼續執行:

也就是說移除的任務和放進去的任務是一模一樣的。

那是不是就不用移除?

沒轉過彎的話沒關系,我先給你上個代碼看看,你就明白了:

其中 ScoreQueryService 的代碼我前面已經給了,就不截圖了。

可以看到這次只往線程池里面扔了一個任務,然后接着把緩存里面的任務拿出來,調用 cancel 方法取消掉。

這個程序的輸出結果是這樣的:

所以,由於 while(true) 的存在,導致 cancel 方法失效。

然后我前面說:移除的任務和放進去的任務是一模一樣的。那是不是就不用移除?

表現在代碼里面就是這樣的:

不知道作者為啥要專門搞個移除的動作,經過這一波分析,這一行代碼完全是可以注釋掉的嘛。

但是...

對嗎?

這是不對的,老鐵。如果這行代碼被注釋了,那么程序的輸出就是這樣的:

變成一個死循環了。

為什么變成死循環了?

因為 FutureTask 這個玩意是有生命周期的:

被 cancelled 之后,生命周期就完成了,所以如果不從緩存里面移走那就芭比Q了,取出來的始終是被取消的這個,那么就會一直拋出異常,然后繼續循環。

死循環就是這樣來的。

所以移除的動作必須得有, while(true) 就看你的需求了,加上就是 cannel 方法“失效”,去掉就是可以調用 cannel 方法。

關於 FutureTask 如果你不熟悉的話,我寫過兩篇文章,你可以看看。

《老爺子這代碼,看跪了!》

《Doug Lea在J.U.C包里面寫的BUG又被網友發現了。》

接着,我們再驗證一下最終代碼是否運行正常:

三個線程最終查出來的分數是一樣的,沒毛病。

如果你想觀察一下阻塞的情況,那么可以把睡眠時間拉長一點:

然后,把代碼跑起來,看堆棧信息:

一個線程在 sleep,另外兩個線程執行到了 FutureTask 的 get 方法。

sleep 的好理解,為什么另外兩個線程阻塞在 get 方法上呢?

很簡單,因為另外兩個線程返回的 future 不是 null,這是由 putIfAbsent 方法的特性決定的:

好了,書中給出的最終方案的代碼也解釋完了。

但是書里面還留下了兩個“坑”:

一個是不支持緩存過期機制。

一個是不支持緩存淘汰機制。

等下再說,先說說我的另一個方案。

還有一個方案

其實我也還有一個方案,拿出來給大家看看:

public class ScoreQueryService2 {

    public static final Map<String, Future<Integer>> SCORE_CACHE = new ConcurrentHashMap<>();

    public Integer query(String userName) throws Exception {
        while (true) {
            Future<Integer> future = SCORE_CACHE.get(userName);
            if (future == null) {
                Callable<Integer> callable = () -> loadFormDB(userName);
                FutureTask futureTask = new FutureTask<>(callable);
                FutureTask<Integer> integerFuture = (FutureTask) SCORE_CACHE.computeIfAbsent(userName, key -> futureTask);
                future = integerFuture;
                integerFuture.run();
            }
            try {
                return future.get();
            } catch (CancellationException e) {
                SCORE_CACHE.remove(userName, future);
            } catch (Exception e) {
                throw e;
            }
        }
    }

    private Integer loadFormDB(String userName) throws InterruptedException {
        System.out.println("開始查詢userName=" + userName + "的分數");
        //模擬耗時
        TimeUnit.SECONDS.sleep(1);
        return ThreadLocalRandom.current().nextInt(380, 420);
    }
}

和書中給出的方案差異點在於用 computeIfAbsent 代替了 putIfAbsent:

V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)

computeIfAbsent,首先它也是一個線程安全的方法,這個方法會檢查 Map 中的 Key,如果發現 Key 不存在或者對應的值是 null,則調用 Function 來產生一個值,然后將其放入 Map,最后返回這個值;否則的話返回 Map 已經存在的值。

putIfAbsent,如果 Key 不存在或者對應的值是 null,則將 Value 設置進去,然后返回 null;否則只返回 Map 當中對應的值,而不做其他操作。

所以這二者的區別之一在於返回值上。

用了 computeIfAbsent 之后,每次返回的都是同一個 FutureTask,但是由於 FutureTask 的生命周期,或者說是狀態扭轉的存在,即使三個線程都調用了它的 run 方法,這個 FutureTask 也只會執行成功一次。

可以看一下,這個 run 方法的源碼,一進來就是狀態和當前操作線程的判斷:

所以執行完一次 run 方法之后,再次調用 run 方法並不會真的執行。

但是從程序實現的優雅角度來說,還是 putIfAbsent 方法更好。

坑怎么辦?

前面不是說最終的方案有兩個坑嘛:

  • 一個是不支持緩存過期機制。
  • 一個是不支持緩存淘汰機制。

在使用 ConcurrentHashMap 的前提下,這兩個特性如果要支持的話,需要進行對應的開發,比如引入定時任務來解決,想想就覺得麻煩。

同時也我想到了 spring-cache,我知道這里面有 ConcurrentHashMap 作為緩存的實現方案。

我想看看這個組件里面是怎么解決這兩個問題的。

二話不說,我先把代碼拉下來看看:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

由於 spring-cache 也不是本文重點,所以我就直接說關鍵地方的源碼了。

至於是怎么找到這里來的,就不詳細介紹了,以后安排文章詳細解釋。

另外我不得不說一句:spring-cache 這玩意真的是優雅的一比,不論是源碼還是設計模式的應用,都非常的好。

首先,我們可以看到 @Cacheable 注解里面有一個參數叫做 sycn,默認值是 false:

關於這個參數,官網上的解釋是這樣的:

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#cache-annotations-cacheable-cache-resolver

就是針對我們前面提到的緩存如何維護的情況的一個處理方案。使用方法也很簡單。

該功能對應的核心部分的源碼在這個位置:

org.springframework.cache.interceptor.CacheAspectSupport#execute(org.springframework.cache.interceptor.CacheOperationInvoker, java.lang.reflect.Method, org.springframework.cache.interceptor.CacheAspectSupport.CacheOperationContexts)

在上面這個方法中會判斷是不是 sync=true 的方法,如果是則進入到 if 分支里面。

接着會執行到下面這個重要的方法:

org.springframework.cache.interceptor.CacheAspectSupport#handleSynchronizedGet

在這個方法里面,入參 cache 是一個抽象類,Spring 提供了六種默認的實現:

而我關心的是 ConcurrentMapCache 實現,點進去一看,好家伙,這方法我熟啊:

org.springframework.cache.concurrent.ConcurrentMapCache#get

computeIfAbsent 方法,我們不是剛剛才說了嘛。但是我左翻右翻就是找不到設置過期時間和淘汰策略的地方。

於是,我又去翻官網了,發現答案就直接寫在官網上的:

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#cache-specific-config

這里說了,官方提供的是一個緩存的抽象,而不是具體的實現。而緩存過期和淘汰機制不屬於抽象的范圍內。

為什么呢?

比如拿 ConcurrentHashMap 來說,假設我提供了緩存過期和淘汰機制的抽象,那你說 ConcurrentHashMap 怎么去實現這個抽象方法?

實現不了,因為它本來就不支持這個機制。

所以官方認為這樣的功能應該由具體的緩存實現類去實現而不是提供抽象方法。

這里也就回復了前面的最終方案引申出的這兩個問題:

  • 一個是不支持緩存過期機制。
  • 一個是不支持緩存淘汰機制。

別問,問就是原生的方法里面是支持不了的。如果要實現自己去寫代碼,或者換一個緩存方案。

再說兩個點

最后,再補充兩個點。

第一個點是之前的《當Synchronized遇到這玩意兒,有個大坑,要注意!》這篇文章里面,有一個地方寫錯了。

框起來的地方是我后面加上來的。

上周的文章發出去后,大概有十來個讀者給我反饋這個問題。

我真的特別的開心,因為真的有人把我的示例代碼拿去跑了,且認真思考了,然后來和我討論,幫我指正我寫的不對的地方。

再給大家分享一下我的這篇文章《當我看技術文章的時候,我在想什么?》

里面表達了我對於看技術博客的態度:

看技術文章的時候多想一步,有時候會有更加深刻的理解。

帶着懷疑的眼光去看博客,帶着求證的想法去證偽。

多想想 why,總是會有收獲的。

第二個點是這樣的。

關於 ConcurrentHashMap 的 computeIfAbsent 我其實也專門寫過文章的:《震驚!ConcurrentHashMap里面也有死循環,作者留下的“彩蛋”了解一下?》

老讀者應該是讀到過這篇文章的。

之前在 seata 官網上閑逛的時候,看到了這篇博客:

https://seata.io/zh-cn/blog/seata-dsproxy-deadlock.html

名字叫做《ConcurrentHashMap導致的Seata死鎖問題》,我就隨便這么點進去一看:

這里提到的這篇文章,就是我寫的。

在 seata 官網上偶遇自己的文章是一種很神奇的體驗。

四舍五入,我也算是給 seata 有過貢獻的男人。

而且你看這篇文章其實也提到了我之前寫過的很多文章,這些知識都通過一個小小的點串起來了,由點到線,由線到面,這也是我堅持寫作的原因。

共勉之。

最后,呼應一下文章的開頭部分,考研馬上要查分了,我知道我的讀者里面還是有不少是今年考研的。

如果你看到了這里,那么下面這個圖送給你:

本文已收錄至個人博客,更多原創好文,歡迎大家來玩:

https://www.whywhy.vip/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM