0.概述
服務端編程的一個經典場景是在接收和處理客戶端請求時,為了避免對每一個請求都分配線程而帶來的資源開銷,服務一般會預先分配一個固定大小的線程池(比如Tomcat connector maxThreads),當客戶端請求到來時,從線程池里尋找空閑狀態的線程來處理請求,請求處理完畢后會回到線程池,繼續服務下一個請求。當線程池內的線程都處於繁忙狀態時,新來的請求需要排隊直到線程池內有可用的線程,或者當超出隊列容量后(Tomcat connector acceptCount屬性)請求被拒絕(connection refused error)。
為了提高服務的吞吐量,我們應當確保主線程盡快處理盡快返回,盡量使服務端的任務處理線程池始終有可分配的線程來處理新的客戶端請求。
當主線程執行一個任務時,如果該任務較耗時, 通常的做法是利用Future/Promise來異步化處理任務。從JDK1.5開始,J.U.C中提供了Future來代表一個異步操作。JDK1.8中則新增了lambda表達式和CompletableFuture, 可以幫助我們更好的用函數式編程風格來實現任務的異步處理。
1. Future
代碼例子:
ExecutorService executor = Executors.newFixedThreadPool(1); Future<String> future = executor.submit(() -> { // long running task return "task finish."; });
Future實在是太雞肋了,僅暴露了get/cancel/isDone/isCancelled方法。我們無法通過Future去手動結束一個任務,也無法非阻塞的去獲取Future的任務結果,因為future.get()方法是阻塞的。假設有下面這個場景,當前有兩個任務,后一個任務依賴了前一個任務的處理結果,這種場景也無法通過Future來實現異步流程任務處理。
2. CompletableFuture
CompletableFuture實現了Future和CompletionStage兩個接口,CompletionStage可以看做是一個異步任務執行過程的抽象。我們可以基於CompletableFuture方便的創建任務和鏈式處理多個任務。下面我們通過實例來介紹它的用法。
2.1 創建任務
可以使用runAsync方法新建一個線程來運行Runnable對象(無返回值)
CompletableFuture<Void> futureAsync = CompletableFuture.runAsync(() -> { // long running task without return value System.out.println("task finish."); });
也可以使用supplyAysnc方法新建線程來運行Supplier<T>對象(有返回值)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; });
這里執行任務的線程來自於ForkJoinPool.commonPool() , 也可以自定義線程池
ExecutorService exector = Executors.newFixedThreadPool(5); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }, executor);
2.2 任務的異步處理
不論Future.get()方法還是CompletableFuture.get()方法都是阻塞的,為了獲取任務的結果同時不阻塞當前線程的執行,我們可以使用CompletionStage提供的方法結合callback來實現任務的異步處理。
2.2.1 使用callback基於特定任務的結果進行異步處理
程序中經常需要主線程創建新的線程來處理某一任務,然后基於任務的完成結果(返回值或者exception)來執行特定邏輯, 對於這種場景我們可以很方面的使用whenComplete或者whenCompleteAsync來注冊callback方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }); future.whenComplete((result, exception) -> { if (null == exception) { System.out.println("result from previous task: " + result); } });
對於任務執行中拋錯的情況:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // long running task throw new RuntimeException("error!"); }); future.whenComplete((result, exception) -> { if (null == exception) { System.out.println("result from previous task: " + result); } else { System.err.println("Exception thrown from previous task: " + exception.getMessage()); } });
也可以用exceptionally來顯示的處理錯誤:
CompletableFuture.supplyAsync(() -> { throw new IllegalArgumentException("error"); }).exceptionally(ex -> { System.out.println("Exception caught: " + ex.getMessage()); return ex.getMessage(); }).thenAccept(result -> { System.out.println("result: " + result); });
如果不需關心任務執行中是否有exception,則可以使用thenAccept方法, 需要注意的是如果執行中拋了exception, 則thenAccept里面的回調方法不會被執行
CompletableFuture.supplyAsync(() -> { // long running task return "task result"; }).thenAccept((result) -> { System.out.println("result from previous task: " + result); });
2.2.2 任務的鏈式處理
在應用中經常會遇到任務的pipeline處理,任務A執行完后觸發任務B,任務B執行完后觸發任務C,上一個任務的結果是下一個任務的輸入,對於這種場景,我們可以使用thenApply方法。
CompletableFuture.supplyAsync(() -> { // long running task return "task1"; }).thenApply(previousResult -> { return previousResult + " task2"; }).thenApply(previousResult -> { return previousResult + " task3"; }).thenAccept(previousResult -> { System.out.println(previousResult); }); output: task1 task2 task3
讓我們再看下面這個例子,某一應用需要先根據accountId從數據庫找到對應的賬號信息,然后對該賬號執行特定的處理邏輯:
CompletableFuture<Account> getAccount(String accountId) { return CompletableFuture.supplyAsync(() -> { return accountService.findAccount(accountId); }); } CompletableFuture<String> processAccount(Account account) { return CompletableFuture.supplyAsync(() -> { return accountService.updateAccount(account); }); }
如果使用thenApply方法,其返回的結果是一個嵌套的CompletableFuture對象:
CompletableFuture<CompletableFuture<String>> res = getAccount("123").thenApply(account -> { return processAccount(account); });
如果不希望結果是嵌套的CompletableFuture,我們可以使用thenCompose方法來替代thenApply
CompletableFuture<String> res = getAccount("123").thenCompose(account -> { return processAccount(account); });
2.2.3 多任務的並行處理
另一種常見的場景是將一個大的任務切分為數個子任務,並行處理所有子任務,當所有子任務都成功結束時再繼續處理后面的邏輯。以前的做法是利用CountDownLatch, 主線程構造countDownLatch對象,latch的大小為子任務的總數,每一個任務持有countDownLatch的引用,任務完成時對latch減1,主線程阻塞在countDownLatch.await方法上,當所有子任務都成功執行完后,latch=0, 主線程繼續執行。
int size = 5; CountDownLatch latch = new CountDownLatch(size); for (int i = 0; i < size; i++) { Executors.newFixedThreadPool(size).submit(() -> { try { // long running task System.out.println(Thread.currentThread().getName() + " " + latch.getCount()); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } // continue... System.out.println(Thread.currentThread().getName());
這樣的代碼繁瑣且很容易出錯,我們可以用CompletableFuture.allOf來方便的處理上述場景。直接貼例子, 根據一組賬戶ID並行查找對應賬戶:
CompletableFuture<String> findAccount(String accountId) { return CompletableFuture.supplyAsync(() -> { // mock finding account from database return "account" + accountId; }); } public void batchProcess(List<String> accountIdList) { // 並行根據accountId查找對應account List<CompletableFuture<String>> accountFindingFutureList = accountIdList.stream().map(accountId -> findAccount(accountId)).collect(Collectors.toList()); // 使用allOf方法來表示所有的並行任務 CompletableFuture<Void> allFutures = CompletableFuture .allOf(accountFindingFutureList.toArray(new CompletableFuture[accountFindingFutureList.size()])); // 下面的方法可以幫助我們獲得所有子任務的處理結果 CompletableFuture<List<String>> finalResults = allFutures.thenApply(v -> { return accountFindingFutureList.stream().map(accountFindingFuture -> accountFindingFuture.join()) .collect(Collectors.toList()); }); }
如果后續邏輯沒有必要等待所有子任務全部結束,而是只要任一一個任務成功結束就可以繼續執行,我們可以使用CompletableFuture.anyOf方法:
CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(taskFutureA, taskFutureB, taskFutureC);
假設三個任務中taskFutureA最先執行完畢並成功返回,則anyOfFutures里得到的是taskFutureA的執行結果.
3.展望
基於JDK1.8的lambda表達式和CompletableFuture, 我們可以寫出更具有函數式編程風格的代碼,可以更方便的實現任務的異步處理,只用很少的代碼便可以實現任務的異步pipeline和並行調用。在異步開發模型(nodeJs/Vert.x)越來越火的今天,我們就從今天開始使用lambda+CompletableFuture來改造我們的Java應用吧。