了不起的Java-CompletableFuture組合異步編程


在多任務程序中,我們比較熟悉的是分支-合並框架的並行計算,他的目的是將一個操作(比如巨大的List計算)切分為多個子操作,充分利用CPU的多核,甚至多個機器集群,並行執行這些子操作。

而CompletableFuture的目標是並發(執行多個操作),而非並行,是利用CPU的核,使其持續忙碌,達成最大吞吐,在並發進行中避免等待遠程服務的返回值,或者數據庫的長時查詢結果等耗時較長的操作,如果解決了這些問題,就能獲得最大的並發(通過避免阻塞)。
而分支-合並框架只是並行計算,是沒有阻塞的情況。

Future接口

Future接口用於對將來某個時刻發生的結果進行建模,它建模了一種異步計算,返回一個執行結果的引用,計算完成后,這個引用被返回給調用方,
在使用Future時,將耗時操作封裝在一個Callable接口對象中,提交給ExecutorService。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

我們在異步執行結果獲取時,設置了get過期時間2秒,否則,如果沒有正常的返回值,會阻塞線程,非常危險。

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {

    public static void main(String[] args) {
        
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<Integer> result = executor.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                Util.delay();
                return new Random().nextInt();
            }
        });
        doSomeThingElse();
        executor.shutdown();
        try {
            try {
                System.out.println("result:" + result.get(2,TimeUnit.SECONDS));
            } catch (TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    private static void doSomeThingElse() {
         System.out.println("Do Some Thing Else." );   
        
    }
}

因為Future的局限性,如

  • 難異步合並
  • 等待 Future 集合中的所有任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成,並返回它的結果。

一是我們沒有好的方法去獲取一個完成的任務;二是 Future.get 是阻塞方法,使用不當會造成線程的浪費。解決第一個問題可以用 CompletionService 解決,CompletionService 提供了一個 take() 阻塞方法,用以依次獲取所有已完成的任務。對於第二個問題,可以用 Google Guava 庫所提供的 ListeningExecutorService 和 ListenableFuture 來解決。這些都會在后面的介紹。

CompletableFuture組合異步

單一操作異步

我們的演示程序是查詢多個在線商店,把最佳的價格返回給消費者。首先先查詢一個產品的價格(即單一操作異步)。

先建一個同步模型商店類Shop,其中delay()是一個延時阻塞。

calculatePrice(String)同步輸出一個隨機價格,
getPriceAsync(String)是異步獲取隨機價格,依賴calculatePrice同步方法,返回類型是Future<Double>

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread( () -> {
                    double price = calculatePrice(product);
                    futurePrice.complete(price);
        }).start();
        return futurePrice;
    }

    public String getName() {
        return name;
    }

}

delay()阻塞,延遲一秒,注釋的代碼是隨機延遲0.5 - 2.5秒之間

    public static void delay() {
        int delay = 1000;
        //int delay = 500 + RANDOM.nextInt(2000);
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

調用單一查詢同步:

public class ShopSyncMain {
    public static void main(String[] args) {
        Shop shop = new Shop("BestShop");
        long start = System.nanoTime();
        double price = shop.getPrice("my favorite product");
        System.out.printf("Price is %.2f%n", price);
        long invocationTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Invocation returned after " + invocationTime 
                                                        + " msecs");
        // Do some more tasks
        doSomethingElse();
        long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
        System.out.println("Price returned after " + retrievalTime + " msecs");
      }

      private static void doSomethingElse() {
          System.out.println("Doing something else...");
      }
}

//結果:

Price is 123.26
Invocation returned after 1069 msecs
Doing something else...
Price returned after 1069 msecs

嗯,一秒出頭,這符合delay()阻塞一秒的預期

調用單一查詢異步操作:

public class ShopMain {

  public static void main(String[] args) {
    Shop shop = new Shop("BestShop");
    long start = System.nanoTime();
    Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
    long invocationTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Invocation returned after " + invocationTime 
                                                    + " msecs");
    // Do some more tasks, like querying other shops
    doSomethingElse();
    // while the price of the product is being calculated
    try {
        double price = futurePrice.get();
        System.out.printf("Price is %.2f%n", price);
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
    long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
    System.out.println("Price returned after " + retrievalTime + " msecs");
  }

  private static void doSomethingElse() {
      System.out.println("Doing something else...");
  }

}

結果:
Invocation returned after 54 msecs
Doing something else...
Price is 123.26
Price returned after 1102 msecs
咦,異步反而時間多了,別忘了,大家都是執行一個方法,不存在多個並發(也沒有多個順序流操作),異步當然顯示不出優勢。

異步異常

new Thread方式新建線程可能會有個糟糕的情況:用於提示錯誤的異常被限制在當前線程內,最終會殺死線程,所以get是無法返回期望值,client調用方會被阻塞。
你當然可以重載get,要求超時過期時間,可以解決問題,但你也應該解決這個錯誤邏輯,避免觸發超時,因為超時又會觸發TimeoutException,client永遠不知道為啥產生了異常。
為了讓client獲知異常原因,需要對CompletableFuture對象使用completeExceptionally方法,將內部錯誤異常跑出。
改寫如下:

    public Future<Double> getPrice(String product) {
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                futurePrice.complete(price);
            } catch (Exception ex) {
                futurePrice.completeExceptionally(ex);
            }
        }).start();

使用工廠方法創建CompletableFuture對象

前面用門通過手動建立線程,通過調用同步方法創建CompletableFuture對象,還可以通過CompletableFuture的工廠方法來方便的創建對象,重寫如下:

    public Future<Double> getPrice(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

通過原生代碼,我們可以了解supplyAsync和runAsync的區別,runAsync沒有返回值,只是運行一個Runnable對象,supplyAsync有返回值U,參數是Supplier函數接口,或者可以自定義一個指定的執行線程池。

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

 

並發單一任務異步

前面是單一的操作異步,異步的內涵不能顯現,現在要查詢一大堆的商店(並發)的同一產品的價格,然后對比價格。
商店List:

private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll")/*,
new Shop("ShopEasy")*/);

使用三種方式處理多個商店的價格查詢:

//順序同步方式
    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice(product))
                .collect(Collectors.toList());
    }

    //並行流方式
    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> shop.getName() + " price is " + shop.getPrice(product))
                .collect(Collectors.toList());
    }

    //工廠方法異步
    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures =
                shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
                        + shop.getPrice(product), executor))
                .collect(Collectors.toList());

        List<String> prices = priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return prices;
    }

     private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

CompletableFuture::join的作用是異步的開啟任務並發,並且將結果合並聚集。

結果:
sequential done in 4130 msecs
parallel done in 1091 msecs
composed CompletableFuture done in 1010 msecs

如果將list個數變成5個,結果是
同步必然是線性增長
sequential done in 5032 msecs
並行流受到PC內核數的限制(4個),所以一組並行只能指派最多4個線程,第二輪要多費時1秒多
parallel done in 2009 msecs
CompletableFuture異步方式能保持1秒的秘密在於線程池,我們定義了shops.size()大小的線程池,並且使用了守護線程。java提供了倆類的線程:用戶線程和守護線程(user thread and Daemon thread)。用戶線程是高優先級的線程。JVM虛擬機在結束一個用戶線程之前,會先等待該用戶線程完成它的task。
在另一方面,守護線程是低優先級的線程,它的作用僅僅是為用戶線程提供服務。正是由於守護線程是為用戶線程提供服務的,僅僅在用戶線程處於運行狀態時才需要守護線程。另外,一旦所有的用戶線程都運行完畢,那么守護線程是無法阻止JVM退出的

並發多任務異步

假設所有商店開始折扣服務,用5種折扣碼代表。

public class Discount {

    public enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }
    private static double apply(double price, Code code) {
        delay();
        return format(price * (100 - code.percentage) / 100);
    }
}

public class Quote {

    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

比如,SILVER(5)代表95折, GOLD(10)代表90折,PLATINUM(15)代表85折。、
Quote類用於包裝輸出。同步的價格計算也做了修改,使用:來分割商店名、價格、折扣碼。折扣碼是隨機產生的。
新的商店類,getPrice同步方法輸出商店名、價格、折扣碼的待分割字符串。parse用於將字符串轉化為Quote實例。
打折延遲1秒。

public class Shop {

    private final String name;
    private final Random random;

    public Shop(String name) {
        this.name = name;
        random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2));
    }

    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return name + ":" + price + ":" + code;
    }

    public double calculatePrice(String product) {
        delay();
        return format(random.nextDouble() * product.charAt(0) + product.charAt(1));
    }

    public String getName() {
        return name;
    }
}

運行三種服務

public class BestPriceFinder {

    private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                                                   new Shop("LetsSaveBig"),
                                                   new Shop("MyFavoriteShop"),
                                                   new Shop("BuyItAll"),
                                                   new Shop("ShopEasy"));

    private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    //同步流
    public List<String> findPricesSequential(String product) {
        return shops.stream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }
    //並行流
    public List<String> findPricesParallel(String product) {
        return shops.parallelStream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }

    //CompletableFuture異步
    public List<String> findPricesFuture(String product) {
        List<CompletableFuture<String>> priceFutures = findPricesStream(product)
                .collect(Collectors.<CompletableFuture<String>>toList());

        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public Stream<CompletableFuture<String>> findPricesStream(String product) {
        return shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
    }

    public void printPricesStream(String product) {
        long start = System.nanoTime();
        CompletableFuture[] futures = findPricesStream(product)
                .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();
        System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
    }

}
//結果
sequential done in 10176 msecs
parallel done in 4010 msecs
composed CompletableFuture done in 2016 msecs

 

  • 同步的10多秒是在5個list元素時獲得的,每個一次詢價,一次打折延遲,共2秒。
  • 並行流一輪需要2秒多(第一輪詢價4個+1個共兩個輪次,2秒,第二輪打折4個+1個)。
  • 異步方式第一輪和第二輪都是1秒多,所以2秒多。

這里的三步需要特別說明,這也是並發多任務的核心所在,並發是5條記錄並發,多任務是

  1. 詢價任務(耗時1秒)
  2. 解析字符換成Quote實例
  3. 計算折扣

這三個子任務

  • .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
  • .map(future -> future.thenApply(Quote::parse))
  • .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));

第一行是用異步工廠執行同步方法getPrice,返回字符串,涉及線程池;
第二行thenApply是將字符串轉成Quote實例,這個是同步內存處理,不涉及線程池;
thenapply()是接受一個Function<? super T,? extends U> fn參數用來轉換CompletableFuture,相當於流的map操作,返回的是非CompletableFuture類型,它的功能相當於將CompletableFuture<T>轉換成CompletableFuture<U>.
在這里是把CompletableFuture<String>轉成CompletableFuture<Quote>
第三行也是異步處理,也涉及線程池,獲得折扣后的價格。thenCompose()在異步操作完成的時候對異步操作的結果進行一些操作,Function<? super T, ? extends CompletionStage<U>> fn參數,並且仍然返回CompletableFuture類型,相當於flatMap,用來連接兩個CompletableFuture
如果加入一個不相互依賴的Future對象進行整合,比如需要計算匯率(不需要計算折扣,他們之間無依賴),可以試用thenCombine方法,這里有4種實現,最后一種比較優。

public class ExchangeService {

    public enum Money {
        USD(1.0), EUR(1.35387), GBP(1.69715), CAD(.92106), MXN(.07683);

        private final double rate;

        Money(double rate) {
            this.rate = rate;
        }
    }

    public static double getRate(Money source, Money destination) {
        return getRateWithDelay(source, destination);
    }

    private static double getRateWithDelay(Money source, Money destination) {
        delay();
        return destination.rate / source.rate;
    }

}

public List<String> findPricesInUSD(String product) {
        List<CompletableFuture<Double>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            // Start of Listing 10.20.
            // Only the type of futurePriceInUSD has been changed to
            // CompletableFuture so that it is compatible with the
            // CompletableFuture::join operation below.
            CompletableFuture<Double> futurePriceInUSD = 
                CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                    CompletableFuture.supplyAsync(
                        () ->  ExchangeService.getRate(Money.EUR, Money.USD)),
                    (price, rate) -> price * rate
                );
            priceFutures.add(futurePriceInUSD);
        }
        // Drawback: The shop is not accessible anymore outside the loop,
        // so the getName() call below has been commented out.
        List<String> prices = priceFutures
                .stream()
                .map(CompletableFuture::join)
                .map(price -> /*shop.getName() +*/ " price is " + price)
                .collect(Collectors.toList());
        return prices;
    }

    public List<String> findPricesInUSDJava7(String product) {
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future<Double>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            final Future<Double> futureRate = executor.submit(new Callable<Double>() { 
                public Double call() {
                    return ExchangeService.getRate(Money.EUR, Money.USD);
                }
            });
            Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() { 
                public Double call() {
                    try {
                        double priceInEUR = shop.getPrice(product);
                        return priceInEUR * futureRate.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e.getMessage(), e);
                    }
                }
            });
            priceFutures.add(futurePriceInUSD);
        }
        List<String> prices = new ArrayList<>();
        for (Future<Double> priceFuture : priceFutures) {
            try {
                prices.add(/*shop.getName() +*/ " price is " + priceFuture.get());
            }
            catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        return prices;
    }

    public List<String> findPricesInUSD2(String product) {
        List<CompletableFuture<String>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            // Here, an extra operation has been added so that the shop name
            // is retrieved within the loop. As a result, we now deal with
            // CompletableFuture<String> instances.
            CompletableFuture<String> futurePriceInUSD = 
                CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                    CompletableFuture.supplyAsync(
                        () -> ExchangeService.getRate(Money.EUR, Money.USD)),
                    (price, rate) -> price * rate
                ).thenApply(price -> shop.getName() + " price is " + price);
            priceFutures.add(futurePriceInUSD);
        }
        List<String> prices = priceFutures
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return prices;
    }

    public List<String> findPricesInUSD3(String product) {
        // Here, the for loop has been replaced by a mapping function...
        Stream<CompletableFuture<String>> priceFuturesStream = shops
            .stream()
            .map(shop -> CompletableFuture
                .supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                    CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD)),
                    (price, rate) -> price * rate)
                .thenApply(price -> shop.getName() + " price is " + price));
        // However, we should gather the CompletableFutures into a List so that the asynchronous
        // operations are triggered before being "joined."
        List<CompletableFuture<String>> priceFutures = priceFuturesStream.collect(Collectors.toList());
        List<String> prices = priceFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        return prices;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM