重點內容
- 線程池的使⽤
- 創建線程池
- 提交任務
- 關閉線程池
- 線程池的原理
- 合理配置線程池
- 線程池的監控
1.線程池的創建
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- corePoolSize:線程池的基本大小。 提前調用prestartAllCoreThreads(),會把所有的基本線程啟動 。
- workQueue: ⽤於保存等待執⾏的任務的阻塞隊列。
- ArrayBlockingQueue 基於數組實現的(先進先出)。
- LinkedBlockingQueue 吞吐量要高於ArrayBlockingQueue。
- SynchronousQueue 吞吐量要高於LinkedBlockingQueue 不存儲元素的阻塞隊列,得等一個線程做移除操作才能繼續進行,要不會一直阻塞。
- PriorityBlockingQueue 具有優先級的無限阻塞隊列。
- maximumPoolSize: 線程池允許創建的最⼤線程數。
- threadFactory: ⽤於設置創建線程的工廠可以使用谷歌的開源方法。
- handler: 飽和策略,阻塞隊列和我們的線程的創建數都滿了的時候就會飽和選擇一個策略對新提交的策略進行處理。
- AbortPolicy 直接拋出異常。
- CallerRunsPolicy 只用調用者所在的線程來處理任務。
- DiscardOldestPolicy 丟棄隊列里最近的一個任務。
- DiscardPolicy 直接丟棄。
- ⾃定義 自己定義一個處理方式。
- keepAliveTime:線程池的⼯作線程空閑后,保持存活的時間。
- unit:線程活動保持時間的單位。
2.提交任務
execute:⽤於提交不需要返回值的任務
submit:⽤於提交需要返回值的任務
shutdown:終止的時候會拋出異常
shutdownNow:中止的時候不會拋出異常
- 線程池測試代碼
/** @Classname ThreadPoolDemo @Author XW @Date 2021/12/17 23:15 */
public class ThreadPoolDemo {
private static ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
private static ExecutorService pool =
new ThreadPoolExecutor(
2,
20,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
pool.execute(new NoResultThread(i));
/**
* submit 驗證 Future<String> future = pool.submit(new ResultThread()); try {
* System.out.println("main thread get result: " + future.get()); // future.get(100,
* TimeUnit.MICROSECONDS); } catch (Exception e) { e.printStackTrace(); }
*/
}
// shutdown 驗證
System.out.println("執行shutdown! ");
pool.shutdown(); // 會繼續執行並且完成所有未執行的任務, 新提交的任務會被reject(通過reject策略)
for (int i = 10; i < 12; i++) {
pool.execute(new NoResultThread(i));
}
/**
* shutdownnow 驗證 System.out.println("執行shutdownnow! "); List<Runnable> runnableList =
* pool.shutdownNow(); // 會清除所有未執行的任務並且在運行線程上調用interrupt()
*/
System.out.println("pool shutdown state: " + pool.isShutdown());
while (true) {
if (pool.isTerminated()) {
System.out.println("pool terminated!");
break;
} else {
System.out.println("pool terminated state: " + pool.isTerminated());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class NoResultThread implements Runnable {
private int taskNum;
public NoResultThread(int taskNum) {
this.taskNum = taskNum;
}
@Override
public void run() {
System.out.println("線程 " + Thread.currentThread().getName() + " 開始執行任務 " + this.taskNum);
try {
Thread.sleep(1000);
System.out.println("線程 " + Thread.currentThread().getName() + " 執行完任務 " + this.taskNum);
} catch (InterruptedException e) {
System.out.println(
"線程 "
+ Thread.currentThread().getName()
+ " 在執行任務 "
+ this.taskNum
+ " 時被中斷 :"
+ e.getMessage());
}
}
}
private static class ResultThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(
Thread.currentThread().getState() + "----------" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName();
}
}
}
3.線程池的實現原理
首先會判斷corePoolSize核心線程池是否已經滿了,沒滿就直接創建線程執行任務,滿了再去判斷隊列是否滿了,隊列沒有滿的話在把任務放在隊列里面,隊列如果滿的話,會將當前的線程數量跟maximumPoolSize進行對比如果沒滿的話就創建線程執行任務,maximumPoolSize也滿了話就按照策略(handler)處理無法執行的任務。注意線程池只要創建線程就會獲取全局鎖。
線程會根據worker去線程池里面拿任務
- 線程池execute的源碼
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
/** ctl記錄着workCount和runState */
int c = ctl.get();
/** 第一步: 如果線程池中的線程數量小於核心線程數,那么創建線程並執行*/
if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): 獲取當前活動的線程數
/**
* 在線程池中新建一個新的線程
* command:需要執行的Runnable線程
* true:新增線程時,【當前活動的線程數】是否 < corePoolSize
* false:新增線程時,【當前活動的線程數】是否 < maximumPoolSize
*/
if (addWorker(command, true)) {
// 添加新線程成功,則直接返回。
return;
}
// 添加新線程失敗,則重新獲取【當前活動的線程數】
c = ctl.get();
}
/**
* 第二步:如果當前線程池是運行狀態 並且 任務添加到隊列成功
* (即:case2: 如果workCount >= corePoolSize,創建線程往workQueue添加線程任務,等待執行)
*/
// BlockingQueue<Runnable> workQueue 和 Runnable command
if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue隊列中。
// 重新獲取ctl
int recheck = ctl.get();
// 再次check一下,當前線程池是否是運行狀態,如果不是運行時狀態,則把剛剛添加到workQueue中的command移除掉,並調用拒絕策略
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) { // 如果【當前活動的線程數】為0,則執行addWork方法
/**
* null:只創建線程,但不去啟動
* false:添加線程時,根據maximumPoolSize來判斷
*
* 如果 workerCountOf(recheck) > 0, 則直接返回,在隊列中的command稍后會出隊列並且執行
*/
addWorker(null, false);
}
}
/**
* 第三步:滿足以下兩種條件之一,進入第三步判斷語句
* case1: 線程池不是正在運行狀態,即:isRunning(c)==false
* case2: workCount >= corePoolSize 並且 添加workQueue隊列失敗。即:workQueue.offer(command)==false
*
* 由於第二個參數傳的是false,所以如果workCount < maximumPoolSize,則創建執行線程;否則,進入方法體執行reject(command)
*/
else if (!addWorker(command, false)) {
// 執行線程創建失敗的拒絕策略
reject(command);
}
}
- 線程池addWorker的源碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/** 步驟一:試圖將workerCount+1 */
for (; ; ) {
int c = ctl.get();
// 獲得運行狀態runState
int rs = runStateOf(c);
/**
* 只有如下兩種情況可以新增worker,繼續執行下去:
* case one: rs == RUNNING
* case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
*/
if (rs >= SHUTDOWN && // 即:非RUNNING狀態(請查看isRunning()方法)。線程池異常,表示不再去接收新的線程任務了,返回false
/**
* 當線程池是SHUTDOWN狀態時,表示不再接收新的任務了,所以:
* case1:如果firstTask!=null,表示要添加新任務,則:新增worker失敗,返回false。
* case2:如果firstTask==null並且workQueue為空,表示隊列中的任務已經處理完畢,不需要添加新任務了。
* 則:新增worker失敗,返回false
*/
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
// 獲得當前線程池里的線程數
int wc = workerCountOf(c);
/**
* 滿足如下任意情況,則新增worker失敗,返回false
* case1:大於等於最大線程容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十進制)
* case2:當core是true時:>= 核心線程數
* 當core是false時:>= 最大線程數
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// 當前工作線程數加1
if (compareAndIncrementWorkerCount(c)) {
break retry; // 成功加1,則跳出retry標識的這兩層for循環
}
// 如果線程數加1操作失敗,則獲取當前最新的線程池運行狀態
c = ctl.get();
// 判斷線程池運行狀態(rs)是否改變;如果不同,則說明方法處理期間線程池運行狀態發生了變化,重新獲取最新runState
if (runStateOf(c) != rs) {
continue retry; // 跳出內層for循環,繼續從第一個for循環執行
}
}
}
/**
* 步驟二:workerCount成功+1后,創建Worker,加入集合workers中,並啟動Worker線程
*/
boolean workerStarted = false; /** 用於判斷新的worker實例是否已經開始執行Thread.start() */
boolean workerAdded = false; /** 用於判斷新的worker實例是否已經被添加到線程池的workers隊列中 */
Worker w = null; // AQS.Worker
try {
w = new Worker(firstTask); /** 創建Worker實例,每個Worker對象都會針對入參firstTask來創建一個線程。 */
final Thread t = w.thread; /** 從Worker中獲得新建的線程t */
if (t != null) {
final ReentrantLock mainLock = this.mainLock; /** 加重入鎖 */
/** ----------lock() 嘗試加鎖操作!!獲得鎖后繼續執行,沒獲得則等待直到獲得鎖為止---------- */
mainLock.lock();
try {
int rs = runStateOf(ctl.get()); /** 獲得線程池當前的運行狀態runStatus */
/**
* 滿足如下任意條件,即可向線程池中添加線程:
* case1:線程池狀態為RUNNING。(請查看isRunning()方法)
* case2:線程池狀態為SHUTDOWN並且firstTask為null。
*/
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) { /** 因為t是新構建的線程,還沒有啟動。所以,如果是alive狀態,說明已經被啟動了,則拋出異常 */
throw new IllegalThreadStateException();
}
workers.add(w); /** workers中保存線程池中存在的所有work實例集合 */
int s = workers.size();
if (s > largestPoolSize) { /** largestPoolSize用於記錄線程池中曾經存在的最大的線程數量 */
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock(); /** ----------unlock 解鎖操作!!---------- */
}
if (workerAdded) {
t.start(); /** 開啟線程,執行Worker.run() */
workerStarted = true;
}
}
} finally {
if (!workerStarted) { // 如果沒有開啟線程
addWorkerFailed(w); // 往線程池中添加worker失敗了
}
}
return workerStarted;
}
- 線程池runWorker的源碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* 如果線程池正在停止,請確保線程被中斷;否則,請確保線程不被中斷。
* 這需要在第二種情況下重新檢查以處理shutdownNow競賽,同時清除中斷
*
* 同時滿足如下兩個條件,則執行wt.interrupt()
* 1>線程狀態為STOP、TIDYING、TERMINATED 或者 (當前線程被中斷(清除中斷標記)並且線程狀態為STOP、TIDYING、TERMINATED)
* 2>當前線程wt沒有被標記中斷
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted()) {
wt.interrupt();
}
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /** 真正做事兒的地方了 */
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
4.合理配置線程池
任務的性質
- CPU密集型 : N cpu + 1 配置盡可能小的線程,線程數要少一點,減少cpu頻繁的上下文切換,提高cpu的利用率
- IO 密集型 :2 * N cpu 需要配置盡可能多的線程,這樣才能保證cpu能被充分的利用
- 混合型 :拆分成CPU密集型和IO密集型
- N = Runtime.getRuntime().availableProcessors()
任務的優先級 :PriorityBlockingQueue
任務的執⾏時間
- 不同規模的線程池
- PriorityBlockingQueue 讓執行時間比較短的線程先執行
任務的依賴性
- 增加線程數量
- 使⽤有界隊列保證系統的穩定性
5.線程池的監控
taskCount 任務的數量
completedTaskCount 運行的過程中完成的任務數量
largestPoolSize 曾經創建過的最大的線程數量
getPoolSize 線程數量
getActiveCount 獲取活動的線程數
擴展線程池:beforeExecute、afterExecute 在線程執行前,執行后做點什么