1.ScheduledThreadPoolExecutor
整體結構剖析。
1.1類圖介紹
根據上面類圖圖可以看到Executor其實是一個工具類,里面提供了好多靜態方法,根據用戶選擇返回不同的線程池實例。可以看到ScheduledThreadPoolExecutor
繼承了 ThreadPoolExecutor
並實現 ScheduledExecutorService
接口。線程池隊列是 DelayedWorkQueue
,和 DelayedQueue
類似是一個延遲隊列。
ScheduledFutureTask
是具有返回值的任務,繼承自 FutureTask,FutureTask 內部有個變量 state 用來表示任務的狀態,一開始狀態為 NEW,所有狀態為:
private static final int NEW = 0;//初始狀態 private static final int COMPLETING = 1;//執行中狀態 private static final int NORMAL = 2;//正常運行結束狀態 private static final int EXCEPTIONAL = 3;//運行中異常 private static final int CANCELLED = 4;//任務被取消 private static final int INTERRUPTING = 5;//任務正在被中斷 private static final int INTERRUPTED = 6;//任務已經被中斷
FutureTask可能的任務狀態轉換路徑如下所示:
NEW -> COMPLETING -> NORMAL //初始狀態->執行中->正常結束 NEW -> COMPLETING -> EXCEPTIONAL//初始狀態->執行中->執行異常 NEW -> CANCELLED//初始狀態->任務取消 NEW -> INTERRUPTING -> INTERRUPTED//初始狀態->被中斷中->被中斷
其實ScheduledFutureTask
內部還有個變量 period 用來表示任務的類型,其任務類型如下:
-
period=0,說明當前任務是一次性的,執行完畢后就退出了。
-
period 為負數,說明當前任務為 fixed-delay 任務,是定時可重復執行任務。
-
period 為整數,說明當前任務為 fixed-rate 任務,是定時可重復執行任務。
接下來我們可以看到ScheduledThreadPoolExecutor
的造函數如下
//使用改造后的Delayqueue. public ScheduledThreadPoolExecutor(int corePoolSize) { //調用父類ThreadPoolExecutor的構造函數 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
根據上面代碼可以看到線程池隊列是 DelayedWorkQueue
2、原理分析
我們主要看三個重要的函數,如下所示:
schedule(Runnable command, long delay,TimeUnit unit) scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
2.1、schedule(Runnable command, long delay,TimeUnit unit)
方法
該方法作用是提交一個延遲執行的任務,任務從提交時間算起延遲 unit 單位的 delay 時間后開始執行,提交的任務不是周期性任務,任務只會執行一次,代碼如下:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //(1)參數校驗 if (command == null || unit == null) throw new NullPointerException(); //(2)任務轉換 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //(3)添加任務到延遲隊列 delayedExecute(t); return t; }
可以看到上面代碼所示,代碼(1)參數校驗,如果 command 或者 unit 為 null,拋出 NPE 異常。
代碼(2)裝飾任務,把提交的 command 任務轉換為 ScheduledFutureTask
,ScheduledFutureTask
是具體放入到延遲隊列里面的東西,由於是延遲任務,所以 ScheduledFutureTask
實現了 long getDelay(TimeUnit unit)
和 int compareTo(Delayed other)
方法,triggerTime 方法轉換延遲時間為絕對時間,也就是把當前時間的納秒數加上延遲的納秒數后的 long 型值。
接下來我們需要看 ScheduledFutureTask
的構造函數,如下所示:
ScheduledFutureTask(Runnable r, V result, long ns) { //調用父類FutureTask的構造函數 super(r, result); this.time = ns; this.period = 0;//period為0,說明為一次性任務 this.sequenceNumber = sequencer.getAndIncrement(); }
根據構造函數可以看到內部首先調用了父類 FutureTask 的構造函數,父類 FutureTask 的構造函數代碼如下:
//通過適配器把runnable轉換為callable public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; //設置當前任務狀態為NEW }
根據上面代碼可以看到FutureTask 中任務又被轉換為了 Callable 類型后,保存到了變量 this.callable 里面,並設置 FutureTask 的任務狀態為 NEW。
然后 ScheduledFutureTask
構造函數內部設置 time 為上面說的絕對時間,需要注意這里 period 的值為 0,這說明當前任務為一次性任務,不是定時反復執行任務。
其中 long getDelay(TimeUnit unit)
方法代碼如下,用來獲取當前任務還有多少時間就過期了,代碼如下所示:
//元素過期算法,裝飾后時間-當前時間,就是即將過期剩余時間 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
接下來接着看compareTo(Delayed other)
方法,代碼如下:
public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
根據上面代碼的執行邏輯,可以看到compareTo 作用是加入元素到延遲隊列后,內部建立或者調整堆時候會使用該元素的 compareTo 方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候向隊列里面添加元素,隊首的的元素都是最即將過期的元素。
接下來接着看代碼(3)添加任務到延遲隊列,delayedExecute 的代碼如下:
private void delayedExecute(RunnableScheduledFuture<?> task) { //(4)如果線程池關閉了,則執行線程池拒絕策略 if (isShutdown()) reject(task); else { //(5)添加任務到延遲隊列 super.getQueue().add(task); //(6)再次檢查線程池狀態 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //(7)確保至少一個線程在處理任務 ensurePrestart(); } }
可以看到代碼(4)首先判斷當前線程池是否已經關閉了,如果已經關閉則執行線程池的拒絕策略(如果不知道線程池的拒絕策略可以看前一篇線程池的介紹。)
否者執行代碼(5)添加任務到延遲隊列。添加完畢后還要重新檢查線程池是否被關閉了,如果已經關閉則從延遲隊列里面刪除剛才添加的任務,但是有可能線程池線程已經從任務隊列里面移除了該任務,也就是該任務已經在執行了,所以還需要調用任務的 cancle 方法取消任務。
如果代碼(6)判斷結果為 false,則會執行代碼(7)確保至少有一個線程在處理任務,即使核心線程數 corePoolSize 被設置為 0.
ensurePrestart 代碼如下:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); //增加核心線程數 if (wc < corePoolSize) addWorker(null, true); //如果初始化corePoolSize==0,則也添加一個線程。 else if (wc == 0) addWorker(null, false); } }
如上代碼首先首先獲取線程池中線程個數,如果線程個數小於核心線程數則新增一個線程,否者如果當前線程數為 0 則新增一個線程。
通過上面代碼我們分析了如何添加任務到延遲隊列,下面我們看線程池里面的線程如何獲取並執行任務的,從前面講解的 ThreadPoolExecutor
我們知道具體執行任務的線程是 Worker 線程,Worker 線程里面調用具體任務的 run 方法進行執行,由於這里任務是 ScheduledFutureTask
,所以我們下面看看 ScheduledFutureTask
的 run 方法。代碼如下:
public void run() { //(8)是否只執行一次 boolean periodic = isPeriodic(); //(9)取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); //(10)只執行一次,調用schdule時候 else if (!periodic) ScheduledFutureTask.super.run(); //(11)定時執行 else if (ScheduledFutureTask.super.runAndReset()) { //(11.1)設置time=time+period setNextRunTime(); //(11.2)重新加入該任務到delay隊列 reExecutePeriodic(outerTask); } }
可以看到代碼(8)isPeriodic 的作用是判斷當前任務是一次性任務還是可重復執行的任務,isPeriodic 的代碼如下:
public boolean isPeriodic() { return period != 0; }
可知內部是通過 period 的值來判斷,由於轉換任務創建 ScheduledFutureTask 時候傳遞的 period 為 0 ,所以這里 isPeriodic 返回 false。
代碼(9)判斷當前任務是否應該被取消,canRunInCurrentRunState 的代碼如下:
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
這里傳遞的 periodic 為 false,所以 isRunningOrShutdown
的參數為 executeExistingDelayedTasksAfterShutdown
,executeExistingDelayedTasksAfterShutdown
默認是 true 標示當其它線程調用了 shutdown 命令關閉了線程池后,當前任務還是要執行,否者如果為 false,標示當前任務要被取消。
由於 periodic 為 false,所以執行代碼(10)調用父類 FutureTask 的 run 方法具體執行任務,FutureTask 的 run 方法代碼如下:
public void run() { //(12) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; //(13) try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //(13.1) setException(ex); } //(13.2) if (ran) set(result); } } finally { ...省略 } }
可以看到代碼(12)如果任務狀態不是 NEW 則直接返回,或者如果當前任務狀態為NEW但是使用 CAS 設置當然任務的持有者為當前線程失敗則直接返回。代碼(13)具體調用 callable 的 call 方法執行任務,這里在調用前又判斷了任務的狀態是否為 NEW 是為了避免在執行代碼(12)后其他線程修改了任務的狀態(比如取消了該任務)。
如果任務執行成功則執行代碼(13.2)修改任務狀態,set 方法代碼如下:
protected void set(V v) { //如果當前任務狀態為NEW,則設置為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //設置當前任務終狀為NORMAL,也就是任務正常結束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
如上代碼首先 CAS 設置當前任務狀態從 NEW 轉換到 COMPLETING,這里多個線程調用時候只有一個線程會成功,成功的線程在通過 UNSAFE.putOrderedInt
設置任務的狀態為正常結束狀態,這里沒有用 CAS 是因為同一個任務只可能有一個線程可以運行到這里,這里使用 putOrderedInt
比使用 CAS 函數或者 putLongVolatile
效率要高,並且這里的場景不要求其它線程馬上對設置的狀態值可見。
這里思考個問題,這里什么時候多個線程會同時執行 CAS 設置任務狀態從態從 NEW 到 COMPLETING?其實當同一個 comand 被多次提交到線程池時候就會存在這樣的情況,由於同一個任務共享一個狀態值 state。
如果任務執行失敗,則執行代碼(13.1),setException 的代碼如下,可見與 set 函數類似,代碼如下:
protected void setException(Throwable t) { //如果當前任務狀態為NEW,則設置為COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //設置當前任務終態為EXCEPTIONAL,也就是任務非正常結束 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion(); } }
到這里代碼(10)邏輯執行完畢,一次性任務也就執行完畢了,
下面會講到如果任務是可重復執行的,則不會執行步驟(10)而是執行代碼(11)。
2.2 scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)方法
當任務執行完畢后,延遲固定間隔時間后再次運行(fixed-delay 任務):其中 initialDelay 說明提交任務后延遲多少時間開始執行任務 command,delay 表示當任務執行完畢后延長多少時間后再次運行 command 任務,unit 是 initialDelay 和 delay 的時間單位。任務會一直重復運行直到任務運行時候拋出了異常或者取消了任務,或者關閉了線程池。scheduleWithFixedDelay
的代碼如下:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //(14)參數校驗 if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); //(15)任務轉換,注意這里是period=-delay<0 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; //(16)添加任務到隊列 delayedExecute(t); return t;
}
如上代碼(14)進行參數校驗,校驗失敗則拋出異常,代碼(15)轉換 command 任務為 ScheduledFutureTask
,這里需要注意的是這里傳遞給 ScheduledFutureTask
的 period 變量的值為 -delay,period < 0 這個說明該任務為可重復執行的任務。然后代碼(16)添加任務到延遲隊列后返回。
任務添加到延遲隊列后線程池線程會從隊列里面獲取任務,然后調用 ScheduledFutureTask
的 run 方法執行,由於這里 period<0 所以 isPeriodic 返回 true,所以執行代碼(11),runAndReset 的代碼如下:
protected boolean runAndReset() { //(17) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; //(18) boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { ... } return ran && s == NEW;//(19) }
該代碼和 FutureTask 的 run 類似,只是任務正常執行完畢后不會設置任務的狀態,這樣做是為了讓任務成為可重復執行的任務,這里多了代碼(19)如果當前任務正常執行完畢並且任務狀態為 NEW 則返回 true 否者返回 false。
如果返回了 true 則執行代碼(11.1)setNextRunTime
方法設置該任務下一次的執行時間,setNextRunTime
的代碼如下:
private void setNextRunTime() { long p = period; if (p > 0)//fixed-rate類型任務 time += p; else//fixed-delay類型任務 time = triggerTime(-p); }
如上代碼這里 p < 0 說明當前任務為 fixed-delay
類型任務,然后設置 time 為當前時間加上 -p
的時間,也就是延遲 -p
時間后在次執行。
總結:本節介紹的 fixed-delay
類型的任務的執行實現原理如下,當添加一個任務到延遲隊列后,等 initialDelay 時間后,任務就會過期,過期的任務就會被從隊列移除,並執行,執行完畢后,會重新設置任務的延遲時間,然后在把任務放入延遲隊列實現的,依次往復。需要注意的是如果一個任務在執行某一個次時候拋出了異常,那么這個任務就結束了,但是不影響其它任務的執行。
2.3、scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法
相對起始時間點固定頻率調用指定的任務(fixed-rate 任務):當提交任務到線程池后延遲 initialDelay 個時間單位為 unit 的時間后開始執行任務 comand ,然后 initialDelay + period
時間點再次執行,然后在 initialDelay + 2 * period
時間點再次執行,依次往復,直到拋出異常或者調用了任務的 cancel 方法取消了任務在結束或者關閉了線程池。
scheduleAtFixedRate
的原理與 scheduleWithFixedDelay
類似,下面我們講下不同點,首先調用 scheduleAtFixedRate
時候代碼如下:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ... //裝飾任務類,注意period=period>0,不是負的 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); ... return t; }
如上代碼 fixed-rate
類型的任務在轉換 command
任務為 ScheduledFutureTask
的時候設置的 period=period
不在是 -period
。
所以當前任務執行完畢后,調用 setNextRunTime
設置任務下次執行的時間時候執行的是 time += p
而不在是 time = triggerTime(-p);
。
總結:相對於 fixed-delay
任務來說,fixed-rate
方式執行規則為時間為 initdelday + n*period;
時候啟動任務,但是如果當前任務還沒有執行完,下一次要執行任務的時間到了,不會並發執行,下次要執行的任務會延遲執行,要等到當前任務執行完畢后在執行一個任務。
3、總結
ScheduledThreadPoolExecutor
的實現原理,其內部使用的 DelayQueue
來存放具體任務,其中任務分為三種,其中一次性執行任務執行完畢就結束了,fixed-delay
任務保證同一個任務多次執行之間間隔固定時間,fixed-rate
任務保證任務執行按照固定的頻率執行,其中任務類型使用 period
的值來區分。