一、簡介
平常在頁面中我們會使用異步調用$.ajax()函數,如果是多個的話他會並行執行相互不影響,實際上Completable我理解也是和它類似,是java 8里面新出的異步實現類,CompletableFuture類實現了Future接口,CompletableFuture與Stream的設計都遵循了類似的設計模式:使用Lambda表達式以及流水線的思想,從這個角度可以說CompletableFuture與Future的關系類似於Stream與Collection的關系。
二、代碼
直接上代碼,運行之后可以看出CompletableFuture是調用的時候就開始執行,當后續代碼調到get的取值方法時,如果內部已經返回結果則直接拿到,如果還沒有返回將阻塞線程等待結果,可以設置超時時間避免長時間等待。
以下是模擬並行調用多個方法的場景,比如查詢頁可能會有多個條件選擇,這些條件需要后台數據相互之間有沒有聯系的場景,就不需要串行執行,異步執行可以節省大量時間
import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 多任務單次異步執行 */ @Test public void testManyFunAsync() { long start = System.nanoTime();//程序開始時間 try { int id = 1;//模擬一個參數,如學校Id printlnConsole("調用異步任務..."); //使用異步方式調用方法【調用時就會開始執行方法】 CompletableFuture futureClassCount = CompletableFuture.supplyAsync(() -> getClassCount(id)); CompletableFuture futureStudentCount = CompletableFuture.supplyAsync(() -> getStudentCount(id)); //do something 做了一些其他的事情超過了異步任務執行的時間 printlnConsole("做一些其他的事情..."); Thread.sleep(3000); printlnConsole("其他事情完成"); //下面獲取異步任務的結果,就會立即拿到返回值 printlnConsole("獲取異步任務結果..."); Object classCount = futureClassCount.get(); //Object classCount = futureClassCount.get(2, TimeUnit.SECONDS);//可以設置超時時間,超過這個時間時將不再等待,返回異常 Object studentCount = futureStudentCount.get(); //Object studentCount = futureStudentCount.get(2, TimeUnit.SECONDS); printlnConsole("異步任務結果獲取完成"); printlnConsole("ClassCount:" + classCount); printlnConsole("StudentCount:" + studentCount); } catch (Exception e) { e.printStackTrace(); } long end = System.nanoTime();//程序結束時間 long time = (end - start) / 1000000;//總耗時 System.out.println("運行時間:" + time); } public int getClassCount(int id) { try { Thread.sleep(2000); printlnConsole("getClassCount(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return 20; } public int getStudentCount(int id) { try { Thread.sleep(1000); printlnConsole("getStudentCount(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return 100; } }
anyOf()為任意一個子任務線程執行完畢后返回
allOf()為等待所有子任務線程全部執行完畢后返回
getNow()表示我需要立即拿到結果,如果當前的線程並未執行完成,則使用我傳入的值進行任務調用,參數為無法獲取結果時使用我傳入的值
get()獲取子線程運算的結果,會拋出檢查到的異常
join()獲取子線程運算的結果,不會拋出異常
package com.ysl; import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 並行執行等待全部結果或等待任意結果 */ @Test public void testAllOfAnyOf() { long start = System.nanoTime(); try { printlnConsole("調用異步任務..."); List<Integer> ids = Arrays.asList(1, 3, 5);//准備的請求參數 //創建異步方法數組 CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size -> new CompletableFuture[size]); //指定該異步方法數組的子任務線程等待類型 CompletableFuture.anyOf(futures).join();//anyOf()為任意一個子任務線程執行完畢后返回 //CompletableFuture.allOf(futures).join();//allOf()為等待所有子任務線程全部執行完畢后返回 printlnConsole("做一些其他的事情..."); Thread.sleep(2000); printlnConsole("其他事情完成"); printlnConsole("獲取異步任務結果:"); for (CompletableFuture f : futures) { //Object obj = f.getNow(1);//getNow()表示我需要立即拿到結果,如果當前的線程並未執行完成,則使用我傳入的值進行任務調用,參數為無法獲取結果時使用我傳入的值 Object obj = f.get();//get()獲取子線程運算的結果,會拋出檢查到的異常 //Object obj = f.join();//join()獲取子線程運算的結果,不會拋出異常 printlnConsole(String.valueOf(obj)); } } catch (Exception e) { e.printStackTrace(); } long end = System.nanoTime(); long time = (end - start) / 1000000; System.out.println("運行時間:" + time); } public String getClassName(int id) { try { Thread.sleep(id * 1000); printlnConsole("getClassName(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return "taiyonghai-" + id; } }
下面是並行流的演示parallelStream也是java 8新特性
ids.stream()轉化為流.map()映射每個元素對應的結果.collect(Collectors.toList)把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素;也可以toArray(size -> new Class[size])轉化為數組
以下是模擬根據Id查詢學生名稱的場景,接收到的是一個集合又都是調用同一個方法獲取,就可以使用並行流同時異步請求等待返回結果
import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 單任務多次並行流執行 */ @Test public void testParallelStream() { long start = System.nanoTime(); try { printlnConsole("調用異步任務..."); List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//准備的請求參數 //串行執行會等待每一個方法執行完畢后在繼續執行下一個 //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList()); //並行執行會同時調用多個方法待全部執行完畢后一起返回(parallelStream是非線程安全的,配合collect達到線程安全,后續驗證一下) List<String> names = ids.parallelStream().map(id -> getStudentName(id)).collect(Collectors.toList()); //無論stream()或者parallelStream()調用時均會阻斷線程執行 printlnConsole("做一些其他的事情..."); Thread.sleep(3000); printlnConsole("其他事情完成"); printlnConsole("獲取異步任務結果:"); names.forEach(item -> printlnConsole(item)); } catch (Exception e) { e.printStackTrace(); } long end = System.nanoTime(); long time = (end - start) / 1000000; System.out.println("運行時間:" + time); } public String getStudentName(int id) { try { Thread.sleep(2000); printlnConsole("getStudentName(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return "taiyonghai-" + id; } }
上面能看到並行流雖然是並行執行但等待結果是阻塞線程的,所以可以利用異步CompletableFuture配合串行流來實現
以下是采用串行流配合異步實現的並發處理
import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 單任務多次異步執行 */ @Test public void testOneFunAsync() { long start = System.nanoTime(); try { printlnConsole("調用異步任務..."); List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);//准備的請求參數 //ids.stream()轉化為流.map()映射每個元素對應的結果.collect()把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素; List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id))).collect(Collectors.toList()); //不用並行流parallelStream()調用時就不會阻斷線程執行 printlnConsole("做一些其他的事情..."); Thread.sleep(3000); printlnConsole("其他事情完成"); printlnConsole("獲取異步任務結果:"); futures.forEach(f -> { try { Object obj = f.get(); printlnConsole(String.valueOf(obj)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); }catch (Exception e){ e.printStackTrace(); } long end = System.nanoTime(); long time = (end - start) / 1000000; System.out.println("運行時間:" + time); } public String getStudentName(int id) { try { Thread.sleep(2000); printlnConsole("getStudentName(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return "taiyonghai-" + id; } }
當我的並行任務數量超過了我機器的核心數就會產生等待,我電腦是8核使用並行流執行數量就可以開8個子線程,當多余這個數量時剩下的就需要等待前面線程執行完再執行
當需要並行執行的任務數量大於核心數的時候,產生的等待是我們不想看到的,這時CompletableFuture就更加適用,它可以手動這只線程池大小,避免並行任務過多時的等待
我們將代碼做些修正
以下是源碼,這樣就可以提高對多任務並行處理的支持了
import org.joda.time.LocalDateTime; import org.junit.Test; import java.util.Arrays; import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; public class AppTest { public void printlnConsole(String msg) { String time = new LocalDateTime().toString("yyyy-MM-dd HH:mm:ss"); System.out.println(String.format("[%s]%s", time, msg)); } /** * 手動配置線程執行器的線程池大小 */ private final Executor myExecutor = Executors.newFixedThreadPool(20, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守護線程保證不會阻止程序的關停 t.setDaemon(true); return t; } }); /** * 單任務多次異步執行 */ @Test public void testOneFunAsync() { long start = System.nanoTime(); try { printlnConsole("調用異步任務..."); List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);//准備的請求參數 //ids.stream()轉化為流.map()映射每個元素對應的結果.collect()把結果歸納為List;還有.filter()過濾元素.sorted()對元素進行排序.limit()獲取指定數量元素; List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getStudentName(id), myExecutor)).collect(Collectors.toList()); //不用並行流parallelStream()調用時就不會阻斷線程執行 printlnConsole("做一些其他的事情..."); Thread.sleep(3000); printlnConsole("其他事情完成"); printlnConsole("獲取異步任務結果:"); futures.forEach(f -> { try { Object obj = f.get(); printlnConsole(String.valueOf(obj)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } long end = System.nanoTime(); long time = (end - start) / 1000000; System.out.println("運行時間:" + time); } public String getStudentName(int id) { try { Thread.sleep(2000); printlnConsole("getStudentName(" + id + ")執行完畢"); } catch (InterruptedException e) { e.printStackTrace(); } return "taiyonghai-" + id; } }
java 8的新特性也只做到了會用,很多深入的還不了解,還望指導謝謝,下面備份一下別人的總結我覺得挺有用的:
選擇正確的線程池大小
《Java並發編程實戰》中給出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C)
Number : 線程數量
NCpu : 處理器核數
UCpu : 期望cpu利用率
W/C : 等待時間與計算時間比
我們這里:99%d的時間是等待商店響應 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過多的線程搞死計算機,我們選擇商店數與計算值中較小的一個。
並行流與CompletableFuture
目前,我們對集合進行計算有兩種方式:1.並行流 2.CompletableFuture;
1、而CompletableFuture更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待IO而發生阻塞。
書上給出的建議如下:如果是計算密集型的操作並且沒有IO推薦stream接口,因為實現簡單效率也高,如果所有的線程都是計算密集型的也就沒有必要創建比核數更多的線程。
2、反之,如果任務涉及到IO,網絡等操作:CompletableFuture靈活性更好,因為大部分線程處於等待狀態,需要讓他們更加忙碌,並且再邏輯中加入異常處理可以更有效的監控是什么原因觸發了等待。