大家好,我是小黑,一個在互聯網苟且偷生的農民工。
本期帶來線程池的第二期內容,如果對線程池的基本概念還不是很清楚,可以先看我上一篇文章。
本期內容會從以下幾個方面解析線程池的具體實現:
- 線程池狀態
- 線程池初始化
- 如何執行任務
- 鈎子方法
- 等待隊列和排隊策略
- 自定義拒絕策略
- 線程池關閉
- 動態調整容量
- 合理配置容量
線程池狀態
ThreadPoolExecutor中定義了如下幾種線程池狀態:
- RUNNING :運行狀態,該線程池可以接受新任務和處理排隊任務
- SHUTDOWN:關閉狀態,不接受新任務,但處理排隊任務
- STOP:停止狀態,不接受新任務、不處理排隊任務和中斷進行中任務
- TIDYING:整理狀態,所有的線程任務已終止,workerCount為零,轉換到整理狀態的線程將運行terminated()鈎子方法
- TERMINATED:終止狀態,terminated()執行完畢,線程池將會被設置為TERNINATED狀態。
我們在上面代碼中看到了對於線程池狀態的定義,但是並沒有發現有定義一個int類型的變量表示當前線程池的狀態,那是怎么做的呢?
我們看到在最上面有定義一個AtomicInteger ctl
這樣一個原子類型的Integer,這個ctl不光可以表示線程池的運行狀態,同時能夠表示線程池的有效線程數workerCount
。那么是怎么做到的呢?我們都知道Integer類型的內存大小是4個字節,對應32個bit,ctl將32位的高三位用來表示線程池的運行狀態,低29位表示有效線程數。
這里可以想一下為什么是用高三位表示runState,不是兩位,也不是4位呢?
因為線程池的狀態定義了5種,而二進制要能夠表示5種值最少要用3位。比如1位只能表示0和1,兩位能表示00/01/10/11四種,那3位能表示的值則是2^3
也就是8種,所以要標識5種狀態則最少需要3位。
因為這一點,也限制了一個線程池理論上能設置的最大線程數是2^29-1個。
那如果想要從這一個字段里取出runState或者workCount的值應該怎么做的?
可以看到是通過位運算來實現的。這里先給大家插播一下位運算的邏輯。
- 按位與&:兩數同為1則為1,其他情況為0,如0101& 0100 的結果是0100
- 按位或|:兩數只要有1個是1,則為1,如 0101|0100的結果是0101
- 按位取反~:0變1,1變0,如0101按位取反則等於1010
要獲取高位的runState則使用ctl的值c和容量(也就是2^29-1
)取反做與運算;
取低位的workerCount則不用取反。
通過一個字段來表示兩個概念,並且使用Atomic可以保證操作的原子性,不得不說Doug Lea,YYDS!!!
線程池初始化
通過ThreadPoolExecutor的構造方法我們來看一下線程池在創建的時候都做了些什么。
可以發現,在構造方法中只是對7個參數進行賦值,並沒有去做線程的創建,所以在默認情況下,線程池創建后是沒有線程的,需要在任務提交時才會創建線程。
如果需要在線程池創建之后立即創建線程,ThreadPoolExecutor提供了兩個方法可以實現:
如何執行任務
通過ThreadPoolExecutor執行任務時可以通過調用execute()
方法和submit()
方法來完成。
在這之前我們需要先知道ThreadPoolExecutor中一些比較重要的變量,可點開下圖查看。
接下來我們來看execute()
是如何執行任務的。
以上execute()
方法的執行步驟可以總結為3步:
- 如果有效線程數workerCount小於核心線程數,則嘗試增加一個線程執行當前任務,如果成功,則會在新線程中執行任務,如果失敗,則執行下一步;
- 如果線程池狀態是running,則嘗試加入到等待隊列,如果入隊成功,則需要重新檢查線程池狀態是否是running,如果已經不是running則要將任務從隊列中remove並按照拒絕策略處理;如果重新檢查線程池狀態是running,則要判斷workCount是不是等於0,如果等於0則需要創建一個新的Worker用於執行剛入隊的任務;
- 如果在第二步入隊失敗,會再次嘗試增加一個Worker執行該任務,如果這里還是不行,則表示線程池確實shutdown或者等待隊列滿了,就執行拒絕策略。
有了這個整體之后,我們再進一步看看addWorker()
方法,在這之前需要先了解Worker類的結構。
可以發現Worker類繼承了AQS,並且實現Runnable接口,也就是說Worker對象可以交給一個Thread創建線程后執行。從Worker的構造方法里我們也能看出,thread是通過線程工廠創建一個線程,將this作為參數傳遞的。
然后我們再來看addWorker()方法:
簡單總結一下addWorker()
方法分以下4步:
- 如果線程池狀態並且工作隊列為空,則直接返回false,如果工作線程數workerCount小於核心線程數或者最大線程數(這里取決於傳入的參數),則對workerCount自旋加1;
- 先加鎖,然后將Worker加入到workerset中,解鎖;
- 如果worker加入workerset成功,將線程啟動;
- 如果線程啟動失敗,則將worke移除,workerCount原子減1
既然Worker類實現了Runnable方法,那對應run()
方法中的邏輯就必須要看一下了。
在Worker的run方法中直接調用外部類ThreadPoolExecutor的runWorker(Worker)
方法。
對runWorker()
方法總結為以下幾個步驟:
- 從傳入worker對象的初始任務開始執行,如果初始任務為空則會調用
getTask()
方法獲取任務,如果返回空着該Worker線程則會退出;如果因為外部任務代碼導致的異常拋出,則也會終止循環,但是不會將Worker線程退出; - 在運行任務之前都會獲取鎖防止其他任務執行時發生其他的池中斷,並確保在池沒有停止的情況下保證該線程不會設置其他中斷;
- 每個運行任務都會調用
beforeExecute()
方法,這個方法可能會拋出異常,這種情況下會導致任務不處理,並且線程會終止; - 如果
beforeExecute()
正常執行,則會運行任務執行run()
,在運行任務出拋出的異常和Error等會收集在thrown變量上,傳給afterExecute()
; run()
執行完之后會執行afterExecute()
如果該方法拋出異常同樣會讓線程終止。
那么getTask()
是去哪里獲取任務呢?當然是從等待隊列中獲取。
getTask()
的執行可以總結如下:
- 會根據當前線程池設置的核心線程數,最大線程數,超時時間等,從任務隊列獲取任務,可能會超時等待,也可能會阻塞知道任務到達;
- 如果線程池STOP,或者有超過最大線程數的工作線程,或者線程是SHUTDOWN並且隊列為空,或者在超時等待任務時超時這4種情況下會返回空。
鈎子方法
在線程池執行任務的runWorker(Worker)
方法中我們發現,會在任務執行前和執行后有兩個方法。
從方法簽名上可以看到這兩個方法都是protected
的,當時在ThreadPoolExecutor中都沒有具體實現。所以這兩個方法主要用於在自定義線程池時覆蓋,可以在任務執行前和執行后做一些事情。比如初始化threadLocals,收集統計信息,打印日志等。需要注意的是這兩個方法中如果拋出異常都會使線程終止。
另外,ThreadPoolExecutor中的terminated()
也可以被覆蓋,可以用於在線程完全終止后執行一些特殊處理。
等待隊列和排隊策略
在上面的內容中很多次的提到了等待隊列,也就是ThreadPoolExecutor中的workQueue,用來存放等待執行的任務。
workQueue的類型定義為BlockingQueue<Runnable>
通過可以使用以下的三種類型。
有界隊列
有界隊列顧名思義就是有邊界的隊列,需要指定隊列的大小,主要有ArrayBlockingQueue和PriorityBlockingQueue。PriorityBlockingQueue的優點是等待隊列中的任務可以按照任務的優先級處理。
有界隊列的大小設置需要和線程池大小相互配合,線程池較小隊列較大時,可以減少內存消耗,降低線程切換次數和CPU的使用率,但是可能會限制系統的吞吐量,所以要結合實際場景考慮如何設置。
無界隊列
隊列的大小沒有限制,常用的有LinkedBlockingQueue,使用該隊列是要謹慎,當任務比較耗時時,可能會導致大量任務堆積在隊列中導致內存溢出。使用Executors.newFixedThreadPool創建的線程池就是使用的LinkedBlockingQueue。
同步移交隊列
如果不希望隊列等待,而是直接交給工作線程執行,則可以使用同步移交隊列SynchronousQueue,該隊列實際不會存放元素,要放入時必須有另一個現在在等待接收元素才能成功,在這之前會一直阻塞。
自定義拒絕策略
上一期我們有講到過線程池的4種拒絕策略。
- AbortPolicy:拒絕處理,拋出異常
- CallerRunsPolicy:由創建該線程的線程(main)執行
- DiscardPolicy: 丟棄,不拋出異常
- DiscardOldestPolicy:和最早創建的線程進行競爭,不拋出異常
當然我們也可以自定義拒絕策略,比如我們在拒絕策略中做一些日志記錄等自定義的需求。
線程池關閉
ThreadPoolExecutor中有兩個方法可以讓線程池關閉,如下:
- shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
動態調整容量
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()
和setMaximumPoolSize()
。
- setCorePoolSize:設置核心池大小,如果設置的值小於核心線程數,則多余線程會在下一次空閑時終止,如果設置的值較大,並且等待隊列中有任務,則會立即創建線程執行等待隊列中的任務。
- setMaximumPoolSize:設置線程池最大能創建的線程數目大小,如果新值小於當前的最大線程數,則多余的線程會在下次空閑時終止。
配置線程池大小
那么最后我們來說一下應該如何來配置線程池的大小呢?或許大多數程序員都聽過這樣一種說法:
- CPU 密集型應用,線程池大小設置為 CPU核數 + 1
- IO 密集型應用,線程池大小設置為 2*CPU核數
到底對不對呢?
我認為是不對的,因為在實際場景中,一台服務器可能都不止一個應用,而這兩個公式都只和CPU核數相關,所以肯定是不正確的,只有在一台服務器只部署一個應用時才能勉強說的通;還有一個原因就是在一個應用中可能不僅僅是CPU密集型或者IO密集型,可能二者都有,那又該如何選擇呢?以及一個應用中可能會按照功能划分多個線程池,所以最終結論我覺得這兩種說法不對。
那么我們到底應該如何設置線程池的大小呢?有沒有什么可以實踐的方法,這里需要給大家介紹一個理論知識。
利特爾法則 (Little's Law)
一個系統請求數等於請求的到達率與平均每個單獨請求花費的時間之乘積
別看這個名字感覺很高大上,其實概念很簡單。
結合到我們的場景中,我們設定單位時間為1秒鍾來計算,λ=每秒收到的請求數,W=每個任務執行的時間,L=λW=每秒平均在系統中運行的線程。
假設我們的應用是單核的,則可以直接將L設置為線程池大小,但是真實情況並不是,那么多個CPU對我們這個公式中的哪部分數據會有影響呢?
主要是對於W的值有影響,需要知道一個請求中的線程IO時間和線程CPU時間。帶入公式后則是:
λ=((IO時間+CPU時間)/CPU時間)*CPU個數
那么需要獲得IO時間和CPU時間,則需要通過在代碼中進行埋點才能准確獲得,比如通過AOP切面編程在請求前后獲取時間得到結果。
當然,僅僅依靠這個公式還是不夠的,還需要通過壓力測試進行調整和檢驗,才能更准確的配置。
以上就是本期的全部內容,我們下期見,關注我的公眾號【小黑說Java】,更多干貨內容。