JDK7引入了Fork/Join框架,所謂Fork/Join框架,個人解釋:Fork分解任務成獨立的子任務,用多線程去執行這些子任務,Join合並子任務的結果。這樣就能使用多線程的方式來執行一個任務。
JDK7引入的Fork/Join有三個核心類:
ForkJoinPool,執行任務的線程池
ForkJoinWorkerThread,執行任務的工作線程
ForkJoinTask,一個用於ForkJoinPool的任務抽象類。
因為ForkJoinTask比較復雜,抽象方法比較多,日常使用時一般不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類:
RecursiveTask:子任務帶返回結果時使用
RecursiveAction:子任務不帶返回結果時使用
對於Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework
在看了網上的很多例子之后,發現在自定義任務類實現compute方法的邏輯一般是這樣的:
if 任務足夠小 直接返回結果 else 分割成N個子任務 依次調用每個子任務的fork方法執行子任務 依次調用每個子任務的join方法合並執行結果
而執行該自定義任務的調用的則是ForkJoinPool的execute方法,因此首先來看的就是ForkJoinPool的execute方法,看看和普通線程池執行任務有什么不同:
public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); }
因此forkOrSubmit是真正執行ForkJoinTask的方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else // 正常執行的時候是主線程調用的,因此關注addSubmission addSubmission(task); }
那么我們首先要關注的是addSubmission方法,發覺所做的事情和普通線程池很類似,就是把任務加入到隊列中,不同的是直接使用Unsafe操作內存來添加任務對象
private void addSubmission(ForkJoinTask<?> t) { final ReentrantLock lock = this.submissionLock; lock.lock(); try { // 隊列只是普通的數組而不是普通線程池的BlockingQueue, // 喚醒worker線程的工作由下面的signalWork來完成 // 使用Unsafe進行內存操作,把任務放置在數組中 ForkJoinTask<?>[] q; int s, m; if ((q = submissionQueue) != null) { long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; if (s - queueBase == m) // 數組已滿,為數組擴容 growSubmissionQueue(); } } finally { lock.unlock(); } // 通知有新任務來了:兩種操作,有空閑線程則喚醒該線程 // 否則如果可以新建worker線程則為這個任務新建worker線程 // 如果不可以就返回了,等到有空閑線程來執行這個任務 signalWork(); }
接下來要弄清楚就是在compute中fork時,按道理來說這個動作是和主任務在同一個線程中執行,fork是如果把子任務變成多線程執行的:
public final ForkJoinTask<V> fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this; }
在上面分析forkOrSubmit的時候同樣見到了ForkJoinWorkerThread的pushTask方法調用,那么來看這個方法:
final void pushTask(ForkJoinTask<?> t) { // 代碼的基本邏輯和ForkJoinPool的addSubmission方法基本一致 // 都是把任務加入了任務隊列中,這里是加入到ForkJoinWorkerThread // 內置的任務隊列中 ForkJoinTask<?>[] q; int s, m; if ((q = queue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); queueTop = s + 1; // or use putOrderedInt // 這里不太明白 if ((s -= queueBase) <= 2) pool.signalWork(); else if (s == m) growQueue(); } }
看到這里一下子陷入了僵局,為什么ForkJoinWorkerThread要內建一個隊列呢,而且如果子任務仍舊在同一個線程內的話,何以實現並發執行子任務呢?下一篇文章繼續。