Java8函數之旅 (八) - 組合式異步編程


前言

隨着多核處理器的出現,如何輕松高效的進行異步編程變得愈發重要,我們看看在java8之前,使用java語言完成異步編程有哪些方案。

JAVA8之前的異步編程

  • 繼承Thead類,重寫run方法
  • 實現runable接口,實現run方法
  • 匿名內部類編寫thread或者實現runable的類,當然在java8中可以用lambda表達式簡化
  • 使用futureTask進行附帶返回值的異步編程
  • 使用線程池和Future來實現異步編程
  • spring框架下的@async獲得異步編程支持

使用線程池與future來實現異步編程

實現方式可謂是多種多樣,這里我們使用線程池和future來實現異步編程,借着這個例子來講述java8的組合式異步編程有着怎樣的優勢

        //構造線程池
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            //構造future結果,doSomethingA十分耗時,因此采用異步
            Future<Integer> future = pool.submit(() -> doSomethingA());
            //做一些別的事情
            doSomethingB();
            //從future中獲得結果,設置超時時間,超過了就拋異常
            Integer result = future.get(10, TimeUnit.SECONDS);
            //打印結果
            System.out.printf("the async result is : %d", result);
            //異常捕獲
        } catch (InterruptedException e) {
            System.out.println("任務計算拋出了一個異常!");
        } catch (ExecutionException e) {
            System.out.println("線程在等待的過程中被中斷了!");
        } catch (TimeoutException e) {
            System.out.println("future對象等待時間超時了!");
        }
    }

然而這樣的異步編程方式僅僅能滿足基本的需要,稍微復雜的一些異步處理Future接口似乎就有點束手無策了,例如

  • 將兩個異步計算合並為一個——這兩個異步計算之間相互獨立,同時第二個又依賴於第
    一個的結果。
  • 等待 Future 集合中的所有任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同
    一個值) ,並返回它的結果。
  • 通過編程方式完成一個 Future 任務的執行(即以手工設定異步操作結果的方式) 。
  • 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)

這種感覺其實就很像沒有stream之前的collections的操作感覺一樣,同樣的,對於future,java8提供了它的函數式升級版本CompletableFuture,從名字就可以看出來這絕對是future的升級版。

JAVA8中的組合式異步編程

使用CompletableFuture進行異步編程

事物的發展往往都是由簡單->復雜->簡單,這里我們同樣遵循這樣的規律,循序漸進。
下面的例子摘取《java8實戰》的異步編程章節,並做了簡化。
我們假設現在我們有一項查詢商品價格的服務十分耗時,所以毫無例外的我們想讓查詢最佳價格的服務以異步的形式執行。
最直接的方式是直接構建一個異步查詢商品價格的api,並且返回,為了演示需要,編寫一個線程等待一秒的方法來模擬長時間的請求。


    public double getPrice(String product) {
        return calculatePrice(product);
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }
    
    public static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

現在getPrice這方法是一個同步的方法,該方法在經過1秒的延遲之后會返回給我們一個商品的價格(這里只是簡單的根據名字構造了一個隨機數)

我們使用completFuture將getPrice轉化為異步方法,如下

    public Future<Double> getAsyncPrice(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            double price = calculatePrice(product);
            futurePrice.complete(price);
        }).start();;
        return futurePrice;
    }

這里構造一個completableFuture對象,並另起一個異步線程,將異步計算的結果使用futurePrice.complete來接受,無需等待直接返回future結果
調用類使用Integer result = future.get(10, TimeUnit.SECONDS)來接受返回的結果,如果等待超時則拋出異常。
另外,如果異步線程發生異常,並且在排查問題的時候想要知道具體是什么原因導致的,
可以在getAsyncPrice方法中使用completeExcepitonally來得到異常信息並且結束這次異步任務,代碼如下

public Future<Double> getPriceAsync(String product) {
    public Future<Double> getPriceAsync(String product) {
        CompletableFuture<Double> futurePrice = new CompletableFuture<>();
        new Thread(() -> {
            try {
                double price = calculatePrice(product);
                futurePrice.complete(price);
            } catch (Exception ex) {
                futurePrice.completeExceptionally(ex);
            }
        }).start();
        return futurePrice;
    }

這樣,基本的功能就實現了。

使用工廠類簡化異步操作

也許你看到上面的代碼,會說:"我暈,你這寫法比原來還復雜哦,而且我也沒看出啥區別啊。",是的,上文的寫法可以算是原生態的寫法了,目的為為下面的知識做一個簡單的鋪墊。
事實上,CompleteFuture本身提供了大量的工廠方法來供我們十分方便的實現一個異步編程,他封裝了前篇一律的異常與結果接收,你只需要編寫真正的異步邏輯部分就可以了,同時借住於lambda表達式,可以更進一步。

supplyAsync 方法接受一個生產者(Supplier)作為參數,返回一個 CompletableFuture
對象, 該對象完成異步執行后會讀取調用生產者方法的返回值。 生產者方法會交由 ForkJoinPool
池中的某個執行線程(Executor)運行,但是你也可以使用 supplyAsync 方法的重載版本,傳
遞第二個參數指定不同的執行線程執行生產者方法。

於是上文的例子可以改寫如下

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }

是不是簡潔了許多呢?

可現在還有問題,這里我們成功的編寫了一個十分簡潔的異步方法,可實際的情況中,我們所能調用的API大部分都是同步的,因此下面將介紹如何使用異步的方法去操作這些同步API。

使用流異步操作同步API

我們現在有這么一個需求,它接受產品名作為參數,返回一個字符串列表,
這個字符串列表中包括商店的名稱、該商店中指定商品的價格,商店集合以及接口設計如下。

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));

public List<String> findPrices(String product);

使用同步的方法實現

這樣的集合變換使用stream流來操作十分容易,代碼如下

    public List<String> findPrices(String product) {
        return shops.stream()
                .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
                .collect(toList());
    }

stream流將 shop映射為了shop的名稱以及該shop中商品的價格的字符串,並使用收集器進行收集。

使用異步的方法實現

事實上,我們完全可以使用流將shop映射成CompletableFuture對象,就好像在操作集合一樣,代碼如下

    List<CompletableFuture<String>> priceFutures = shops.stream()
            .map(shop -> CompletableFuture
                    .supplyAsync(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))))
            .collect(toList());

使用這種方式,你會得到一個,List<CompletableFuture >,列表中的每個CompletableFuture 對象在計算完成后都包含商店的String類型的名稱。但是,由於你用CompletableFutures 實現的findPrices方法要求返回一個List ,你需要等待所有的future 執行完畢,將其包含的值抽取出來,填充到列表中才能返回。

為了實現這個效果,你可以向最初的 List<CompletableFuture > 施加第二個map 操作,對 List 中的所有future對象執行join操作,一個接一個地等待它們運行結束。注意CompletableFuture類中的join方法和Future接口中的get有相同的含義,並且也聲明在Future 接口中,它們唯一的不同是join不會拋出任何檢測到的異常。使用它你不再需要使用try / catch 語句塊讓你傳遞給第二個map方法的Lambda表達式變得過於臃腫。所有這些整合在一起,你就可以重新實現 findPrices 了,具體代碼如下

    public List<String> findPrices(String product) {
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(
                    shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
                .collect(Collectors.toList());
                
        return priceFutures.stream()
                .map(CompletableFuture::join)
                .collect(toList());
    }

以上的代碼你可能會疑惑,為什么不直接按照shop->completableFuture->join->collect
的方式進行流處理呢?那是因為join這一步本身是阻塞的,對於流操作來說,前一個shop沒有處理完之前,是不會處理下一個shop的,所以對於每一個shop,處理到join這一步的時候就會阻塞住等待1秒,這樣的話,這個流水線本身就會變回阻塞的了。

而上文的編寫方法可以看出 shop->completableFuture->collect 這個操作本身是非阻塞的,順利的將所有的請求都發出去了,隨后再使用join來完成結果的收集。

使用線程池來管控異步方法

前面在介紹工廠方法時提到,可以選擇第二參數放入一個線程池來進行管控。

   private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

接着在supplyAsync中使用該線程池即可,代碼如下

CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +
shop.getPrice(product), executor);

進階的異步流操作

既然我們已經將異步操作與流相結合了,因此很容易的就會想到對於異步流來說,應該有會有類似於集合流的一些非常好用的API吧?事實上,JAVA8的確為我們提供了這些API。

構造同步和異步操作

如同集合流操作一樣,異步流也可以提前安排一系列的任務,然后讓異步任務有條不紊的按照這個順序去執行。

  • 同步任務
    使用future.thenApply(Function)來實現,該方法接受一個Function對象
    你可以規划這樣的任務 任務A(異步)->任務B(同步),語法可能是這樣的
    stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .collect
  • 異步任務
    與同步幾乎一樣,方法變為future.thenCompose(Function)
    你可以規划這樣的任務 任務A(異步)->任務B(同步)->任務C(異步),語法可能是這樣的
    stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .map(future->future.thenCompose(任務C))//再異步執行任務C
    .collect

將兩個 CompletableFuture 對象整合起來,無論它們是否存在依賴

使用thenCombine來完成,類似任務A與任務B,A是查詢價格,B是查詢匯率,這兩個任務之間本身沒有關聯關系,所以可以同時發起,但你最后需要計算價格乘以匯率,因此在這兩個任務完成之后需要對他們的結果進行合並,代碼如下

        Future<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))//任務A
                .thenCombine(
                     CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)), //任務B
                    (price, rate) -> price * rate); //任務A與任務B的合並操作

注意這里的任務A與任務B是異步的,但他們的合並操作是同步的,如果想要合並操作也是異步的,使用future.thenCombineAsync的異步方法版本。

對結果進行處理

使用thenAccept(Consumer)
以上都是對結果進行一些映射,你現在要對結果只進行處理,說白了就是前面的都是Function,現在要換成consumer,並且參數不再是異步任務,而是任務的結果值,舉個例子,上文的任務A(異步)->任務B(同步)->任務C(異步) 的操作現在想到對他們的操作結果進行打印
就可以使用thenAccept(Consumer)

    stream()
    .map(xxx->supplayAsync(()->任務A)) //這一步已經異步的映射成了任務A
    .map(future->future.thenApply(任務B)//執行同步的任務B
    .map(future->future.thenCompose(任務C))//再異步執行任務C
    .map(future->future.thenAccept(System.out::println))//將結果打印
    .collect

使用allOf與anyOf對結果進行處理

需要注意的是在執行了Accpet方法之后,你得到的是一個 CompletableFuture 流對象
你可以對這些流對象進行類似及早求值的操作,例如這條查詢4個商家的價格服務只要有一個給出了返回結果就結束這次異步流。

CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.anyOf(futures).join();

allOf 工廠方法接收一個由 CompletableFuture 構成的數組,數組中的所有 Completable-Future 對象執行完成之后,它返回一個 CompletableFuture 對象。這意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 對象執行完畢,對 allOf 方法返回的CompletableFuture 執行 join 操作是個不錯的主意。這個方法對“最佳價格查詢器”應用也是有用的,因為你的用戶可能會困惑是否后面還有一些價格沒有返回,使用這個方法,你可以在執行完畢之后打印輸出一條消息“All shops returned results or timed out” 。然而在另一些場景中,你可能希望只要 CompletableFuture 對象數組中有任何一個執行完畢就不再等待,比如,你正在查詢兩個匯率服務器,任何一個返回了結果都能滿足你的需求。在這種情況下,你可以使用一個類似的工廠方法 anyOf 。該方法接收一個 CompletableFuture 對象構成的數組, 返回由第一個執行完畢的 CompletableFuture 對象的返回值構成的 Completable-Future

總結

本文是對Java8實戰中異步編程章節的一些整理和匯總,介紹了利用新增的completableFuture將異步任務與流操作集合起來實現組合式異步編程,利用工廠方法與函數接口可以大大的簡化代碼,同時提高代碼的可閱讀性,想要查看詳細,可以自行翻閱該書。

posted @ 2018-01-01 17:30  祈求者-  閱讀( 2889)  評論( 0編輯  收藏


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM