《java.util.concurrent 包源碼閱讀》14 線程池系列之ScheduledThreadPoolExecutor 第一部分


ScheduledThreadPoolExecutor是ThreadPoolExecutor的子類,同時實現了ScheduledExecutorService接口。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

 

ScheduledThreadPoolExecutor的功能主要有兩點:在固定的時間點執行(也可以認為是延遲執行),重復執行。

和分析ThreadPoolExecutor時一樣,首先來看核心方法execute:

    public void execute(Runnable command) {
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

execute方法調用了另外一個方法schedule,同時我們發現三個submit方法也是同樣調用了schedule方法,因為有兩種類型的任務:Callable和Runnable,因此schedule也有兩個重載方法。

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

兩個方法邏輯基本一致,都是把任務包裝成RunnableScheduledFuture對象,然后調用delayedExecute來實現延遲執行。任務包裝類繼承自ThreadPoolExecutor的包裝類RunnableFuture,同時實現ScheduledFuture接口使包裝類具有了延遲執行和重復執行這些功能以匹配ScheduledThreadPoolExecutor。

因此首先來看ScheduledFutureTask,以下是ScheduledFutureTask專有的幾個變量:

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** 針對線程池所有任務的序列號 */
        private final long sequenceNumber;

        /** 距離任務開始執行的時間,納秒為單位 */
        private long time;

        /**
         * 重復執行任務的間隔,即每隔多少時間執行一次任務
         */
        private final long period;

        /** 重復執行任務和排隊時用這個類型的對象, */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * 在延遲隊列的索引,這樣取消任務時使用索引會加快查找速度
         */
        int heapIndex;

來看核心方法run:

        public void run() {
            boolean periodic = isPeriodic();
            // 檢測是否可以運行任務,這里涉及到另外兩個變量:continueExistingPeriodicTasksAfterShutdown
            // 和executeExistingDelayedTasksAfterShutdown
            // 前者允許在shutdown之后繼續執行重復執行的任務
            // 后者允許在shutdown之后繼續執行延時執行的任務,
            // 因此這里根據任務是否為periodic來決定采用哪個選項,然后
            // 如果線程池正在運行,那么肯定可以執行
            // 如果正在shutdown,那么要看選項的值是否為true來決定是否允許執行任務
            // 如果不被允許的話,就會取消任務
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            // 如果可以執行任務,對於不用重復執行的任務,直接執行即可
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // 對於需要重復執行的任務,則執行一次,然后reset
            // 更新一下下次執行的時間,調用reExecutePeriodic更新任務在執行隊列的
            // 位置(其實就是添加到隊列的末尾)
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

 

因此這里可以得出關於重復執行的實現:任務執行一次,Reset狀態,重新加入到任務隊列。

回到delayedExecute,它可以保證任務在准確時間點執行,來看delayedExecute是如果實現延遲執行的:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

乍看之下,發現也就是把任務加入到任務隊列中,那么這個延時執行的功能是如何實現的,秘密就在任務隊列的實現。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

ScheduledThreadPoolExecutor的任務隊列不是普通的BlockingQueue,而是一個特殊的實現DelayedWorkQueue。下一篇文章就來說說這個DelayedWorkQueue


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM