本次內容主要講Fork-Join、CountDownLatch、CyclicBarrier以及Callable、Future和FutureTask,最后再手寫一個自己的FutureTask,絕對干貨滿滿!
1、Fork-Join
1.1 什么是Fork-Join
Java多線程的開發可以我們自己啟用多線程,線程池,還可以使用forkjoin。forkjoin可以讓我們不去了解諸如Thread、Runnable等相關的知識,只要遵循forkjoin的開發模式,就可以寫出很好的多線程並發程序。
forkjoin采用的是分而治之。分而治之思想是:將一個難以直接解決的大問題,分割成一些規模較小的相同問題,以便各個擊破,分而治之。分而治之的策略是:對於一個規模為n的問題,若該問題可以容易地解決(比如說規模n較小)則直接解決,否則將其分解為m個規模較小的子問題,這些子問題互相獨立且與原問題形式相同(子問題相互之間有聯系就會變為動態規范算法),遞歸地解這些子問題,然后將各子問題的解合並得到原問題的解,這種算法設計策略叫做分治法。用一張圖來表示forkjoin原理。
我們可以了解一下計算機的十大經典算法:快速排序、堆排序、歸並排序 、二分查找、BFPRT(線性查找)、DFS(深度優先搜索)、BFS(廣度優先搜索)、Dijkstra、動態規划、朴素貝葉斯分類。其中有哪一些用到的是分而治之呢?有3個,分別是快速排序、歸並排序和二分查找。
歸並排序是建立在歸並操作上的一種有效的排序算法。該算法是采用分治法的一個非常典型的應用。將已有序的子序列合並,得到完全有序的序列;即先使每個子序列有序,再使子序列段間有序。若將兩個有序表合並成一個有序表,稱為2路歸並,與之對應的還有多路歸並。對於給定的一組數據,利用遞歸與分治技術將數據序列划分成為越來越小的半子表,在對半子表排序后,再用遞歸方法將排好序的半子表合並成為越來越大的有序序列。為了提升性能,有時我們在半子表的個數小於某個數(比如15)的情況下,對半子表的排序采用其他排序算法,比如插入排序。下面演示一下歸並排序的過程。
1.2 歸並排序(升序)示例
先將數組划分為左右2個子表:
然后繼續對左右2個子表進行拆分:
對拆分好的4個子表進行排序:
對有序子表進行比較合並:
對合並后的子表繼續比較合並:
第二次合並后,數組呈有序排列。
1.3 Fork-Join工作竊取
工作竊取是指當前線程的Task已經全被執行完畢,則自動取到其他線程的Task隊列中取出Task繼續執行。ForkJoinPool中維護着多個線程在不斷地執行Task,每個線程除了執行自己職務內的Task之外,還會根據自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU利用率。用一張圖進行說明。
1.3 Fork-Join使用
Fork-Join使用兩個類來完成以上兩件事情:ForkJoinTask和ForkJoinPool。我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork和join的操作機制,通常我們不直接繼承ForkjoinTask類,只需要直接繼承其子類。
(1)RecursiveAction,用於沒有返回結果的任務
(2)RecursiveTask,用於有返回值的任務
task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,調用之后需要等待任務完成,才能執行后面的代碼;submit是異步執行。join()和get方法當任務完成的時候返回計算結果。調用get/join方法的時候會阻塞。還是用一個圖來說明forkjoin的工作流程。
在我們自己實現的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務。如果不足夠小,就必須分割成兩個子任務,每個子任務在調用invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務並返回結果。使用join方法會等待子任務執行完並得到其結果。
1.4 Fork-Join VS 單線程
假設有一個業務場景,數據庫中有編號為0到1千萬的會員信息,要統計所有會員的余額總和。為了對比結果的一致性,用戶的余額不用隨機數表示,就用編號代表用戶的余額。現在的做法是每次從數據庫查詢出5000條數據進行統計,直到所有數據統計完成,進行匯總。對比看看單線程和Fork-Join的差異。
先看單線程場景:
public class SingleThreadSumNumber { /** * 每次查詢5000條進行統計 */ private static final int THRESHOLD = 5000; /** * 最小值 */ private static final int MIN = 0; /** * 最大值 */ private static final int MAX = 10000000; public void sumNumber() { long sum = 0; long startTime = System.currentTimeMillis(); int start = MIN; int end = MIN + THRESHOLD; boolean isFirstTime = true; while (end <= MAX) { sum = sum + batchSum(start, end); if (isFirstTime) { start = start + THRESHOLD + 1; isFirstTime = false; } else { start = start + THRESHOLD; } end = end + THRESHOLD; } System.out.println("The result is " + sum + " spend time:" + (System.currentTimeMillis() - startTime) + "ms"); } /** * 統計每次查詢出來的余額總和 * @param start * @param end * @return */ public long batchSum(int start, int end) { long sum = 0; try { Thread.sleep(15);//休眠15毫秒模擬查詢業務 } catch (InterruptedException e) { e.printStackTrace(); } for (int i = start; i <= end; i++) { sum += i; } return sum; } public static void main(String[] args) { SingleThreadSumNumber thread = new SingleThreadSumNumber(); thread.sumNumber(); } }
運行程序輸出以下結果:
余額總和為50000005000000,花費了30119毫秒,下面使用forkjoin來進行統計:
1 import java.util.concurrent.ForkJoinPool; 2 import java.util.concurrent.RecursiveTask; 3 4 public class ForkJoinDemo { 5 /** 6 * 門限值,如果任務門限低於此值,則進行計算 7 */ 8 private static final int THRESHOLD = 5000; 9 10 /** 11 * 最小值 12 */ 13 private static final int MIN = 0; 14 15 /** 16 * 最大值 17 */ 18 private static final int MAX = 10000000; 19 20 private static class SumNumberTask extends RecursiveTask<Long> { 21 private int start; 22 private int end; 23 24 public SumNumberTask(int start, int end) { 25 this.start = start; 26 this.end = end; 27 } 28 29 @Override 30 protected Long compute() { 31 if (end - start < THRESHOLD) { 32 return sumBatch(start, end); 33 } else { 34 int mid = (start + end) / 2; 35 SumNumberTask left = new SumNumberTask(start, mid); 36 SumNumberTask right = new SumNumberTask(mid + 1, end); 37 invokeAll(left, right); 38 long leftResult = left.join(); 39 long rightResult = right.join(); 40 return leftResult + rightResult; 41 } 42 } 43 } 44 45 public void sumNumber() { 46 ForkJoinPool pool = new ForkJoinPool(); 47 long start = System.currentTimeMillis(); 48 int recordMin = MIN; 49 int recordMax = MAX; 50 SumNumberTask sumTask = new SumNumberTask(recordMin, recordMax); 51 pool.invoke(sumTask); 52 System.out.println("Task is Running....."); 53 Long result = sumTask.join(); 54 System.out.println("The result is " + result + " spend time:" 55 + (System.currentTimeMillis() - start) + "ms"); 56 } 57 58 /** 59 * 統計每次任務的總和 60 * @param fromId 61 * @param toId 62 * @return 63 */ 64 public static long sumBatch(int fromId, int toId) { 65 long sum = 0; 66 try { 67 Thread.sleep(15);//休眠15毫秒模擬查詢業務 68 } catch (InterruptedException e) { 69 e.printStackTrace(); 70 } 71 for (int i = fromId; i <= toId; i++) { 72 sum += i; 73 } 74 return sum; 75 } 76 77 public static void main(String[] args) { 78 ForkJoinDemo forkJoinDemo = new ForkJoinDemo(); 79 forkJoinDemo.sumNumber(); 80 } 81 }
輸出結果:
余額總和為50000005000000,和使用單線程統計時一致,使用forkjoin達到了同樣的目的,但是只花費了4078毫秒,性能提升了7倍多。為了使性能有進一步提升,我們可以在第44行指定並發數量。不傳參情況下,默認並發量是當前服務器的邏輯CPU個數。我們把並發量調整成64,即ForkJoinPool pool = new ForkJoinPool(16 * 4),執行程序,輸出結果為:
統計結果一致,花費了567毫秒,比起單線程統計,性能提升了53倍之多,由此可見forkjoin的並發威力。
2、CountDownLatch
2.1 什么是CountDownLatch
JDK對CountDownLatch的解釋是:一種同步輔助器,它允許一個或多個線程等待,直到在其他線程中執行的一組操作完成為止。舉個例子來理解CountDownLatch:隔壁寢室的老王今天要參加學校運動會的400米決賽,跟小王一起爭奪冠軍的還有另外5個人,不管這6位選手的內心多激動多澎湃,也要等裁判的發令槍響了之后才能起跑,裁判不發出指令,選手就只能在起跑線等待,這就是CountDownLatch的作用。但是實際場景並不只有一個發令裁判,參加過學校運動會的同學都知道,還可能需要若干個裁判進行手動計時,要等所有的裁判都就位后,發令槍一響,運動員才能起跑。假設有3個計時裁判,一個發令裁判,用一個圖來說明。
在比賽開始前,發令裁判會用洪荒之力吼一聲,各~就~各~位,此時發令裁判會用炯炯有神的目光和3位計時裁判交流,3位裁判分別點頭示意已經准備好了,此時發令裁判會再次大吼一聲,預備~~~跑!!!此時憋了許久的6位運動員飛奔出去,當然老王遙遙領先,畢竟女神給他說了跑第一名的話晚上有獎勵。發令裁判的任務完成,不用繼續執行下去,而3個計時裁判繼續工作,對6位選手的成績進行一個記錄。
2.1 CountDownLatch實戰
用一段程序來模擬老王參加運動會400米決賽的場景。
import java.util.concurrent.CountDownLatch; public class CountDownLatchDemo { /** * 運動員在計時裁判和發令裁判就位后才能起跑 */ static CountDownLatch sportsManLatch = new CountDownLatch(4); /** * 發令裁判在3個計時裁判准備好之后才能發令 */ static CountDownLatch orderRefereeLatch = new CountDownLatch(3); /** * 計時裁判 */ static class TimeReferee implements Runnable { private int no; public TimeReferee(int no) { this.no = no; } @Override public void run() { System.out.println(no + "號計時裁判就位"); orderRefereeLatch.countDown(); sportsManLatch.countDown(); } } /** * 發令裁判 */ static class OrderReferee implements Runnable { @Override public void run() { try { orderRefereeLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("發令裁判發出指令~~~~~~"); sportsManLatch.countDown(); } } /*** * 運動員 */ static class SportsMan implements Runnable { private int no; public SportsMan(int no) { this.no = no; } @Override public void run() { try { System.out.println(no + "號運動員已經就位"); sportsManLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(no + "號選手說,我要跑第一"); } } public static void main(String[] args) throws InterruptedException { //6個運動員就位 for (int i = 0; i < 6; i++) { new Thread(new SportsMan(i)).start(); } //發令裁判和計時裁判眼神確認,等計時裁判都准備好之后發令 new Thread(new OrderReferee()).start(); //3個計時裁判就位 for (int i = 0; i < 3; i++) { new Thread(new TimeReferee(i)).start(); } } }
程序輸出:
3、CyclicBarrier
3.1 什么是CyclicBarrier
JDK對CyclicBarrier的解釋是:一種同步輔助工具,它允許一組線程全部互相等待以到達一個公共的障礙點。我們可以從字面意思理解它,可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會打開,所有被屏障攔截的線程才能繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),parties表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用於在parties個線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。還用一張圖來說明。
3.2 CyclicBarrier實戰
CyclicBarrier可以用於多線程計算數據,最后合並計算結果的場景。我們模擬3個子線程向一個map中添加數據,它們添加數據完成后,到一個屏障點進行等待,由統計線程對數據進行打印,統計線程工作結束后,3個子線程再被統一釋放去干其他工作。我們設置2個屏障點來演示,,體現其可循環使用的特征。
public class CyclicBarrierDemo { private static CyclicBarrier barrier = new CyclicBarrier(3, new CollectThread()); /**存放子線程產生數據的容器*/ private static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { Thread thread = new Thread(new WorkThread()); thread.start(); } Thread.sleep(5); } /** * 負責對子線程的結果進行其他處理 */ private static class CollectThread implements Runnable { @Override public void run() { StringBuilder result = new StringBuilder(); for (Map.Entry<String, Long> workResult : map.entrySet()) { result.append("[" + workResult.getValue() + "]"); } System.out.println("the result = " + result); System.out.println("CollectThread do other things"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CollectThread end........"); } } /** * 實際工作的子線程 */ private static class WorkThread implements Runnable { @Override public void run() { long id = Thread.currentThread().getId(); map.put(id + "", id); Random r = new Random(); try { Thread.sleep(r.nextInt(1000)); System.out.println("Thread_" + id + " first do something "); //第一次到達屏障 barrier.await(); System.out.println("Thread_" + id + " first do other things"); Thread.sleep(r.nextInt(500)); map.put(id * 2 + "", id * 2); System.out.println("Thread_" + id + " second do something "); //第二次到達屏障 barrier.await(); System.out.println("Thread_" + id + " second other things "); } catch (Exception e) { e.printStackTrace(); } } } }
程序輸出:
3.3 CountDownLatch和CyclicBarrier對比
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以反復使用。CountDownLatch.await()一般阻塞工作線程,所有的進行預備工作的線程執行countDown(),而CyclicBarrier通過工作線程調用await()從而自行阻塞,直到所有工作線程達到指定屏障,再大家一起往下走。在控制多個線程同時運行上,CountDownLatch可以不限線程數量,而CyclicBarrier是固定線程數。同時,CyclicBarrier還可以提供一個barrierAction,合並多線程計算結果。
4、Callable、Future和FutureTask
4.1 Runnable、Callable、Future和FutureTask之間的關系
Runnable是一個接口,在它里面只聲明了一個run()方法,由於run()方法返回值為void類型,所以在執行完任務之后無法返回任何結果。Callable位於java.util.concurrent包下,它也是一個接口,在它里面也只聲明了一個方法,只不過這個方法叫做call(),這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型。Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。要獲取返回結果時可以調用get方,該方法會阻塞直到任務返回結果。因為Future只是一個接口,所以是無法直接用來創建對象使用的,因此就有了FutureTask。FutureTask類實現了RunnableFuture接口,RunnableFuture繼承了Runnable接口和Future接口,所以它既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。用一個圖來說明。
因此當我們想通過一個線程運行Callable,但是Thread不支持構造方法中傳遞Callable的實例,我們需要通過FutureTask把一個Callable包裝成Runnable,然后再通過這個FutureTask拿到Callable運行后的返回值。要想new出一個FutureTask的實例,有2種方式,直接貼出代碼。
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
4.2 Callable和FutureTask實戰
這個例子比較簡單,在一個主線程中創建一個callable來對1到10000進行累加,再休眠3秒,然后把這個callable封裝成一個futureTask,交給一個線程去運行,最終查看callable的返回結果和阻塞效果。
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { Callable<Long> callable = new Callable<Long>() { long sum = 0; @Override public Long call() throws Exception { for (int i = 0; i <= 10000; i++) { sum += i; } Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果 return sum; } }; FutureTask<Long> futureTask = new FutureTask<>(callable); new Thread(futureTask).start(); Thread.sleep(10); System.out.println("main線程繼續執行"); System.out.println("獲取callable計算結果 = " + futureTask.get()); System.out.println("main線程繼續執行 "); } }
程序輸出:
可以看到當futureTask.get()沒有獲取到返回結果時,主線程是處於阻塞狀態。
4.3 手寫一個FutureTask
要實現一個簡易的FutureTask,通過上面對幾個接口之間關系的介紹,以及閱讀FutureTask代碼可以看出,只需定義一個類,實現Runnable和Future接口,並實現run()方法和get()方法就可以了,核心思想就是上一篇文章中提到的通知/等待機制。直接上代碼:
import java.util.concurrent.*; public class MyFutureTask<V> implements Runnable, Future<V> { private Callable<V> callable; private V result = null; public MyFutureTask(Callable<V> callable) { this.callable = callable; } @Override public void run() { V temp = null; try { temp = callable.call(); } catch (Exception e) { e.printStackTrace(); } synchronized (this) { result = temp; this.notifyAll(); } } @Override public V get() throws InterruptedException { if (result != null) { return result; } System.out.println("等待結果執行完成。。。。。"); synchronized (this) { this.wait(); } return result; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }
為了驗證效果,把上一段代碼中的FutureTask改成MyFutureTask,其余代碼都不變。
import java.util.concurrent.Callable; public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException { Callable<Long> callable = new Callable<Long>() { long sum = 0; @Override public Long call() throws Exception { for (int i = 0; i <= 10000; i++) { sum += i; } Thread.sleep(3000);//主要是為了演示get()時候的阻塞效果 return sum; } }; MyFutureTask<Long> futureTask = new MyFutureTask<>(callable); new Thread(futureTask).start(); Thread.sleep(10); System.out.println("main線程繼續執行"); System.out.println("獲取callable計算結果 = " + futureTask.get()); System.out.println("main線程繼續執行 "); } }
運行程序,可以看到輸出結果和阻塞現象與使用FutureTask一致:
5、結語
這篇隨筆就介紹這么多內容,希望大家看了有收獲。原子操作CAS在下一篇文章中介紹,閱讀過程中如發現描述有誤,請指出,謝謝。