JDK8的異步處理方式-CompletableFuture的使用


一、背景

JDK 8 中加入了實現類 CompletableFuture,用於異步編程。異步任務默認在 ForkJoinPool.commonPool() 中執行(也可以傳入自定義的 Executor)。Fork/Join 顧名思義,是將任務的數據集分為多個子數據集,而每個子集都可以由獨立的子任務來處理,最後將每個子任務的結果匯集起來。ForkJoinPool 是 ExecutorService 接口的一個實現,它把子任務分配給線程池中的工作線程。從 API 文檔看,CompletableFuture 實現了 2 個接口:CompletionStage 和 Future。CompletionStage 的方法接受函數式接口作為參數(可以用 lambda 表達式書寫),這些方法的功能都是在某個階段得到結果後要做的事情。因此,CompletableFuture 不僅擁有 Future 的所有特性,還支持異步回調、結果轉換等功能,它有以下 Future 實現不了的功能:

  1. 合並兩個相互獨立的異步計算的結果

  2. 等待異步任務的所有任務都完成

  3. 等待異步任務的其中一個任務完成就返回結果

  4. 任務完成后調用回調方法

  5. 任務完成的結果可以用於下一個任務。

  6. 任務完成時發出通知,並提供原生的異常處理 API

二、代碼

     

package com.example.demo;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class CompletableFutureDemo {
     //CPU核數
    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(AVAILABLE_PROCESSORS,
            3 * AVAILABLE_PROCESSORS,
            3, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20));

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        System.out.println("demo start....." + startTime);
        demo3();
        System.out.println("demo end.....costTime = " + (System.currentTimeMillis() - startTime));
    }

    /**
     * 基於allOf,並行處理多個任務,等待所有任務執行完畢后返回
     */

    public static void demo3() throws Exception {
       //用戶整體接收各個任務的返回值
        Map<String,String> dataMap = new ConcurrentHashMap<>();
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        futureList.add(doSomethingA("A", dataMap));
        futureList.add(doSomethingB("B", dataMap));
        futureList.add(doSomethingC("C", dataMap));
        CompletableFuture<Void> result = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
        try {
                result.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("dataMap = " + dataMap);
       //結果為:{doSomeThingB=B, doSomeThingA=A}
    }

    /**
     * 基於thenCompose,第一個任務執行完后,第二個任務使用第一個任務的返回作為參數
     */
    public static void demo1() throws Exception {
        Map<String,String> dataMap = new HashMap<>();
        CompletableFuture<String> completableFuture = doSomethingA("A", dataMap)
                .thenCompose(id -> doSomethingB(id, dataMap));
        String result = completableFuture.get(3, TimeUnit.SECONDS);
        System.out.println("result = " + result);
        //結果為:A is done is done

    }

    /**
     * 基於thenCombine,當兩個任務都完成后,使用兩者的結果作為參數再執行一個異步任務
     */
    public static void demo2() throws Exception {
        Map<String,String> dataMap = new HashMap<>();
        CompletableFuture<String> completableFuture = doSomethingA("A", dataMap)
                .thenCombine(doSomethingB("B", dataMap), (a, b) -> a + " - " + b);
        String result = completableFuture.get(3, TimeUnit.SECONDS);
        System.out.println("result = " + result);
//結果為:A is done - B is done
    }

    /**
     * @param dataMap 用戶整體接收方法的返回值
     * @return
     */
    public static CompletableFuture<String> doSomethingA(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingA start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingA", "A");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " is done";
        }, threadPoolExecutor);
    }

    public static CompletableFuture<String> doSomethingB(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingB start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingB", "B");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " -> B is done";
        }, threadPoolExecutor);
    }

    public static CompletableFuture<String> doSomethingC(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingC start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingC", "C");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " is done";
        }, threadPoolExecutor);

    }

}

三、效率比較

很明顯,異步更快:下面的例子中,四個各耗時約 100 ms 的任務並行執行約需 147 ms,而順序執行約需 432 ms。

package com.example.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Compares the wall-clock time of cubing four numbers asynchronously with
 * {@link CompletableFuture} against doing the same work sequentially. Each
 * calculation sleeps for about 100 ms to simulate a slow call.
 *
 * <p>Created 2021/11/4 15:10.
 *
 * @author d00018641
 */
public class TestDemo2 {
    /**
     * Runs both variants, prints the elapsed milliseconds of each and finally the
     * computed results.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        List<String> requestList = new ArrayList<>();
        requestList.add("3");
        requestList.add("4");
        requestList.add("5");
        requestList.add("6");
        // Holds one result per request, at the same index as the request.
        String[] returnArray = new String[requestList.size()];
        // One future per request. No executor is passed, so the tasks run on CompletableFuture's default executor.
        List<CompletableFuture<String>> futures = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < requestList.size(); i++) {
            final int index = i;
            CompletableFuture<String> tf = CompletableFuture
                    .supplyAsync(() -> calc(requestList.get(index)))
                    // Stores the value (null if the calculation failed) in this request's slot.
                    .whenComplete((value, error) -> returnArray[index] = value);
            futures.add(tf);
        }
        // Block until every asynchronous calculation, including its whenComplete step, has finished.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Sample run recorded by the author: CompletableFuture end.....costTime = 147
        System.out.println("CompletableFuture end.....costTime = " + (System.currentTimeMillis() - startTime));
        long startTime1 = System.currentTimeMillis();
        // Same work done one request after another; overwrites the slots filled above.
        for (int i = 0; i < requestList.size(); i++) {
            returnArray[i] = calc(requestList.get(i));
        }
        // Sample run recorded by the author: sequential costTime = 432
        System.out.println("連續 end.....costTime = " + (System.currentTimeMillis() - startTime1));
        System.out.println(Arrays.asList(returnArray));

    }

    /**
     * Parses {@code source} as an integer, waits about 100 ms and returns its cube.
     *
     * @param source decimal integer text
     * @return the cube formatted as a double, e.g. {@code "27.0"} for {@code "3"}
     * @throws NumberFormatException if {@code source} is not a valid integer
     */
    private static String calc(String source) {
        int as = Integer.parseInt(source);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Skip the remaining delay but keep the interrupt flag set so callers can still observe it.
            Thread.currentThread().interrupt();
        }
        return String.valueOf(Math.pow(as, 3));
    }
}

 四、編程實戰

// Cache of fetched rates, keyed by countryCode + fromCurrency + target currency item code.
// ConcurrentHashMap because the entries are written from several asynchronous tasks.
Map<String, ExchangeRateVO> cacheMap = new ConcurrentHashMap<>();
// One future per (request, target currency) pair. The lookups are submitted directly instead of
// being wrapped in an outer per-request task: such a wrapper only blocks in join() and would
// occupy a pool thread that could otherwise run a lookup.
List<CompletableFuture<Void>> batchFutureList = new ArrayList<>();
for (ExchangeRateVO vo : paramList) {
    for (LookupItemVO toCurrency : toCurrencyList) {
        CompletableFuture<Void> cfItem = CompletableFuture.runAsync(() -> {
            try {
                // NOTE(review): presumably initialises per-thread request context for the service call; confirm.
                buildBasicContext();
                // Query the exchange-rate service for this currency pair.
                ExchangeRateVO resultVo = iExchangeRateService.findExchangeRate(vo.getCountryCode(),
                    vo.getFromCurrency(), toCurrency.getItemCode(), vo.getStartDate());
                if (CommonUtils.isNotEmpty(resultVo)) {
                    if (CommonUtils.isEmpty(resultVo.getFromCurrency())) {
                        // The service left the identifying fields blank; fill them in from the request.
                        resultVo.setFromCurrency(vo.getFromCurrency());
                        resultVo.setToCurrency(toCurrency.getItemCode());
                        resultVo.setCountryCode(vo.getCountryCode());
                    }
                    resultVo.setStartDate(vo.getStartDate());
                    // NOTE(review): resultList is declared outside this snippet and is appended to from
                    // several pool threads; confirm that it is a thread-safe collection.
                    resultList.add(resultVo);
                    cacheMap.put(vo.getCountryCode() + vo.getFromCurrency() + toCurrency.getItemCode(),
                        resultVo);
                }
            } catch (Exception er) {
                // A failed lookup is logged and skipped so that one bad pair does not fail the whole batch.
                LOGGER.error("findExchangeRate has some error:", er);
            }
        });
        batchFutureList.add(cfItem);
    }
}
LOGGER.info2("before submit completable");
// Wait until every lookup has finished.
CompletableFuture.allOf(batchFutureList.toArray(new CompletableFuture[0])).join();

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM