制造一個輪子線程池


很早之前就看過線程池源碼(知道大概的運行原理),但是只是知道怎么用,並沒有深究。這次為了幫助自己深入理解線程池,決定手動寫一個極簡(陋)的線程池,順便記錄思考和造輪過程。

雖然不太可能和jdk自帶的那么完美,但是該有的功能還是要有:

  • 新建線程池,有核心線程數和最大線程數,線程存活時間,隊列
  • 在線程池加入線程,當前線程數不超過核心線程數就新建線程,超過核心放隊列,隊列滿了再新建線程,達到最大線程
  • 全部線程運行完成后會保留核心線程數,支持線程存活時間
  • 立即關閉線程池
  • 優雅的關閉線程池
1.新建一個輪子線程池類,就一個構造函數,把需要的參數都傳進來

2.用ThreadPoolExecutor的時候,新建了線程池就會往里面提交線程了,我們寫的也一樣,而且往線程池里面加線程的時候就會判斷:當前運行線程數是否大於核心線程數,是否大於最大線程數等,這里需要一個當前運行線程數的變量。

所以這里增加一個成員變量activeCount,初始值為0,運行一個線程就加1,線程運行結束就減1,這里面減的時候是在不同線程里面,所以為了線程安全用AtomicInteger類型

    /** 當前活動線程數 */
    private AtomicInteger activeCount = new AtomicInteger(0);

提交線程的方法,看過ThreadPoolExecutor源碼的應該知道,里面每個線程都是包裝了一個Worker類,新增線程的時候就會新建一個worker,為什么要這么做呢? 我第一次看的時候也不理解,想着如果直接把傳進來的線程start一下怎么樣,如果直接新建線程會立馬就發現問題,怎么知道線程什么運行完,怎么把activeCount減1? 所以這里不能直接start,必須新建一個線程(異步執行,廢話),這個線程必須要運行參數的線程run方法(廢話,不然參數還有啥用,我們的業務邏輯咋執行),線程運行完成后activeCount減1。

所以這里有了worker類,worker本身也是一個線程。順便把線程名字也解決了,新建一個threadNum從0開始,新建一個worker就+1

提交線程

3.現在已經有點樣子了,但是問題還很多,主要的一個問題:比如我建一個core=2,max=5,queue=5的線程池,假設往里面放了8個線程,會出現只運行了3個線程,跑完也就結束了,剩下的5個會在隊列里面沒處理,而且也不會保留2個核心線程

WheelThreadPool2的運行結果

現在想想怎么把隊列的線程拿出來運行,沒看ThreadPoolExecutor源碼前我第一次想着是不是在創建線程池的時候默認啟動一個線程去隊列里面獲取並執行,然而一想立馬否定了,因為線程池是多線程運行的,隊列里面的線程需要max(參數最大線程數)個線程同時執行。 所以在我們新建的worker里面要能不斷的循環獲取隊列的線程去執行,如果隊列為空了,則退出循環,讓線程結束

改造一下worker的run方法,在execute方法創建的worker線程執行完通過參數傳進來的runnable之后,循環獲取隊列並執行隊列線程的run方法

這樣還有點問題,如果try里面出現異常,比如runnable.run異常或者r.run異常,這個線程就退出了,不能保持max個線程並行執行

所以如果異常了需要重新創建一個線程繼續跑循環,改造后

這樣改造后如果隊列空了會把所有線程都結束掉,所以現在要解決執行完隊列后保留core個線程的問題,怎么保留線程其實是通過阻塞隊列實現的,

當隊列為空時,通過queue.take()方法阻塞住當前線程,直到又有線程提交。如果當前活動線程超出core,結束當前線程

這樣改造后大概輪廓出來了,因為queue是阻塞隊列,而且各個方法都加了線程鎖,所以本身也是線程安全的,這部分代碼貌似不需要加鎖,跑個測試用例試下,貌似很合理

WheelThreadPoolTest3結果

先放着,再看看下個功能,要支持線程存活時間,這個存活時間的意思是:比如上面WheelThreadPoolTest3里面的線程池運行了10個線程,跑完之后剩余2個線程,3個消亡了(完成任務了)。 后面再提交10個線程,又新建3個線程(從控制台的線程名字可以看出),如果我們設置一個存活時間,讓第一批的10個跑完后的那3個線程不消亡,比如存活5秒,第二批的10個跑的時候就可以復用,不需要重新創建線程。 因為線程是稀缺資源,能復用就復用,新建線程也影響效率

目前的代碼線程消亡的標記是因為queue.poll獲取到了null,導致循環退出,線程完成。而阻塞隊列的poll方法還有一個多態方法E poll(long timeout, TimeUnit unit),可以在一定時間內poll,在時間內獲取到了就會返回,這個時間剛好用於是線程的存活時間(死亡倒計時??)。

構造方法已經傳了存活時間和單位,直接加上這兩個參數

再來測試下,存活時間設置為5秒,那樣第二批只能提交5個線程,否則會導致線程池慢

結果

發生一個大問題,最后線程都沒了,而且主線程也退出了

原因:第一批的10個線程執行完后因為線程存活5秒,所以都保留了

堆棧打印出來也證實了,都在poll里面阻塞,然后第二批5個線程已提交,這存活的5個線程就會立馬開始執行,執行完后再次阻塞再poll,等過了存活時間,線程全部結束!

在第二批執行結束后再次打印堆棧,結果果然是這樣

問題知道了,解決方法

這里的lock是一個線程鎖,防止多個線程同時判斷(同時判斷了寫這個還有意義么。。)

   /** 線程鎖 */
   private Lock lock = new ReentrantLock();

至此,寫好了主要部分,測試下

不過有個地方我自己代碼走讀,感覺是有問題

感覺這里會存在線程安全問題,假設線程池隊列為空,當前activeCount大於core,並發情況下,多個線程同時滿足activeCount.get()>coreCount,之后所有線程都會走queue.poll分支,因為隊列為空,所有線程queue.poll返回為null,所有線程全部結束掉,這樣和保留core個線程沖突了。 糾結許久后來發現這個假設不成立,要在沒有進入while之前就出現隊列為空,且activeCount>core,這種情況不會出現,因為在提交線程的方法(execute)已經限制了這種情況,但是這個代碼看起來會有歧義,還是決定改造改造

這回走讀一次,感覺好多了,終於把主要功能寫完了,也能正常跑了

4.立即關閉線程池

線程池里面的線程跑完了,但是還有core個線程阻塞着,這么一直阻塞着也不是辦法,所以要有個關閉的方法,先寫暴力關閉,當前運行的線程中斷,隊列拋棄

思考時間:中斷線程肯定是調用Thread.interrupt方法,這樣我得拿到正在運行得線程才行,所以在新增線程的時候得保存在一個集合里,而且線程執行得時候異常了,也會新增線程,所以這個保存集合要線程安全,而且存取速度要快 這里需要一個線程安全得set集合,ConcurrentHashMap里面有個newKeySet方法,看了下源碼是通過ConcurrentHashMap的key來的,是線程安全的,直接用

   /** 保存正在運行的線程 */
   private Set<Worker> workers = ConcurrentHashMap.newKeySet();

還需要一個狀態標識當前線程池是否關閉,這個狀態要在線程並發(獲取隊列線程)情況下可以判斷,所以用volatile修飾,默認正在運行

   /** 線程池狀態,-1:正在運行,0:暴力關閉,1:優雅關閉 */ 
   private volatile int status = -1;

新建暴力關閉方法stopNow

在新建worker的時候都加到workers里面去,這時候一想,activeCount和workers.size是不是重復了,順帶把activeCount刪掉,用workers.size代替,線程執行完成workers.remove掉

新增方法addWorker

提交線程方法改造,刪掉了activeCount,用workers.size代替

worker的run方法改造,刪掉了activeCount,用workers.size代替,activeCount-1用workers.remove(this)代替

改造完了,查看一下,暴力關閉需要立即中斷線程,拋棄隊列,所以在while獲取隊列那里要增加判斷

提交線程方法增加狀態判斷

改造完成,測試下暴力關閉

結果,整個程序也退出了,線程池結束了,隊列只運行了一個線程

5.優雅關閉線程池

優雅的關閉線程池,是要讓所有的線程和隊列都運行完畢再關閉所有線程,這樣就不能直接interrupt線程了,先設置status=1未優雅關閉

新增優雅關閉方法

提交線程方法增加狀態,限制提交

這里調用stop方法時分情況:

  • 1.存在線程還在執行(隊列或者當前線程),執行完成后調用workers所有線程的interrupt,防止存在線程在queue.take處阻塞
  • 2.不存在線程還在執行(隊列或者當前線程),調用workers所有線程的interrupt,防止存在線程在queue.take處阻塞

所以這里需要一個標記,是否還存在線程在執行,我們可以用一個數字標識當前還需要執行的線程數量,執行完一個線程就-1

增加成員變量remainingCount,標識剩余線程數

   /** 剩余線程數 */
   private AtomicInteger remainingCount = new AtomicInteger(0); 

每次提交一個線程+1

每次執行完一個線程-1

第一種情況,存在線程還在執行,在執行完成后判斷remainingCount是否為0

第二種情況,不存在線程還在執行,在stop的時候增加判斷

抽象封裝下方法

順便把worker的run方法也優化下,一個屏幕都截不下了

跑個第一種情況的測試用例

結果出乎意料,居然沒有結束所有線程

加日志調試

出現了更誇張的錯誤

根據控制台信息可以想象,原本的5個線程全部被interrupt,又不斷地創建線程,又不斷的被interrupt。 這里會創建線程的地方只有在worker的run方法異常,finally代碼段里面,而且沒加日志的時候沒有出現這種情況,加了日志就出現了。 多次代碼走讀后,發現一種可能,在最初5個線程同時將隊列消耗完后,2個線程進入take阻塞,3個線程開始進入interruptWorkers方法,導致那2個線程出現異常,異常后會退出線程,再次創建新線程,並且interrupt新線程,由此陷入死循環

改造getQueueTask方法,不拋出異常,出現異常返回null。順便走讀一下status=0的情況,發現不影響

再次運行測試用例,結果符合預期了,這里的異常堆棧可以忽略

再測試優雅關閉的第二種情況

結果正常

去掉調試日志,至此,這個輪子線程池完成,具備線程池基礎功能

總結

寫這個線程池過程曲折,各種問題不斷出現,特別時兩種關閉方法,判斷比較煩,代碼走讀和調試良久,才堪堪解決,由此聯想ThreadPoolExecutor是多么強大,多么不簡單

圖片較多,代碼可以在Github上找到

參考資料:ThreadPoolExecutor源碼

感謝crossoverjie的文章:https://crossoverjie.top/2019/05/20/concurrent/threadpool-01/

本文來自chentiefeng的博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM