文本將主要講述 ThreadPoolExecutor 一個特殊的子類 ScheduledThreadPoolExecutor,主要用於執行周期性任務;所以在看本文之前最好先了解一下 ThreadPoolExecutor ,可以參考 ThreadPoolExecutor 詳解;另外 ScheduledThreadPoolExecutor 中使用了延遲隊列,主要是基於完全二叉堆實現的,可以參考 完全二叉堆;
一、ScheduledThreadPoolExecutor 結構概述
1. 繼承關系
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor implements ScheduledExecutorService {}
在源碼中可以看到,ScheduledThreadPoolExecutor 的狀態管理、入隊操作、拒絕操作等都是繼承於 ThreadPoolExecutor;ScheduledThreadPoolExecutor 主要是提供了周期任務和延遲任務相關的操作;
- schedule(Runnable command, long delay, TimeUnit unit) // 無返回值的延遲任務
- schedule(Callable
callable, long delay, TimeUnit unit) // 有返回值的延遲任務 - scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 固定頻率周期任務
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 固定延遲周期任務
就 `ScheduledThreadPoolExecutor` 的運行邏輯而言,大致可以表述為:
- 首先將 Runnable/Callable 封裝為 ScheduledFutureTask,延遲時間作為比較屬性;
- 然后加入 DelayedWorkQueue 隊列中,每次取出隊首延遲最小的任務,超時等待,然后執行;
- 最后判斷是否為周期任務,然后將其重新加入 DelayedWorkQueue 隊列中;
其內部結構如圖所示:
這里需要注意的:
- ScheduledThreadPoolExecutor 中的隊列不能指定,只能是 DelayedWorkQueue;因為他是 無界隊列,所以再添加任務的時候線程最多可以增加到 coreSize,這里不清楚的可以查看 ThreadPoolExecutor 詳解 ,就不再重復了;
- ScheduledThreadPoolExecutor 重寫了 ThreadPoolExecutor 的 execute() 方法,其執行的核心方法變成 delayedExecute();
2. ScheduledFutureTask
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber; // 任務序號,從 AtomicLong sequencer 獲取,當延遲時間相同時,序號小的先出
private long time; // 下次任務執行時間
private final long period; // 0 表示非周期任務,正值表示固定頻率周期任務,負值表示固定延遲周期任務
RunnableScheduledFuture<V> outerTask = this; // 重復執行的任務,傳入的任務可以使用 decorateTask() 重新包裝
int heapIndex; // 隊列索引
}
其中最重要的方法必然是 run 方法了:
public void run() {
boolean periodic = isPeriodic(); // 是否為周期任務,period != 0
if (!canRunInCurrentRunState(periodic)) // 當前狀態能否繼續運行,詳細測試后面還會講到
cancel(false); // 取消任務
else if (!periodic) // 不是周期任務時,直接運行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 時周期任務
setNextRunTime(); // 設置下次執行時間
reExecutePeriodic(outerTask); // 重新入隊
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning); // 設置中斷狀態
if (cancelled && removeOnCancel && heapIndex >= 0) // 當設置 removeOnCancel 狀態時,移除任務
remove(this); // 默認為 false
return cancelled;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) { // 如果當前狀態可以執行
super.getQueue().add(task); // 則重新入隊
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else ensurePrestart(); // 確保有線程執行任務
}
}
此外還有 DelayedWorkQueue,但是這里不准備講了,可以查看 完全二叉堆 了解實現的原理;
二、scheduleAtFixedRate 與 scheduleWithFixedDelay
scheduleAtFixedRate 和 scheduleWithFixedDelay 是我們最常用的兩個方法,但是他們的區別可能不是很清楚,這里重點講一下,
1. scheduleAtFixedRate
// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
sleep(1000); // 睡眠 1s,
log.info("run task");
}, 1, 2, TimeUnit.SECONDS); // 延遲 1s,周期 2s
// 打印
[19:41:28,489 INFO ] [pool-1-thread-1] - run task
[19:41:30,482 INFO ] [pool-1-thread-1] - run task
[19:41:32,483 INFO ] [pool-1-thread-1] - run task
[19:41:34,480 INFO ] [pool-1-thread-1] - run task
可以看到的確時固定周期 2s 執行的,但是如果任務執行時間超過周期呢?
// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = 2000 + random.nextInt(3) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS); // 延遲 1s,周期 2s
// 打印
[19:42:53,428 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:55,430 INFO ] [pool-1-thread-1] - run task, sleep :2000
[19:42:59,430 INFO ] [pool-1-thread-1] - run task, sleep :4000
[19:43:02,434 INFO ] [pool-1-thread-1] - run task, sleep :3000
[19:43:06,436 INFO ] [pool-1-thread-1] - run task, sleep :4000
可以看到如果任務執行時間超出周期時,下一次任務會立刻運行;就好像周期是一個有彈性的袋子,能裝下運行時間的時候,是固定大小,裝不下的時候就會被撐大,圖像化表示如下:
2. scheduleWithFixedDelay
// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = 1000 + random.nextInt(5) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS); // 延遲 1s,周期 2s
// 打印
[20:05:40,682 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:05:45,686 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:05:49,689 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:05:55,690 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:06:01,692 INFO ] [pool-1-thread-1] - run task, sleep :4000
可以看到無論執行時間是多少,其結果都是在執行完畢后,停頓固定的時間,然后執行下一次任務,其圖形化表示為:
三、 源碼分析
1. 延遲任務
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(
command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null) throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(
callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
可以看到所有的周期任務,最終執行的都是 delayedExecute 方法,其中 decorateTask 是一個鈎子函數,其之類可以利用他對任務進行重構過濾等操作;
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) reject(task); // 如果線程池已經關閉,則拒絕任務
else {
super.getQueue().add(task); // 任務入隊
if (isShutdown() && // 再次檢查,線程池是否關閉
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart(); // 確保有線程執行任務
}
}
2. 周期任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (period <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)); // 注意這里添加的是正值
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
if (command == null || unit == null) throw new NullPointerException();
if (delay <= 0) throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)); // 注意這里添加的是負值
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
從上面代碼可以看到 scheduleAtFixedRate 和 scheduleWithFixedDelay 只有周期任務的時間不同,其他的都一樣,那么下面我們看一下他們的任務時間計算;
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
private void setNextRunTime() {
long p = period;
if (p > 0) // 正值表示 scheduleAtFixedRate
time += p; // 不管任務執行時間,直接加上周期時間,也就是一次任務超時,會影響后續任務的執行,
// 超時的時候,getDelay 是負值,所以在延遲隊列中必然排在最前面,立刻被取出執行
else
time = triggerTime(-p); // 計算觸發時間
}
long triggerTime(long delay) { // 這里可以看到,每次的確是在當前時間的基礎上,加上延遲時間;
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
這里特別要注意 scheduleAtFixedRate 一次任務超時,會持續影響后面的任務周期安排,所以在設定周期的時候要特別注意; 例如:
// 測試
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
pool.scheduleAtFixedRate(() -> {
int i = random.nextInt(5) * 1000;
sleep(i);
log.info("run task, sleep :{}", i);
}, 1, 2, TimeUnit.SECONDS);
// 打印
[20:29:11,310 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:16,304 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:19,304 INFO ] [pool-1-thread-1] - run task, sleep :3000
[20:29:21,305 INFO ] [pool-1-thread-1] - run task, sleep :2000
[20:29:22,305 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:23,306 INFO ] [pool-1-thread-1] - run task, sleep :1000
[20:29:27,306 INFO ] [pool-1-thread-1] - run task, sleep :4000
[20:29:30,307 INFO ] [pool-1-thread-1] - run task, sleep :3000
如圖所示:
3. 取消任務
private volatile boolean continueExistingPeriodicTasksAfterShutdown; //關閉后繼續執行周期任務,默認false
private volatile boolean executeExistingDelayedTasksAfterShutdown = true; //關閉后繼續執行延遲任務,默認true
private volatile boolean removeOnCancel = false; // 取消任務是,從隊列中刪除任務,默認 false
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // 繼續延遲任務
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 繼續周期任務
if (!keepDelayed && !keepPeriodic) { // 都是 false,直接清除
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
總結
- scheduleAtFixedRate,固定頻率周期任務,注意一次任務超時,會持續的影響后續的任務周期;
- scheduleWithFixedDelay,固定延遲周期任務,即每次任務結束后,超時等待固定時間;
- 此外 ScheduledThreadPoolExecutor 線程最多為核心線程,最大線程數不起作用,因為 DelayedWorkQueue 是無界隊列;
