大家好,這篇文章我們來聊下動態線程池開源項目(DynamicTp)的通知告警模塊。目前項目提供以下通知告警功能,每一個通知項都可以獨立配置是否開啟、告警閾值、告警間隔時間、平台等,具體代碼請看core模塊notify包。
1.核心參數變更通知
2.線程池活躍度告警
3.隊列容量告警
4.拒絕策略告警
5.任務執行超時告警
6.任務排隊超時告警
DynamicTp項目地址
目前700star,感謝你的star,歡迎pr,業務之余一起給開源貢獻一份力量
gitee地址:https://gitee.com/yanhom/dynamic-tp
github地址:https://github.com/lyh200/dynamic-tp
系列文章
動態線程池(DynamicTp),動態調整Tomcat、Jetty、Undertow線程池參數篇
線程池解讀
上篇文章里大概講到了JUC線程池的執行流程,我們這里再仔細回顧下,上圖是JUC下線程池ThreadPoolExecutor類的繼承體系。
頂級接口Executor提供了一種方式,解耦任務的提交和執行,只定義了一個execute(Runnable command)方法用來提交任務,至於具體任務怎么執行則交給他的實現者去自定義實現。
ExecutorService接口繼承Executor,且擴展了生命周期管理的方法、返回Futrue的方法、批量提交任務的方法
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService抽象類繼承ExecutorService接口,對ExecutorService相關方法提供了默認實現,用RunnableFuture的實現類FutureTask包裝Runnable任務,交給execute()方法執行,然后可以從該FutureTask阻塞獲取執行結果,並且對批量任務的提交做了編排
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor繼承AbstractExecutorService,采用池化思想管理一定數量的線程來調度執行提交的任務,且定義了一套線程池的生命周期狀態,用一個ctl變量來同時保存當前池狀態(高3位)和當前池線程數(低29位)。看過源碼的小伙伴會發現,ThreadPoolExecutor類里的方法大量有同時需要獲取或更新池狀態和池當前線程數的場景,放一個原子變量里,可以很好的保證數據的一致性以及代碼的簡潔性。
// 用此變量保存當前池狀態(高3位)和當前線程數(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 可以接受新任務提交,也會處理任務隊列中的任務
// 結果:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新任務提交,但會處理任務隊列中的任務
// 結果:000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任務,不執行隊列中的任務,且會中斷正在執行的任務
// 結果:001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 任務隊列為空,workerCount = 0,線程池的狀態在轉換為TIDYING狀態時,會執行鈎子方法terminated()
// 結果:010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 調用terminated()鈎子方法后進入TERMINATED狀態
// 結果:010 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 低29位變為0,得到了線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 高3位變為為0,得到了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
核心入口execute()方法執行邏輯如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以總結出如下主要執行流程,當然看上述代碼會有一些異常分支判斷,可以自己順理加到下述執行主流程里
1.判斷線程池的狀態,如果不是RUNNING狀態,直接執行拒絕策略
2.如果當前線程數 < 核心線程池,則新建一個線程來處理提交的任務
3.如果當前線程數 > 核心線程數且任務隊列沒滿,則將任務放入任務隊列等待執行
4.如果 核心線程池 < 當前線程池數 < 最大線程數,且任務隊列已滿,則創建新的線程執行提交的任務
5.如果當前線程數 > 最大線程數,且隊列已滿,則拒絕該任務
addWorker()方法邏輯
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取當前池狀態
int rs = runStateOf(c);
// 1.判斷如果線程池狀態 > SHUTDOWN,直接返回false,否則2
// 2.如果線程池狀態 = SHUTDOWN,並且firstTask不為null則直接返回false,因為SHUTDOWN狀態的線程池不能在接受新任務,否則3
// 3.如果線程池狀態 = SHUTDOWN,並且firstTask == null,此時如果任務隊列為空,則直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 1.如果當前線程池線程數大於等於CAPACITY(理論上的最大值5億),則返回fasle
// 2.如果創建核心線程情況下當前池線程數 >= corePoolSize,則返回false
// 3.如果創建非核心線程情況下當前池線程數 >= maximumPoolSize,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas 增加當前池線程數量,成功則退出循環
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// cas 增加當前池線程數量失敗(多線程並發),則重新獲取ctl,計算出當前線程池狀態,如果不等於上述計算的狀態rs,則說明線程池狀態發生了改變,需要跳到外層循環重新進行狀態判斷,否則執行內部循環
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 至此說明線程池狀態校驗通過,且增加池線程數量成功,則創建一個Worker線程來執行任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 訪問worker set時需要獲取mainLock全局鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 1.當前池狀態 < SHUTDOWN,也就是RUNNING狀態,如果已經started,拋出異常
// 2.當前池狀態 = SHUTDOWN,且firstTask == null,需要處理任務隊列中的任務,如果已經started,拋出異常
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 剛創建線程添加到workers集合中
workers.add(w);
int s = workers.size();
// 判斷更新歷史最大線程數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動新建線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動失敗,workerCount--,workers里移除該worker
addWorkerFailed(w);
}
return workerStarted;
}
線程池中的線程並不是直接用的Thread類,而是定義了一個內部工作線程Worker類,實現了AQS以及Runnable接口,然后持有一個Thread類的引用及一個firstTask(創建后第一個要執行的任務),每個Worker線程啟動后會執行run()方法,該方法會調用執行外層runWorker(Worker w)方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 1.如果task不為空,則作為該線程的第一個任務直接執行
// 2.如果task為空,則通過getTask()方法從任務隊列中獲取任務執行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 線程池狀態 >= STOP,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 實際執行任務前調用的鈎子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 實際執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務執行后調用的鈎子方法
afterExecute(task, thrown);
}
} finally {
// 任務置為null,重新獲取新任務,完成數++
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 無任務可執行,執行worker銷毀邏輯
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法邏輯
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 以下兩種情況遞減工作線程數量
// 1. rs >= STOP
// 2. rs == SHUTDOWN && workQueue.isEmpty()
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 允許核心線程超時 或者 當前線程數 > 核心線程數,有可能發生超時關閉
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc什么情況 > maximumPoolSize,調用setMaximumPoolSize()方法將maximumPoolSize調小了,會發生這種情況,此時需要關閉多余線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞隊列獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 發生中斷,進行重試
timedOut = false;
}
}
}
以上內容比較詳細的介紹了ThreadPoolExecutor的繼承體系,以及相關的核心源碼,基於此,現在我們來看DynamicTp提供的告警通知能力。
核心參數變更通知
對應配置中心的監聽端監聽到配置變更后,封裝到DtpProperties中然后交由DtpRegistry類中的refresh()方法去做配置更新,同時通知時會高亮顯示有變更的字段
線程池活躍度告警
活躍度 = activeCount / maximumPoolSize
服務啟動后會開啟一個定時監控任務,每隔一定時間(可配置)去計算線程池的活躍度,達到配置的threshold閾值后會觸發一次告警,告警間隔內多次觸發不會發送告警通知
隊列容量告警
容量使用率 = queueSize / queueCapacity
服務啟動后會開啟一個定時監控任務,每隔一定時間去計算任務隊列的使用率,達到配置的threshold閾值后會觸發一次告警,告警間隔內多次觸發不會發送告警通知
拒絕策略告警
/**
* Do sth before reject.
* @param executor ThreadPoolExecutor instance
*/
default void beforeReject(ThreadPoolExecutor executor) {
if (executor instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) executor;
dtpExecutor.incRejectCount(1);
Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
}
}
線程池線程數達到配置的最大線程數,且任務隊列已滿,再提交任務會觸發拒絕策略。DtpExecutor線程池用到的RejectedExecutionHandler是經過動態代理包裝過的,在執行具體的拒絕策略之前會執行RejectedAware類beforeReject()方法,此方法會去做拒絕數量累加(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的閾值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為當前時間),告警間隔內多次觸發不會發送告警通知
任務隊列超時告警
重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法,如果配置了執行超時或排隊超時值,則會用DtpRunnable包裝任務,同時記錄任務的提交時間submitTime,beforeExecute根據當前時間和submitTime的差值就可以計算到該任務在隊列中的等待時間,然后判斷如果差值大於配置的queueTimeout則累加排隊超時任務數量(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的閾值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為當前時間),告警間隔內多次觸發不會發送告警通知
@Override
public void execute(Runnable command) {
if (CollUtil.isNotEmpty(taskWrappers)) {
for (TaskWrapper t : taskWrappers) {
command = t.wrap(command);
}
}
if (runTimeout > 0 || queueTimeout > 0) {
command = new DtpRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (!(r instanceof DtpRunnable)) {
super.beforeExecute(t, r);
return;
}
DtpRunnable runnable = (DtpRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
runnable.setStartTime(currTime);
}
if (queueTimeout > 0) {
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
queueTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
}
}
super.beforeExecute(t, r);
}
任務執行超時告警
重寫ThreadPoolExecutor的afterExecute()方法,根據當前時間和beforeExecute()中設置的startTime的差值即可算出任務的實際執行時間,然后判斷如果差值大於配置的runTimeout則累加排隊超時任務數量(總數值累加、周期值累加)。且判斷如果周期累計值達到配置的閾值,則會觸發一次告警通知(同時重置周期累加值為0及上次告警時間為當前時間),告警間隔內多次觸發不會發送告警通知
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (runTimeout > 0) {
DtpRunnable runnable = (DtpRunnable) r;
long runTime = System.currentTimeMillis() - runnable.getStartTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
}
}
super.afterExecute(r, t);
}
告警通知相關配置項
如果想使用通知告警功能,配置文件必須要配置platforms字段,且可以配置多個平台,如釘釘、企微等;notifyItems配置具體告警項,包括閾值、平台、告警間隔等。
spring:
dynamic:
tp:
# 省略其他項
platforms: # 通知平台
- platform: wechat
urlKey: 38a98-0c5c3b649c
receivers: test
- platform: ding
urlKey: f80db3e801d593604f4a08dcd6a
secret: SECb5444a6f375d5b9d21
receivers: 17811511815
executors: # 動態線程池配置,都有默認值,采用默認值的可以不配置該項,減少配置量
- threadPoolName: dtpExecutor1
executorType: common # 線程池類型common、eager:適用於io密集型
corePoolSize: 2
maximumPoolSize: 4
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任務隊列,查看源碼QueueTypeEnum枚舉類
rejectedHandlerType: CallerRunsPolicy # 拒絕策略,查看RejectedTypeEnum枚舉類
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: dtp1 # 線程名前綴
waitForTasksToCompleteOnShutdown: false # 參考spring線程池設計
awaitTerminationSeconds: 5 # 單位(s)
preStartAllCoreThreads: false # 是否預熱核心線程,默認false
runTimeout: 200 # 任務執行超時閾值,目前只做告警用,單位(ms)
queueTimeout: 100 # 任務在隊列等待超時閾值,目前只做告警用,單位(ms)
taskWrapperNames: ["ttl"] # 任務包裝器名稱,集成TaskWrapper接口
notifyItems: # 報警項,不配置自動會按默認值配置(變更通知、容量報警、活性報警、拒絕報警、任務超時報警)
- type: capacity # 報警項類型,查看源碼 NotifyTypeEnum枚舉類
threshold: 80 # 報警閾值
platforms: [ding,wechat] # 可選配置,不配置默認拿上層platforms配置的所以平台
interval: 120 # 報警間隔(單位:s)
- type: change
- type: liveness
threshold: 80
interval: 120
- type: reject
threshold: 1
interval: 160
- type: run_timeout
threshold: 1
interval: 120
- type: queue_timeout
threshold: 1
interval: 140
總結
本文開頭介紹了線程池ThreadPoolExecutor的繼承體系,核心流程的源碼解讀。然后介紹了DynamicTp提供的以上6種告警通知能力,希望通過監控+告警可以讓我們及時感知到我們業務線程池的執行負載情況,第一時間做出調整,防止事故的發生。
聯系我
對項目有什么想法或者建議,可以加我微信交流,或者創建issues,一起完善項目
公眾號:CodeFox
微信:yanhom1314