ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類,同時實現了ScheduledExecutorService接口。
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
ScheduledThreadPoolExecutor的功能主要有兩點:在固定的時間點執行(也可以認為是延遲執行),重復執行。
和分析ThreadPoolExecutor時一樣,首先來看核心方法execute:
public void execute(Runnable command) { schedule(command, 0, TimeUnit.NANOSECONDS); }
execute方法調用了另外一個方法schedule,同時我們發現三個submit方法也是同樣調用了schedule方法,因為有兩種類型的任務:Callable和Runnable,因此schedule也有兩個重載方法。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
兩個方法邏輯基本一致,都是把任務包裝成RunnableScheduledFuture對象,然后調用delayedExecute來實現延遲執行。任務包裝類繼承自ThreadPoolExecutor的包裝類RunnableFuture,同時實現ScheduledFuture接口使包裝類具有了延遲執行和重復執行這些功能以匹配ScheduledThreadPoolExecutor。
因此首先來看ScheduledFutureTask,以下是ScheduledFutureTask專有的幾個變量:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** 針對線程池所有任務的序列號 */ private final long sequenceNumber; /** 距離任務開始執行的時間,納秒為單位 */ private long time; /** * 重復執行任務的間隔,即每隔多少時間執行一次任務 */ private final long period; /** 重復執行任務和排隊時用這個類型的對象, */ RunnableScheduledFuture<V> outerTask = this; /** * 在延遲隊列的索引,這樣取消任務時使用索引會加快查找速度 */ int heapIndex;
來看核心方法run:
public void run() { boolean periodic = isPeriodic(); // 檢測是否可以運行任務,這里涉及到另外兩個變量:continueExistingPeriodicTasksAfterShutdown // 和executeExistingDelayedTasksAfterShutdown // 前者允許在shutdown之后繼續執行重復執行的任務 // 后者允許在shutdown之后繼續執行延時執行的任務, // 因此這里根據任務是否為periodic來決定采用哪個選項,然后 // 如果線程池正在運行,那么肯定可以執行 // 如果正在shutdown,那么要看選項的值是否為true來決定是否允許執行任務 // 如果不被允許的話,就會取消任務 if (!canRunInCurrentRunState(periodic)) cancel(false); // 如果可以執行任務,對於不用重復執行的任務,直接執行即可 else if (!periodic) ScheduledFutureTask.super.run(); // 對於需要重復執行的任務,則執行一次,然后reset // 更新一下下次執行的時間,調用reExecutePeriodic更新任務在執行隊列的 // 位置(其實就是添加到隊列的末尾) else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
因此這里可以得出關於重復執行的實現:任務執行一次,Reset狀態,重新加入到任務隊列。
回到delayedExecute,它可以保證任務在准確時間點執行,來看delayedExecute是如果實現延遲執行的:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
乍看之下,發現也就是把任務加入到任務隊列中,那么這個延時執行的功能是如何實現的,秘密就在任務隊列的實現。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
ScheduledThreadPoolExecutor的任務隊列不是普通的BlockingQueue,而是一個特殊的實現DelayedWorkQueue。下一篇文章就來說說這個DelayedWorkQueue。