fork()
:開啟一個新線程(或是重用線程池內的空閑線程),將任務交給該線程處理。join()
:等待該任務的處理線程處理完畢,獲得返回值。
ForkJoinPool
的每個工作線程都維護着一個工作隊列(WorkQueue
),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask
)。
每個工作線程在運行中產生新的任務(通常是因為調用了 fork()
)時,會放入工作隊列的隊尾,並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
在遇到 join()
時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。
在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。
fork
fork()
做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里。可以參看以下的源代碼:
join
join()
的工作則復雜得多,也是 join()
可以使得線程免於被阻塞的原因——不像同名的 Thread.join()
。
1.檢查調用 join()
的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。
2.查看任務的完成狀態,如果已經完成,直接返回結果。
3.如果任務尚未完成,但處於自己的工作隊列內,則完成它。
4.如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。
5.如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。
6.遞歸地執行第5步。
以上就是 fork()
和 join()
的原理,這可以解釋 ForkJoinPool 在遞歸過程中的執行邏輯,但還有一個問題
最初的任務是 push 到哪個線程的工作隊列里的?
這就涉及到 submit()
函數的實現方法了
其實除了前面介紹過的每個工作線程自己擁有的工作隊列以外,ForkJoinPool
自身也擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread
線程)提交過來的任務,而這些工作隊列被稱為 submitting queue 。
submit()
和 fork()
其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味着提交的任務真正開始進入執行階段。