ForkJoinPool


  • fork():開啟一個新線程(或是重用線程池內的空閑線程),將任務交給該線程處理。
  • join():等待該任務的處理線程處理完畢,獲得返回值。

ForkJoinPool 的每個工作線程都維護着一個工作隊列WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務ForkJoinTask)。

每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,並且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。

每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作線程的工作隊列),竊取的任務位於其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。

在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。

在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。

 

fork

fork() 做的工作只有一件事,既是把任務推入當前工作線程的工作隊列里。可以參看以下的源代碼:

join

join() 的工作則復雜得多,也是 join() 可以使得線程免於被阻塞的原因——不像同名的 Thread.join()

 

1.檢查調用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當前線程,等待任務完成。如果是,則不阻塞。

2.查看任務的完成狀態,如果已經完成,直接返回結果。

3.如果任務尚未完成,但處於自己的工作隊列內,則完成它。

4.如果任務已經被其他的工作線程偷走,則竊取這個小偷的工作隊列內的任務(以 FIFO 方式),執行,以期幫助它早日完成欲 join 的任務。

5.如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。

6.遞歸地執行第5步。

以上就是 fork()join() 的原理,這可以解釋 ForkJoinPool 在遞歸過程中的執行邏輯,但還有一個問題

最初的任務是 push 到哪個線程的工作隊列里的?

這就涉及到 submit() 函數的實現方法了

其實除了前面介紹過的每個工作線程自己擁有的工作隊列以外,ForkJoinPool 自身也擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務,而這些工作隊列被稱為 submitting queue

submit()fork() 其實沒有本質區別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當其中的任務被一個工作線程成功竊取時,就意味着提交的任務真正開始進入執行階段。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM