CompletableFuture筆記


CompletableFuture是java8引入的一個很實用的特性,可以視為Future的升級版本,以下幾個示例可以說明其主要用法(注:示例來自《java8實戰》一書第11章)

一、引子:化同步為異步

為了方便描述,假設"查詢電商報價"的場景:有一個商家Shop類,對外提供價格查詢的服務getPrice

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;


public class Shop {

    public String name;

    private Random random = new Random();

    public Shop(String name) {
        this.name = name;
    }

    /**
     * 計算價格
     *
     * @param product
     * @return
     */
    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    /**
     * 模擬計算價格的耗時
     */
    private static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 對外提供的報價服務方法
     *
     * @param product
     * @return
     */
    public double getPrice(String product) {
        return calculatePrice(product);
    }


}

平台可以調用getPrice方法獲取某個商家的報價:

    public static void main(String[] args) {
        testSyncGetPrice();
    }

    private static void doSomethingElse() {
        System.out.println("do something else");
    }

    public static void testSyncGetPrice() {
        Shop shop = new Shop("BestShop");
        long start = System.currentTimeMillis();
        System.out.printf("Price is %.2f\n", shop.getPrice("my favorite product"));
        doSomethingElse();
        System.out.println("(SyncGetPrice) Invocation returned after : " + (System.currentTimeMillis() - start) + " ms\n");
    }

顯然,這是1個同步調用,在shop.getPrice()方法執行完前,后面的doSomethingElse()只能等着,輸出結果如下:

Price is 222.01
do something else
(SyncGetPrice) Invocation returned after : 1015 ms

為了消除同步阻塞,可以借用Future將同步的getPrice方法調用,轉換成異步。

    public Future<Double> getPriceAsync(String product) {
        Future<Double> submit = Executors.newFixedThreadPool(1).submit(() -> calculatePrice(product));
        return submit;
    }

上面的submit方法,最終調用的是java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

如果繼續追下去的話,execute方法,又是調用的java.util.concurrent.ThreadPoolExecutor#execute方法,創建一個線程來異步執行。將同步轉換成異步后,doSomethingElse方法,在getPriceAsync執行期間,就能並發執行了。

    public static void doSomethingElse() {
        System.out.println("do something else");
    }

    public static void testAsyncGetPrice() {
        Shop shop = new Shop("BestShop");
        long start = System.currentTimeMillis();
        Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
        doSomethingElse();
        try {
            Double price = futurePrice.get();
            System.out.printf("Price is %.2f\n", price);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        System.out.println("(AsyncGetPrice) Invocation returned after : " + (System.currentTimeMillis() - start) + " ms\n");
    }

    public static void main(String[] args) {
        testAsyncGetPrice();
    }

輸出結果:

do something else
Price is 201.69
(AsyncGetPrice) Invocation returned after : 1111 ms

 

二、同步轉換成異步的其它方式

CompletableFuture出現后,"同步調用"轉換成"異步調用"的方式,有了新的選擇:

public Future<Double> getPriceAsync1(String product) {
    CompletableFuture<Double> futurePrice = new CompletableFuture<Double>();
    new Thread(() -> {
        try {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        } catch (Exception e) {
            futurePrice.completeExceptionally(e);
        }
    }).start();
    return futurePrice;
}

public Future<Double> getPriceAsync2(String product) {
    return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

上面這2種方法效果等價,顯然第2種supplyAsync的寫法更簡潔。需要說明的是:CompletableFuture內部其實也是使用線程池來處理的,只不過這個線程池的類型默認是ForkJoinPool,這一點可以從java.util.concurrent.CompletableFuture#asyncPool源碼看出來:

    /**
     * Default executor -- ForkJoinPool.commonPool() unless it cannot
     * support parallelism.
     */
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

  

三、CompletableFuture中使用自定義線程池

如果需要查詢報價的商家有很多,比如:6個,逐一同步調用getPrice方法,時長大約就是6個商家的總時長累加

    static List<Shop> shops = Arrays.asList(
            new Shop("1-shop"),
            new Shop("2-shop"),
            new Shop("3-shop"),
            new Shop("4-shop"),
            new Shop("5-shop"),
            new Shop("6-shop")
    );

    public static void main(String[] args) {
        testFindPrices();
    }

    public static List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.name, shop.getPrice(product)))
                .collect(Collectors.toList());
    }

輸出:

[1-shop price is 180.36, 2-shop price is 206.13, 3-shop price is 205.49, 4-shop price is 184.62, 5-shop price is 222.73, 6-shop price is 143.19]
do something else
(findPrices-Stream) Invocation returned after : 6102 ms

這顯然太慢了,要知道現代計算機都是多核cpu體系,很容易想到把stream換成parallelStream,可以充分發揮多核優勢:

    public static List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> String.format("%s price is %.2f", shop.name, shop.getPrice(product)))
                .collect(Collectors.toList());
    }

還是剛才的測試場景,這時輸出結果類似下面這樣:(注:測試機器為mac 4核筆記本)

[1-shop price is 137.42, 2-shop price is 168.93, 3-shop price is 182.89, 4-shop price is 154.60, 5-shop price is 192.70, 6-shop price is 179.06]
do something else
(findPrices-parallelStream) Invocation returned after : 2102 ms

比剛才好多了,耗時從6s縮短到2s,但仔細想一想:6個商家的getPrice處理,分攤到4個核上,還是有2個核會出現阻塞(即:平均1個核並行處理1個task,6-4=2,仍然有2個task要排隊)。

如果換成用CompletableFuture默認的ForkJoinPool呢,性能會不會好一些?

    public static List<String> findPricesFuture() {
        List<CompletableFuture<String>> priceFutures = shops.parallelStream()
                .map(shop -> CompletableFuture.supplyAsync(() ->
                        String.format("%s price is %.2f", shop.name, shop.getPrice("myPhone27"))))
                .collect(Collectors.toList());
        return priceFutures.parallelStream().map(CompletableFuture::join).collect(Collectors.toList());
    }


    public static void testFindPricesCompletableFuture() {
        long start = System.currentTimeMillis();
        System.out.printf(findPricesFuture().toString() + "\n");
        doSomethingElse();
        System.out.println("(findPrices-CompletableFuture) Invocation returned after : " + (System.currentTimeMillis() - start) + " ms\n");
    }

輸出結果(注:上面代碼中的parallelStream換成stream,下面的輸出結果也差不多)

[1-shop price is 168.57, 2-shop price is 159.43, 3-shop price is 200.08, 4-shop price is 165.64, 5-shop price is 195.11, 6-shop price is 206.83]
do something else
(findPrices-CompletableFuture) Invocation returned after : 2092 ms

從結果上看,使用CompletableFuture與使用僅使用parallelStream的耗時差不多,並沒有性能上的提升。原因在於默認的ForkJoinPool,其默認線程數也是跟CPU核數相關的。在這個場景中,我們至少要6個線程(即:shops.size()),才能讓6個商家的getPrice並發處理。按這個思路,我們可以自定義一個線程池,然后傳入supplyAsync方法中:

    private static final Executor executor =
            Executors.newFixedThreadPool(Math.min(shops.size(), 100),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setDaemon(true);
                            return t;
                        }
                    });

    public static List<String> findPricesFutureWithExecutor() {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() ->
                        String.format("%s price is %.2f", shop.name, shop.getPrice("myPhone27")), executor))
                .collect(Collectors.toList());
        return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void testFindPricesExecutor() {
        long start = System.currentTimeMillis();
        System.out.printf(findPricesFutureWithExecutor().toString() + "\n");
        doSomethingElse();
        System.out.println("(findPrices-FutureWithExecutor) Invocation returned after : " + (System.currentTimeMillis() - start) + " ms\n");
    }

    public static void main(String[] args) {
        testFindPricesExecutor();
    }

輸出結果如下:

[1-shop price is 177.26, 2-shop price is 227.09, 3-shop price is 179.98, 4-shop price is 127.19, 5-shop price is 208.93, 6-shop price is 229.91]
do something else
(findPrices-FutureWithExecutor) Invocation returned after : 1121 ms

從耗時上看,僅相當於單個商家getPrice的耗時,已經達到最佳效果。

 

四、多個異步操作組合

前面提到的商家報價場景,我們再加點料,引入“打折”功能。先把shop調整下:

package future;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;


public class Shop {

    public String name;

    private static Random random = new Random();

    public Shop(String name) {
        this.name = name;
    }

    private double calculatePrice(String product) {
        randomDelay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public static void randomDelay() {
        int delay = 500 + random.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 查詢“(原始)價格”及"對應的折扣"
     *
     * @param product
     * @return
     */
    public String getPriceWithDiscount(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        String result = String.format("%s:%.2f:%s", name, price, code);
        System.out.println(result);
        return result;
    }


}

主要有2處改動:

1 是delay方法引入了隨機數,模擬不同商家查詢價格時,有着不同的處理時間,顯得更真實。

2 是getPriceWithDiscount方法,返回的價格不再是1個double,而是類似下面這樣的字符串

1-shop:212.78:NONE
2-shop:182.22:DIAMOND
3-shop:148.91:PLATINUM
4-shop:203.78:SILVER
5-shop:152.75:DIAMOND
6-shop:212.43:NONE

同時包括了原始的價格,以及打折等級(無折扣、白銀等級、鑽石等級...之類),這里有一個Discount類,代碼如下:

package future;


import java.math.BigDecimal;

public class Discount {

    /**
     * 打折類型
     */
    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        /**
         * 折扣百分比
         */
        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }


    /**
     * 計算折扣后的價格
     * @param price
     * @param code
     * @return
     */
    private static double apply(double price, Code code) {
        Shop.randomDelay();
        return format(price * (100 - code.percentage) / 100);
    }

    private static double format(double d) {
        BigDecimal decimal = new BigDecimal(d);
        return decimal.setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
    }

    /**
     * 應用折扣,輸出最后的處理結果 
     *
     * @param quote
     * @return
     */
    public static String applyDiscount(Quote quote) {
        return quote.shopName + " price is " + apply(quote.price, quote.discountCode);
    }
}

apply模擬了計算折扣價時,需要一定的耗時randomDelay(),而getPriceWithDiscount返回的字符串,還需要有1個Quota類專門解析其中的原始價格以及折扣等級

/**
 * 帶折扣的報價
 */
public class Quote {

    public final String shopName;
    public final double price;
    public final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code code) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = code;
    }

    /**
     * 解析價格結果
     *
     * @param s
     * @return
     */
    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }
}

引入折扣功能后,原來的“查詢商家價格”,可分解成3個步驟:

1. 先調用shop.getPriceWithDiscount 返回“原始價格及折扣等級”字符串

2. 解析1中返回的字符串,將price與discount信息提取出來,並最終封裝成Quota對象

3. 調用Discount的applyDiscount,返回最終打折后的價格信息

而且,上面的步驟,3依賴2的完成,2依賴1的完成,用標准寫出來的話,大致是下面這個樣子:

    public static List<String> findDiscountPrices() {
        return shops.stream()
                .map(shop -> shop.getPriceWithDiscount("myPhone27"))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }

    public static void testFindDiscountPrices() {
        long start = System.currentTimeMillis();
        System.out.printf(findDiscountPrices().toString() + "\n");
        doSomethingElse();
        System.out.println("(findDiscountPrices-stream) Invocation returned after : " + (System.currentTimeMillis() - start) + " ms\n");
    }
    
    public static void main(String[] args) {
        testFindDiscountPrices();
    }

這是同步的調用方式,可想而知,最終耗時會很大:

1-shop:157.77:DIAMOND
2-shop:138.03:DIAMOND
3-shop:204.60:DIAMOND
4-shop:202.52:NONE
5-shop:155.14:GOLD
6-shop:224.15:SILVER
[1-shop price is 126.22, 2-shop price is 110.42, 3-shop price is 163.68, 4-shop price is 202.52, 5-shop price is 139.63, 6-shop price is 212.94]
do something else
(findDiscountPrices-stream) Invocation returned after : 16449 ms

使用CompletableFuture,可以把1-2-3 這3個步驟都轉換成異步,且保證相互之間的依賴關系,代碼如下:

    public static List<String> findDiscountPricesFuture() {
        List<CompletableFuture<String>> list = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceWithDiscount("myPhone27"), executor))
                .map(f -> f.thenApply(Quote::parse))
                .map(f -> f.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
                .collect(Collectors.toList());

        return list.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

輸出結果如下:

從結果上看,確實已經是異步了(1個線程處理1個商家的getPrice及Discount計算),整體耗時也大幅下降。但是有一個細節問題,6個商家的最終結果(即:最后的[...]列表輸出),是等所有異步操作都執行完,1次性輸出的,這在實際應用中,意味着,最終買家能多快看到價格輸出,取決於最慢的那個商家,這是不能接受的,理想情況下,應該是哪個商家的服務快,能先計算出結果 ,就應該第1時間展示這家店的價格。

修正后的代碼如下:

    public static void findDiscountPricesFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture[] futureArray = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceWithDiscount("myPhone27"), executor))
                .map(f -> f.thenApply(Quote::parse))
                .map(f -> f.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
                .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + (System.currentTimeMillis() - start) + " ms)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futureArray).join();
    }

解釋:主要是利用了CompletableFuture.allOf()方法,該方法會把數組結果,按完成時間快慢,快的先返回。

從運行效果上看,最終的報價輸出,不再是等6個商家全計算好才返回。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM