ThreadPool
本篇博文,博主將介紹Quartz框架中ThreadPool線程池相關的內容。線程池顧名思義,就是一個可以幫助我們來進行線程資源管理的對象。在web開發中,常見的就有數據庫連接池,http連接池,redis連接池等。在看這篇文章之前,讀者需要先具備一定的多線程和鎖的知識,如使用wait和notify方法,實現生產者和消費者功能。
為什么要用線程池?
- 線程池可以復用線程,減少線程的創建和銷毀次數。
- 可以提高程序的響應速度。
- 可以對線程資源進行統一管理,比如監控。
- ...
接口定義
quartz框架中的ThreadPool接口定義如下,博主在相應的接口方法上進行了注釋。
public interface ThreadPool {
//將runnable接口放入到Thread中執行
boolean runInThread(Runnable runnable);
//阻塞獲取可用線程數
int blockForAvailableThreads();
//線程初始化方法
void initialize() throws SchedulerConfigException;
//關閉線程
void shutdown(boolean waitForJobsToComplete);
//獲取線程池大小
int getPoolSize();
//設置實例id
void setInstanceId(String schedInstId);
//設置實例名稱
void setInstanceName(String schedName);
}
WorkThread
在介紹SimpleThreadPool之前,博主先講解一下WorkThread。WorkThread繼承了Thread類,因此它是一個可以被運行的線程。它會阻塞式的接收線程池分配的任務,然后執行對應的任務。
工作者線程屬性
- lock,任務執行鎖。
- tp,工作者線程所在的線程池。
- runnable,可以運行的任務。
- runOnce,是否只執行一次。通過構造函數傳runnable實例,此時runOnce為true。
- run,是否需要循環執行。
Runnable的run方法
工作者線程啟動之后,在一個while循環里面執行業務邏輯。while循環的退出條件是線程是否關閉(run == false)。先去獲取任務鎖(lock對象),如果沒有關閉且當前runnale方法為空,說明此時沒有需要執行的任務。線程會在這里等待500毫秒(wait 500)。當有任務投遞給當前線程時,才會喚醒工作者線程,繼續執行當前方法。
如果任務不為空的話,執行runnable接口。執行完之后獲取lock鎖,防止並發沖突。獲取到鎖之后,設置runnable接口為null。如果只是只執行一次的任務,通過持有的threadPool引用,去獲取下一次任務允許的鎖,獲取到下一次任務允許的鎖之后,那么就將自己從busyWorks中移除。否則就從busyWorkers中移除,然后添加到avaliableWorkers。
自定義的run方法
內部的run(Runnable newRunnable)方法:先去獲取內部對象lock的鎖,獲取成功后先判斷內部的runnable是否為空。如果不為空的話說明該線程還是處於繁忙狀態,拋出異常。如果為空的話,則設置為傳遞進來的newRunnable,並且喚醒所有lock等待隊列中的對象(提前讓這些對象結束等待,提高工作線程的響應速度)。
shutdown方法
設置內部成員變量run為false,也就是任務執行完之后不再循環等待下一個可以運行的任務,結束線程的run方法后,線程會進入terminate狀態。
SimpleThreadPool
在quartz默認的配置文件中,使用的是SimpleThreadPool這個線程池,並且指定了線程池的個數為10。從源碼中,我們也可以看到SimpleThreadPool是一個線程大小固定的線程池。代碼如下所示:
public SimpleThreadPool(int threadCount, int threadPriority) {
setThreadCount(threadCount);
setThreadPriority(threadPriority);
}
線程池屬性
- count,線程池個數
- isShutdown,是否處於關閉狀態
- handoffPending,是否處於切換狀態
- makeThreadsDaemons,創建的線程是否為后台線程
- threadGroup,線程組
- nextRunnableLock,下一個可以運行的任務鎖
- workers,總共線程集合
- availWorkers,空閑線程集合
- busyWorkers,繁忙線程集合
- threadNamePrefix,線程名稱前綴
- schedulerInstanceName,調度器實例名稱
initialize方法
在QuartzScheduler對象進行初始化的時候,就會創建對應的線程池,並且調用對應的initialize方法。initialize方法主要就是預先創建對應個數的工作者線程,並且將它們添加到workers和availWorkers集合中,並且循環調用每個workThread的start方法。
blockForAvailableThreads方法
先獲取到下一次運行的任務鎖(nextRunnableLock),防止這時候的空閑線程集合availWorkers發生變化。循環判斷如果此時的availWorkers小於1,或者此時有任務進行等待分配(handoffPending為true),並且此時線程池沒有關閉(isShutdown為false),那么就進行等待(wait 500),直到前面的條件都不滿足為止才退出循環。
如果此時獲取到的空閑線程數大於等於1,則說明現在可以把對應個數的任務交給線程池進行分配執行。
runInThread方法
先獲取到可以允許下一個任務的鎖(nextRunnableLock),設置handoffPending為true,handoffPending表示當前有任務在等待線程池分配任務。接着阻塞判斷是否存在空閑線程可以獲取(一般情況下,繁忙線程執行完后會將自己添加到空閑線程集合中)或者 線程池是否將要被關閉(isShutdown為false)。
如果沒有關閉線程池的情況下,直接從空閑線程集合中拿到第一個線程,並把它從空閑集合中移除。然后將這個空閑線程添加到繁忙線程集合中,接着執行workThread的投遞方法(run方法)。
如果關閉線程池將要被關閉的情況下,直接new出一個線程(此時這個WorkThread的runOnce屬性為true)去執行這個任務,不再等待有空閑線程去執行,這樣可以減少線程池的shutdown時間。並將此線程添加到繁忙線程集合中,添加到工作者集合中。
最后通知等待下一次允許任務鎖的線程,設置handoffPending為false。
shutdown
整體來說,我們關閉線程池時,一個需要停止上游線程(quartzScheduleThread)給他(thradPool)分配任務,另一個需要關閉掉工作池中現有的任務。關閉線程池的方法有一個waitForJobsToComplete的屬性,waitForJobsToComplete表示線程池關閉是否需要等到運行中的任務執行完畢。
先獲取nextRunnableLock,設置線程池為關閉狀態(shutdown),並循環調用workers集合中thread的shutdown方法(不讓工作者線程再循環執行),然后移除availWorkers中的線程。通知阻塞在nextRunnableLock鎖上的所有線程。
如果waitForJobsToComplete為true,那么會循環判斷busyWorkers的元素個數是否大於0(一般情況下,繁忙集合中的線程結束任務后,會將自己從繁忙集合中移除),如果busyWorkers的元素個數大於0的,調用nextRunnableLock的阻塞方法,讓其它方法有時間處理(比如如果此時有任務需要進行分配,可以讓線程池把任務分配好)。最后循環調用每個workers中thread的join方法,等待thread死亡。
如果waitForJobsToComplete為false,那么會直接返回。
博主微信公眾號
