Caffeine和CompleteFuture實際應用總結


一.Caffeine 原理

1.1 常見緩存淘汰算法

  • FIFO:先進先出,在這種淘汰算法中,先進入緩存的會先被淘汰,會導致命中率很低。
  • LRU:最近最少使用算法,每次訪問數據都會將其放在我們的隊尾,如果需要淘汰數據,就只需要淘汰隊首即可。
  • LFU:最近最少頻率使用,利用額外的空間記錄每個數據的使用頻率,然后選出頻率最低進行淘汰。這樣就避免了 LRU 不能處理時間段的問題。

1.2 LRU和LFU缺點:

  • LRU 實現簡單,在一般情況下能夠表現出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然 LRU 對突發性的稀疏流量(sparse bursts)表現很好,但同時也會產生緩存污染,舉例來說,如果偶然性的要對全量數據進行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。

  • 如果數據的分布在一段時間內是固定的話,那么 LFU 可以達到最高的命中率。但是 LFU 有兩個缺點,第一,它需要給每個記錄項維護頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發性的稀疏流量無力,因為前期經常訪問的記錄已經占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使。

1.3 W-TinyLFU 算法:

TinyLFU 算法:

  • 解決第一個問題是采用了 Count–Min Sketch 算法。

  • 為了解決 LFU 不便於處理隨時間變化的熱度變化問題,TinyLFU 采用了基於 “滑動時間窗” 的熱度衰減算法,簡單理解就是每隔一段時間,便會把計數器的數值減半,以此解決 “舊熱點” 數據難以清除的問題。

W-TinyLFU算法:

img

W-TinyLFU(Windows-TinyLFU):W-TinyLFU 又是 TinyLFU 的改進版本。TinyLFU 在實現減少計數器維護頻率的同時,也帶來了無法很好地應對稀疏突發訪問的問題,所謂稀疏突發訪問是指有一些絕對頻率較小,但突發訪問頻率很高的數據,此時 TinyLFU 就很難讓這類元素通過 Sketch 的過濾,因為它們無法在運行期間積累到足夠高的頻率。應對短時間的突發訪問是 LRU 的強項,W-TinyLFU 就結合了 LRU 和 LFU 兩者的優點,從整體上看是它是 LFU 策略,從局部實現上看又是 LRU 策略。具體做法是將新記錄暫時放入一個名為 Window Cache 的前端 LRU 緩存里面,讓這些對象可以在 Window Cache 中累積熱度,如果能通過 TinyLFU 的過濾器,再進入名為 Main Cache 的主緩存中存儲,主緩存根據數據的訪問頻繁程度分為不同的段(LFU 策略,實際上 W-TinyLFU 只分了兩段),但單獨某一段局部來看又是基於 LRU 策略去實現的(稱為 Segmented LRU)。每當前一段緩存滿了之后,會將低價值數據淘汰到后一段中去存儲,直至最后一段也滿了之后,該數據就徹底清理出緩存。

1.3.1 常用配置參數

expireAfterWrite:寫入間隔多久淘汰;
expireAfterAccess:最后訪問后間隔多久淘汰;
refreshAfterWrite:寫入后間隔多久刷新,當加載不到新數據采用舊數據時便可設置此參數。
maximumSize:緩存 key 的最大個數;
softValues:value設置為軟引用,在內存溢出前可以直接淘汰;
executor:選擇自定義的線程池,默認的線程池實現是 ForkJoinPool.commonPool();
recordStats:緩存的統計數據,比如命中率等;
removalListener:緩存淘汰監聽器;

1.3.2 同步加載數據

實戰代碼:

public class AdsPreCheckConfigListCache {
    private static Logger LOGGER = LoggerFactory.getLogger(AdsPreCheckConfigListCache.class);

    private LoadingCache<Integer, String> cache = null;

    /**
     * @description 初始化caffine
     * @author idslilang 
     * @updateTime 2021/6/2 14:48
     * @Return
     */
    public void init() {

        this.cache = Caffeine.newBuilder().initialCapacity(3000).maximumSize(10000)
                .refreshAfterWrite(1,TimeUnit.SECONDS)
                .expireAfterWrite(1, TimeUnit.SECONDS)
                .executor(ThreadPoolUtils.getThreadPoolExecutor())
                .recordStats()
                .build(initLoader());

    }

    public LoadingCache<Integer, String> getCache(){
        return cache;
    }

    private CacheLoader<Integer, String> initLoader() {
        return new CacheLoader<Integer, String>() {

            @Override
            public @Nullable String load(@NonNull Integer integer) throws Exception {
                System.out.println("i come load---->" + integer);
                return String.valueOf("load---->"+integer);
            }

            @Override
            public @Nullable String reload(@NonNull Integer key, @NonNull String oldValue) throws Exception {

                System.out.println("i come reload---->" + oldValue);
                return String.valueOf("reload---->"+key);

            }
        };
    }

    public Object get(Integer bizId){
        return cache.get(bizId);
    }

    public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException {
        AdsPreCheckConfigListCache cache = new AdsPreCheckConfigListCache();
        cache.init();
        while (true){
            cache.get(1);
            TimeUnit.SECONDS.sleep(1);
        }

    }
}

輸出結果:

i come load---->1
i come load---->1
i come load---->1
i come load---->1
i come load---->1
i come load---->1

當cache如下時:

      this.cache = Caffeine.newBuilder().initialCapacity(3000).maximumSize(10000)
                .refreshAfterWrite(1,TimeUnit.SECONDS)
                .expireAfterWrite(3, TimeUnit.SECONDS)
                .executor(ThreadPoolUtils.getThreadPoolExecutor())
                .recordStats()
                .build(initLoader());

輸出結果:

load---->1
load---->1
reload---->1
reload---->1
reload---->1
reload---->1
reload---->1
reload---->1
reload---->1

監聽過期數據

  this.cache = Caffeine.newBuilder().initialCapacity(3000).maximumSize(10000)
                .refreshAfterWrite(1,TimeUnit.SECONDS)
                .expireAfterWrite(3, TimeUnit.SECONDS)
                .executor(ThreadPoolUtils.getThreadPoolExecutor())
                .removalListener(new RemovalListener<Object, Object>() {
                    @Override
                    public void onRemoval(@Nullable Object key, @Nullable Object value, @NonNull RemovalCause removalCause) {
                        System.out.println(key);
                        System.out.println(value);
                    }
                })
                .recordStats()
                .build(initLoader());

主動請求的時候才會監聽到,不請求的時候無法監聽到。

當要cache自動清除數據,預先加載時候可以如下配置:

但是這種存在一種風險,當大量key同一時間失效的時候,會造成瞬間大量線程訪問數據庫,可以考慮重新申明一個隊列,監聽隊列處理。

    public void init() {

        this.cache = Caffeine.newBuilder().initialCapacity(3000).maximumSize(10000)
                .expireAfterWrite(1, TimeUnit.SECONDS)
                .executor(ThreadPoolUtils.getThreadPoolExecutor())
                .removalListener(new RemovalListener<Integer, String>() {
                    @Override
                    public void onRemoval(@Nullable Integer key, @Nullable String value, @NonNull RemovalCause removalCause) {
                        System.out.println("expire la ");
                        get(key);
                    }
                })
                .scheduler(Scheduler.forScheduledExecutorService( Executors.newSingleThreadScheduledExecutor()))
                .buildAsync(initLoader());

    }

1.3.3 異步加載數據

public class AdsPreCheckConfigListCache {
    private static Logger LOGGER = LoggerFactory.getLogger(AdsPreCheckConfigListCache.class);

    private AsyncLoadingCache<Integer, String> cache = null;

    /**
     * @description 初始化caffine
     * @author idslilang 
     * @updateTime 2021/6/2 14:48
     * @Return
     */
    public void init() {

        this.cache = Caffeine.newBuilder().initialCapacity(3000).maximumSize(10000)
                .expireAfterWrite(1, TimeUnit.SECONDS)
                .executor(ThreadPoolUtils.getThreadPoolExecutor())
                .removalListener(new RemovalListener<Integer, String>() {
                    @Override
                    public void onRemoval(@Nullable Integer key, @Nullable String value, @NonNull RemovalCause removalCause) {
                        System.out.println("expire la ");
                    }
                })
                .buildAsync(initLoader());

    }

    public AsyncLoadingCache<Integer, String> getCache(){
        return cache;
    }

    private AsyncCacheLoader<Integer, String> initLoader() {
        return new AsyncCacheLoader<Integer, String>() {

            @Override
            public @NonNull CompletableFuture<String> asyncLoad(@NonNull Integer key, @NonNull Executor executor) {
                System.out.println("i come load ");
                return    CompletableFuture.supplyAsync(()->{
                    return String.valueOf("load---->"+key);
                },executor);
            }

            @Override
            public @NonNull CompletableFuture<String> asyncReload(@NonNull Integer key, @NonNull String oldValue, @NonNull Executor executor) {
                System.out.println("i come reload ");
                return    CompletableFuture.supplyAsync(()->{
                    return String.valueOf("reload---->"+key);
                },executor);

            }

        };
    }

    public Object get(Integer bizId){
        try {
            return cache.get(bizId).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException, TimeoutException, ExecutionException {
        AdsPreCheckConfigListCache cache = new AdsPreCheckConfigListCache();
        cache.init();
        while (true){
            System.out.println(cache.get(1));
            TimeUnit.SECONDS.sleep(6);
        }

    }
}

異步加載數據時候,可以對future設置超時時間,實現更加靈活的控制。

二:異步編程CompleteFuture實戰

2.1 Future獲取任務結果

使用Future獲得異步執行結果時,要么調用阻塞方法get(),要么輪詢看isDone()是否為true,這兩種方法主線程也會被迫等待。

從Java 8開始引入了CompletableFuture,它針對Future做了改進,可以傳入回調對象,當異步任務完成或者發生異常時,自動調用回調對象的回調方法。

public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> stringFuture = executor.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "future thread";
            }
        });
        Thread.sleep(1000);
        System.out.println("main thread");
        System.out.println(stringFuture.get());

}

2.2 CompletableFuture 異步執行任務

2.2.1 異步任務接口
//接受Runnable,無返回值,使用ForkJoinPool.commonPool()線程池public static CompletableFuture<Void> runAsync(Runnable runnable)//接受Runnable,無返回值,使用指定的executor線程池  public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)  //接受Supplier,返回U,使用ForkJoinPool.commonPool()線程池 這個線程池默認線程數是 CPU 的核數。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)  //接受Supplier,返回U,使用指定的executor線程池 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

DEMO:

public CompletableFuture<String> getCompletableFutureData(){
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
        return  "return world";
    }, executor);
    return  completableFuture;
}
public void runAsync(){
    CompletableFuture.runAsync(()->{
       //do something async
    }, executor);;
}
2.2.2 設置任務結果
public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
CompletableFuture.completedFuture();

設置結果DEMO1:

public void runAsync() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "我正在返回異步結果";
        },executor);
        completableFuture.complete("單獨設置返回結果");
        System.out.println(completableFuture.get());

        TimeUnit.SECONDS.sleep(6);
        System.out.println(completableFuture.get());

    }

輸出結果:

單獨設置返回結果
單獨設置返回結果

設置結果DEMO2:

public void runAsync() throws ExecutionException, InterruptedException {    CompletableFuture<String> completableFuture =CompletableFuture.completedFuture("我完成了任務");    completableFuture.complete("單獨設置返回結果");    System.out.println(completableFuture.get());    TimeUnit.SECONDS.sleep(6);    System.out.println(completableFuture.get());}

輸出結果:

我完成了任務我完成了任務

需要注意點:

一旦 complete 設置成功,CompletableFuture 返回結果就不會被更改,即使后續 CompletableFuture 任務執行結束。同樣,申明一個完成任務的future(CompletableFuture.completedFuture("我完成了任務")),后續再對其操作也不起作用。

異常任務設置DEMO3:

public void runAsync() throws ExecutionException, InterruptedException {    CompletableFuture<String> completableFuture =CompletableFuture.supplyAsync(()->{      return "我正在運行任務";    },executor);    completableFuture.completeExceptionally(new RuntimeException("我發生了異常"));    System.out.println(completableFuture.get());}

輸出結果中將不會得到任務結果,將會返回異常信息,項目中可以根據任務結果設置自定義異常信息,便於統一處理任務結果或者做任務監控。

2.2.3 串行關系
//有返回值CompletableFuture#thenApply//無返回值CompletableFuture#thenAccept//異步執行CompletableFuture#thenApplyAsyncCompletableFuture#thenAcceptAsync(Consumer<? super T> action);

代碼DEMO:

   CompletableFuture completableFuture = CompletableFuture.supplyAsync(()->{
       try {
           TimeUnit.SECONDS.sleep(5);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
       return " supplyAsync start --->";
   },executor).thenApply((res)->{
       //收到上層傳遞結果,繼續傳遞結果
       return res+"thenApply continue ---> ";
   }).thenAccept(res->{
       // 收到上層傳遞結果,不繼續傳遞結果,返回null
       System.out.println(res+"thenAccept done ");
   }).thenAccept(res->{
       //收到結果null
       System.out.println(res+"thenAccept done ");
   });

當使用同步執行的時候,需要等到所有串行結果執行完畢future才能獲取到值。調用thenApply方法執行第二個任務時,則第二個任務和第一個任務是共用同一個線程池。調用thenApplyAsync執行第二個任務時,則第一個任務使用的是你自己傳入的線程池,如果沒有傳入線程池,第二個任務使用的是ForkJoin線程池,當獲取completableFuture任務結果時,也需要等待所有串行任務執行完畢才行。

注意點:

當我們完成了一個異步任務,還需要操作一些與異步任務相關的其他操作,如刷緩存,寫日志等。則可以采用如下方式實現

public CompletableFuture runAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        return " supplyAsync start --->";
    }, executor);
	
    //鏈路已經斷開,后續不影響completableFuture 返回值,業務方獲取到的completableFuture信息為“supplyAsync start --->”
    completableFuture.thenAcceptAsync((res) -> {
    //執行任務  
    }, executor);

    return completableFuture;

}

2.2.4 並行執行關系

由於異步執行同上類似,不再進行展示

//等待兩個對象執行完畢,獲取返回結果CompletableFuture#thenCombine//等待兩個對象執行完畢,不獲取返回結果CompletableFuture#runAfterBoth//所有future對象執行完畢,不獲取返回結果CompletableFuture#allOf 

代碼DEMO:

public void runAsync() {    CompletableFuture<Integer> moneyFutrue = CompletableFuture.supplyAsync(() -> {        System.out.println("查詢錢包余額");        return 1000;    }, executor);    CompletableFuture<Integer> foodCostFuture = CompletableFuture.supplyAsync(() -> {        System.out.println("查詢食物價格");        return 100;    }, executor);    CompletableFuture<Integer> resData = moneyFutrue.thenCombine(foodCostFuture, (money, foodCost) -> {        System.out.println("剩余額度:");        return money - foodCost;    });        CompletableFuture<Integer> resData = moneyFutrue.thenCombine(foodCostFuture, (money, foodCost) -> {        System.out.println("剩余額度:");        System.out.println(money - foodCost);     });}

代碼DEMO2: allof 執行 獲取所有任務結果,對於實際的項目中,可以定義需要的對象去接收傳遞參數。

    public CompletableFutur<List<Integer>> runAsync() {        CompletableFuture<Integer> moneyFutrue = CompletableFuture.supplyAsync(() -> {            System.out.println("去王庄收的食物數量:");            return 1000;        }, executor);        CompletableFuture<Integer> foodCostFuture = CompletableFuture.supplyAsync(() -> {            System.out.println("去李庄收的食物數量:");            return 100;        }, executor);        List<CompletableFuture<Integer>> futures = new ArrayList<>();        futures.add(moneyFutrue);        futures.add(foodCostFuture);        CompletableFuture<Void> completableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));        List<Integer> results = new ArrayList<>();        CompletableFuture<List<Integer>> resFutur=  completableFuture.thenApply((res) -> {            for (CompletableFuture<Integer> cmplfuture:futures){                //如果取不到,設置一個默認值                results.add(cmplfuture.getNow(0));            }            return results;        });        return resFutur;    }

此時可以用resFutur中的get或者join方法去獲取異步任務執行結果。

2.2.5 OR執行關系
//等待任何一個對象執行完畢,獲取返回結果CompletableFuture#acceptEither//等待任何對象執行完畢,不獲取返回結果CompletableFuture#runAfterEither//所有future對象執行完畢,不獲取返回結果CompletableFuture#anyOf

代碼DEMO:

public void runAsync() {    CompletableFuture<String> cf            = CompletableFuture.supplyAsync(() -> {        sleep(5, TimeUnit.SECONDS);        return "坐公交車";    });// 1    CompletableFuture<String> cf2 = cf.supplyAsync(() -> {        sleep(3, TimeUnit.SECONDS);        return "坐地鐵";    });    CompletableFuture<String> cf3 = cf2.applyToEither(cf, s -> s);    System.out.println(cf2.join());}

輸出結果:

坐地鐵

代碼DEMO2:

public void runAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf
            = CompletableFuture.supplyAsync(() -> {
        sleep(5, TimeUnit.SECONDS);
        return "坐公交車";
    });// 1

    CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
        sleep(3, TimeUnit.SECONDS);
        return "坐地鐵";
    });

    ArrayList<CompletableFuture<String>> futures = new ArrayList<CompletableFuture<String>>();
    futures.add(cf);
    futures.add(cf2);

    CompletableFuture<Object> res = CompletableFuture.anyOf(cf,cf2);

    System.out.println(res.get());
}

數據結果:

坐地鐵
2.2.6 異常處理
//whenComplete 與 handle 方法就類似於 try..catch..finanlly 中 finally 代碼塊。無論是否發生異常,都將會執行的。這兩個方法區別在於 handle 支持返回結果。
CompletableFuture#handle

CompletableFuture#whenComplete

//使用方式類似於 try..catch 中 catch 代碼塊中異常處理。
CompletableFuture#exceptionally 

代碼DEMO:

public void runAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf
            = CompletableFuture.supplyAsync(() -> {
        int a = 1 / 0;
        sleep(5, TimeUnit.SECONDS);
        return "坐公交車";
    });

    cf.exceptionally(ex->{
        return null;
    }).thenAccept(res->{
        System.out.println(res);
    });

}

數據結果:

null

代碼DEMO2:

public void runAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf
            = CompletableFuture.supplyAsync(() -> {
        int a = 1 / 0;
        sleep(5, TimeUnit.SECONDS);
        return "坐公交車";
    });


    CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
        sleep(3, TimeUnit.SECONDS);
        return "坐地鐵";
    });

    List<CompletableFuture<String>> futures = new ArrayList<>();
    futures.add(cf);
    futures.add(cf);
    
    //如果不捕獲異常,是無法執行到thenApply去取結果的
    CompletableFuture<List<String>> allRes= CompletableFuture.allOf(cf, cf2).exceptionally((ex) -> {
        return null;
    }).thenApply(res -> {
        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            result.add(future.getNow(""));
        }
        return result;
    });


}

代碼DEMO3:

public void runAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> cf
            = CompletableFuture.supplyAsync(() -> {
        int a = 1 / 0;
        sleep(5, TimeUnit.SECONDS);
        return "坐公交車";
    });


    CompletableFuture<String> cf2 = cf.supplyAsync(() -> {
        sleep(3, TimeUnit.SECONDS);
        return "坐地鐵";
    });

    List<CompletableFuture<String>> futures = new ArrayList<>();
    futures.add(cf);
    futures.add(cf);
    CompletableFuture<List<String>> allRes= CompletableFuture.allOf(cf, cf2).whenComplete((res,ex)->{
        System.out.println(res);
    }).thenApply(res -> {
        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            result.add(future.getNow(""));
        }
        return result;
    });
    //先異常捕獲,再進行任務處理,
   CompletableFuture<List<String>> allRes= CompletableFuture.allOf(cf, cf2).exceptionally(ex->{
        return null;
    }).thenApply(res -> {
        List<String> result = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            result.add(future.getNow(""));
        }
        return result;
    }).whenComplete((res,ex)->{
        System.out.println("fasdfdsf");
    });


}

如上代碼中:當任務中有錯誤的時候,thenApply是無法執行的,除非進行異常捕獲 ,如果任務中沒有錯誤,thenApply可以繼續執行,實際項目中不推薦使用,正常業務邏輯處理中,可以先對批量任務進行異常捕獲,然后再對結果進行處理。

2.2.7 超時時間控制

CompletableFuture 超時時間控制可以采用兩種方式進行控制:

  1. 對CompletableFuture設置超時時間
  2. 業務方通過CompletableFuture獲取結果時候設置超時時間

DEMO代碼1 如下:

public void runAsync() throws ExecutionException, InterruptedException, TimeoutException {    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {        sleep(5, TimeUnit.SECONDS);        return "我超時了";    }).orTimeout(1,TimeUnit.SECONDS);    sleep(2,TimeUnit.SECONDS);    System.out.println(cf.isDone());    sleep(6,TimeUnit.SECONDS);    System.out.println(cf.get());    }

輸出結果:

trueException in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)	at com.idsidslilang.go.future.FutureService.runAsync(FutureService.java:25)	at com.idsidslilang.go.future.FutureService.main(FutureService.java:33)Caused by: java.util.concurrent.TimeoutException	at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2792)	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)	at java.base/java.lang.Thread.run(Thread.java:834)

可以看出,設置了超時時間以后,在運行超過了兩秒時,future任務就已經終止。

第二種方式:

public void runAsync() {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        sleep(5, TimeUnit.SECONDS);
        return "我超時了";
    });

    try {
        sleep(2,TimeUnit.SECONDS);
        System.out.println(cf.isDone());
        System.out.println(cf.get(2,TimeUnit.SECONDS));
    }catch (TimeoutException ex){
        ex.printStackTrace();
    }


}

輸出結果:

false
java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
	at com.idsidslilang.go.future.FutureService.runAsync(FutureService.java:23)
	at com.idsidslilang.go.future.FutureService.main(FutureService.java:37)
2.2.8 超時時間生效判斷:

CompletableFuture從設置超時時間處開始進行計時,定義future時就已經在進行異步計算。

代碼DEMO:

public void runAsync() throws ExecutionException, InterruptedException, TimeoutException {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        sleep(6, TimeUnit.SECONDS);
        return "我超時了";
    }).orTimeout(1,TimeUnit.SECONDS);
    System.out.println(cf.isDone());
    System.out.println(cf.get());;

}

輸出結果:

false
Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at com.idsidslilang.go.future.FutureService.runAsync(FutureService.java:20)
	at com.idsidslilang.go.future.FutureService.main(FutureService.java:27)
Caused by: java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2792)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

代碼DEMO2:

public void runAsync() throws ExecutionException, InterruptedException, TimeoutException {

    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        sleep(4, TimeUnit.SECONDS);
        return "我超時了";
    });
    System.out.println(cf.isDone());
    sleep(5,TimeUnit.SECONDS);
    cf.orTimeout(1,TimeUnit.SECONDS);
    System.out.println(cf.get());;

}

輸出結果:

false
我超時了

代碼DEMO3:

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        sleep(4, TimeUnit.SECONDS);
        return "我超時了";
    });
    sleep(2,TimeUnit.SECONDS);
    System.out.println(cf.get(1,TimeUnit.SECONDS));
}

輸出結果:

Exception in thread "main" java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
	at com.idsidslilang.go.future.FutureService.main(FutureService.java:33)

代碼DEMO4:

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        sleep(4, TimeUnit.SECONDS);
        return "我超時了";
    });
    sleep(4,TimeUnit.SECONDS);
    System.out.println(cf.get(1,TimeUnit.SECONDS));
}

輸出結果:

我超時了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM