深入理解Java線程池:ScheduledThreadPoolExecutor


介紹

自JDK1.5開始,JDK提供了ScheduledThreadPoolExecutor類來支持周期性任務的調度。在這之前的實現需要依靠Timer和TimerTask或者其它第三方工具來完成。但Timer有不少的缺陷:

  • Timer是單線程模式;
  • 如果在執行任務期間某個TimerTask耗時較久,那么就會影響其它任務的調度;
  • Timer的任務調度是基於絕對時間的,對系統時間敏感;
  • Timer不會捕獲執行TimerTask時所拋出的異常,由於Timer是單線程,所以一旦出現異常,則線程就會終止,其他任務也得不到執行。

ScheduledThreadPoolExecutor繼承ThreadPoolExecutor來重用線程池的功能,它的實現方式如下:

  • 將任務封裝成ScheduledFutureTask對象,ScheduledFutureTask基於相對時間,不受系統時間的改變所影響;
  • ScheduledFutureTask實現了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有兩個重要的方法:compareTo和getDelay。compareTo方法用於比較任務之間的優先級關系,如果距離下次執行的時間間隔較短,則優先級高;getDelay方法用於返回距離下次任務執行時間的時間間隔;
  • ScheduledThreadPoolExecutor定義了一個DelayedWorkQueue,它是一個有序隊列,會通過每個任務按照距離下次執行時間間隔的大小來排序;
  • ScheduledFutureTask繼承自FutureTask,可以通過返回Future對象來獲取執行的結果。

通過如上的介紹,可以對比一下Timer和ScheduledThreadPoolExecutor:

Timer ScheduledThreadPoolExecutor
單線程 多線程
單個任務執行時間影響其他任務調度 多線程,不會影響
基於絕對時間 基於相對時間
一旦執行任務出現異常不會捕獲,其他任務得不到執行 多線程,單個任務的執行不會影響其他線程

所以,在JDK1.5之后,應該沒什么理由繼續使用Timer進行任務調度了。

ScheduledThreadPoolExecutor的使用

下面用一個具體的例子來說明ScheduledThreadPoolExecutor的使用:

public class ScheduledThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        // 創建大小為5的線程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 3; i++) {
            Task worker = new Task("task-" + i);
            // 只執行一次
//          scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
            // 周期性執行,每5秒執行一次
            scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS);
        }
        Thread.sleep(10000);
        System.out.println("Shutting down executor...");
        // 關閉線程池
        scheduledThreadPool.shutdown();
        boolean isDone;
        // 等待線程池終止
        do {
            isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
            System.out.println("awaitTermination...");
        } while(!isDone);
        System.out.println("Finished all threads");
    }
}
class Task implements Runnable {
    private String name;
    public Task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println("name = " + name + ", startTime = " + new Date());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("name = " + name + ", endTime = " + new Date());
    }
}

 

下面就來具體分析一下ScheduledThreadPoolExecutor的實現過程。

ScheduledThreadPoolExecutor的實現

ScheduledThreadPoolExecutor的類結構

看下ScheduledThreadPoolExecutor內部的類圖:

 

不要被這么多類嚇到,這里只不過是為了更清楚的了解ScheduledThreadPoolExecutor有關調度和隊列的接口。

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現了ScheduledExecutorService接口,該接口定義了schedule等任務調度的方法。

同時ScheduledThreadPoolExecutor有兩個重要的內部類:DelayedWorkQueue和ScheduledFutureTask。可以看到,DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,並且實現了Delayed接口。有關FutureTask的介紹請參考另一篇文章:FutureTask源碼解析

ScheduledThreadPoolExecutor的構造方法

ScheduledThreadPoolExecutor有3中構造方法:

 

public ScheduledThreadPoolExecutor(int corePoolSize,
                                    ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

 

因為ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以這里都是調用的ThreadPoolExecutor類的構造方法。有關ThreadPoolExecutor可以參考深入理解Java線程池:ThreadPoolExecutor

這里注意傳入的阻塞隊列是DelayedWorkQueue類型的對象。后面會詳細介紹。

schedule方法

在上文的例子中,使用了schedule方法來進行任務調度,schedule方法的代碼如下:

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

  

首先,這里的兩個重載的schedule方法只是傳入的第一個參數不同,可以是Runnable對象或者Callable對象。會把傳入的任務封裝成一個RunnableScheduledFuture對象,其實也就是ScheduledFutureTask對象,decorateTask默認什么功能都沒有做,子類可以重寫該方法:

/**
 * 修改或替換用於執行 runnable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
/**
 * 修改或替換用於執行 callable 的任務。此方法可重寫用於管理內部任務的具體類。默認實現只返回給定任務。
 */
protected <V> RunnableScheduledFuture<V> decorateTask(
    Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

  

然后,通過調用delayedExecute方法來延時執行任務。
最后,返回一個ScheduledFuture對象。

scheduleAtFixedRate方法

該方法設置了執行周期,下一次執行時間相當於是上一次的執行時間加上period,它是采用已固定的頻率來執行任務:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  

scheduleWithFixedDelay方法

該方法設置了執行周期,與scheduleAtFixedRate方法不同的是,下一次執行時間是上一次任務執行完的系統時間加上period,因而具體執行時間不是固定的,但周期是固定的,是采用相對固定的延遲來執行任務:

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  

注意這里的unit.toNanos(-delay));,這里把周期設置為負數來表示是相對固定的延遲執行。

scheduleAtFixedRate和scheduleWithFixedDelay的區別在setNextRunTime方法中就可以看出來:

private void setNextRunTime() {
    long p = period;
    // 固定頻率,上次執行時間加上周期時間
    if (p > 0)
        time += p;
    // 相對固定延遲執行,使用當前系統時間加上周期時間
    else
        time = triggerTime(-p);
}

  

setNextRunTime方法會在run方法中執行完任務后調用。

triggerTime方法

triggerTime方法用於獲取下一次執行的具體時間:

private long triggerTime(long delay, TimeUnit unit) {
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

  

這里的delay < (Long.MAX_VALUE >> 1是為了判斷是否要防止Long類型溢出,如果delay的值小於Long類型最大值的一半,則直接返回delay,否則需要進行防止溢出處理。

overflowFree方法

該方法的作用是限制隊列中所有節點的延遲時間在Long.MAX_VALUE之內,防止在compareTo方法中溢出。

private long overflowFree(long delay) {
    // 獲取隊列中的第一個節點
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
        // 獲取延遲時間
        long headDelay = head.getDelay(NANOSECONDS);
        // 如果延遲時間小於0,並且 delay - headDelay 超過了Long.MAX_VALUE
        // 將delay設置為 Long.MAX_VALUE + headDelay 保證delay小於Long.MAX_VALUE
        if (headDelay < 0 && (delay - headDelay < 0))
            delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
}

  

當一個任務已經可以執行出隊操作,但還沒有執行,可能由於線程池中的工作線程不是空閑的。具體分析一下這種情況:

  • 為了方便說明,假設Long.MAX_VALUE=1023,也就是11位,並且當前的時間是100,調用triggerTime時並沒有對delay進行判斷,而是直接返回了now() + delay,也就是相當於100 + 1023,這肯定是溢出了,那么返回的時間是-925;
  • 如果頭節點已經可以出隊但是還沒有執行出隊,那么頭節點的執行時間應該是小於當前時間的,假設是95;
  • 這時調用offer方法向隊列中添加任務,在offer方法中會調用siftUp方法來排序,在siftUp方法執行時又會調用ScheduledFutureTask中的compareTo方法來比較執行時間;
  • 這時如果執行到了compareTo方法中的long diff = time - x.time;時,那么計算后的結果就是-925 - 95 = -1020,那么將返回-1,而正常情況應該是返回1,因為新加入的任務的執行時間要比頭結點的執行時間要晚,這就不是我們想要的結果了,這會導致隊列中的順序不正確。
  • 同理也可以算一下在執行compareTo方法中的long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);時也會有這種情況;
  • 所以在triggerTime方法中對delay的大小做了判斷,就是為了防止這種情況發生。

如果執行了overflowFree方法呢,這時headDelay = 95 - 100 = -5,然后執行delay = 1023 + (-5) = 1018,那么triggerTime會返回100 + 1018 = -930,再執行compareTo方法中的long diff = time - x.time;時,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023,沒有溢出,符合正常的預期。

所以,overflowFree方法中把已經超時的部分時間給減去,就是為了避免在compareTo方法中出現溢出情況。

(說實話,這段代碼看的很痛苦,一般情況下也不會發生這種情況,誰會傳一個Long.MAX_VALUE呢。要知道Long.MAX_VALUE的納秒數換算成年的話是292年,誰會這么無聊。。。)

 

ScheduledFutureTask的getDelay方法

public long getDelay(TimeUnit unit) {
    // 執行時間減去當前系統時間
    return unit.convert(time - now(), NANOSECONDS);
}

  

ScheduledFutureTask的構造方法

ScheduledFutureTask繼承自FutureTask並實現了RunnableScheduledFuture接口,具體可以參考上文的類圖,構造方法如下:

ScheduledFutureTask(Runnable r, V result, long ns) {
    super(r, result);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a periodic action with given nano time and period.
 */
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}
/**
 * Creates a one-shot action with given nanoTime-based trigger time.
 */
ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}

  

這里面有幾個重要的屬性,下面來解釋一下:

  • time:下次任務執行時的時間;
  • period:執行周期;
  • sequenceNumber:保存任務被添加到ScheduledThreadPoolExecutor中的序號。

在schedule方法中,創建完ScheduledFutureTask對象之后,會執行delayedExecute方法來執行任務。

delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果線程池已經關閉,使用拒絕策略拒絕任務
    if (isShutdown())
        reject(task);
    else {
        // 添加到阻塞隊列中
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 確保線程池中至少有一個線程啟動,即使corePoolSize為0
            // 該方法在ThreadPoolExecutor中實現
            ensurePrestart();
    }
}

  

說一下這里的第二個if判斷:

  1. 如果不是SHUTDOWN狀態,執行else,否則執行步驟2;
  2. 如果在當前線程池運行狀態下可以執行任務,執行else,否則執行步驟3;
  3. 從阻塞隊列中刪除任務,如果失敗,執行else,否則執行步驟4;
  4. 取消任務,但不中斷執行中的任務。

對於步驟2,可以通過setContinueExistingPeriodicTasksAfterShutdownPolicy方法設置在線程池關閉時,周期任務繼續執行,默認為false,也就是線程池關閉時,不再執行周期任務。

ensurePrestart方法在ThreadPoolExecutor中定義:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

 

調用了addWorker方法,可以在深入理解Java線程池:ThreadPoolExecutor中查看addWorker方法的介紹,線程池中的工作線程是通過該方法來啟動並執行任務的。

ScheduledFutureTask的run方法

回顧一下線程池的執行過程:當線程池中的工作線程啟動時,不斷地從阻塞隊列中取出任務並執行,當然,取出的任務實現了Runnable接口,所以是通過調用任務的run方法來執行任務的。

這里的任務類型是ScheduledFutureTask,所以下面看一下ScheduledFutureTask的run方法:

public void run() {
    // 是否是周期性任務
    boolean periodic = isPeriodic();
    // 當前線程池運行狀態下如果不可以執行任務,取消該任務
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果不是周期性任務,調用FutureTask中的run方法執行
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果是周期性任務,調用FutureTask中的runAndReset方法執行
    // runAndReset方法不會設置執行結果,所以可以重復執行任務
    else if (ScheduledFutureTask.super.runAndReset()) {
        // 計算下次執行該任務的時間
        setNextRunTime();
        // 重復執行任務
        reExecutePeriodic(outerTask);
    }
}

  

有關FutureTask的run方法和runAndReset方法,可以參考FutureTask源碼解析

分析一下執行過程:

  1. 如果當前線程池運行狀態不可以執行任務,取消該任務,然后直接返回,否則執行步驟2;
  2. 如果不是周期性任務,調用FutureTask中的run方法執行,會設置執行結果,然后直接返回,否則執行步驟3;
  3. 如果是周期性任務,調用FutureTask中的runAndReset方法執行,不會設置執行結果,然后直接返回,否則執行步驟4和步驟5;
  4. 計算下次執行該任務的具體時間;
  5. 重復執行任務。

ScheduledFutureTask的reExecutePeriodic方法

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

  

該方法和delayedExecute方法類似,不同的是:

  1. 由於調用reExecutePeriodic方法時已經執行過一次周期性任務了,所以不會reject當前任務;
  2. 傳入的任務一定是周期性任務。

onShutdown方法

onShutdown方法是ThreadPoolExecutor中的鈎子方法,在ThreadPoolExecutor中什么都沒有做,參考深入理解Java線程池:ThreadPoolExecutor,該方法是在執行shutdown方法時被調用:

@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // 獲取在線程池已 shutdown 的情況下是否繼續執行現有延遲任務
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // 獲取在線程池已 shutdown 的情況下是否繼續執行現有定期任務
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果在線程池已 shutdown 的情況下不繼續執行延遲任務和定期任務
    // 則依次取消任務,否則則根據取消狀態來判斷
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                // 如果有在 shutdown 后不繼續的延遲任務或周期任務,則從隊列中刪除並取消任務
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();
}

  

DelayedWorkQueue

ScheduledThreadPoolExecutor之所以要自己實現阻塞的工作隊列,是因為ScheduledThreadPoolExecutor要求的工作隊列有些特殊。

DelayedWorkQueue是一個基於堆的數據結構,類似於DelayQueue和PriorityQueue。在執行定時任務的時候,每個任務的執行時間都不同,所以DelayedWorkQueue的工作就是按照執行時間的升序來排列,執行時間距離當前時間越近的任務在隊列的前面(注意:這里的順序並不是絕對的,堆中的排序只保證了子節點的下次執行時間要比父節點的下次執行時間要大,而葉子節點之間並不一定是順序的,下文中會說明)。

堆結構如下圖所示:

 

可見,DelayedWorkQueue是一個基於最小堆結構的隊列。堆結構可以使用數組表示,可以轉換成如下的數組:

在這種結構中,可以發現有如下特性:

假設,索引值從0開始,子節點的索引值為k,父節點的索引值為p,則:

  1. 一個節點的左子節點的索引為:k = p * 2 + 1;
  2. 一個節點的右子節點的索引為:k = (p + 1) * 2;
  3. 一個節點的父節點的索引為:p = (k - 1) / 2。

為什么要使用DelayedWorkQueue呢?

定時任務執行時需要取出最近要執行的任務,所以任務在隊列中每次出隊時一定要是當前隊列中執行時間最靠前的,所以自然要使用優先級隊列。

DelayedWorkQueue是一個優先級隊列,它可以保證每次出隊的任務都是當前隊列中執行時間最靠前的,由於它是基於堆結構的隊列,堆結構在執行插入和刪除操作時的最壞時間復雜度是 O(logN)

 

DelayedWorkQueue的屬性

// 隊列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根據初始容量創建RunnableScheduledFuture類型的數組
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader線程
private Thread leader = null;
// 當較新的任務在隊列的頭部可用時,或者新線程可能需要成為leader,則通過該條件發出信號
private final Condition available = lock.newCondition();

  

注意這里的leader,它是Leader-Follower模式的變體,用於減少不必要的定時等待。什么意思呢?對於多線程的網絡模型來說:

所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態:proccesser。它的基本原則就是,永遠最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網絡事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態內存分配和線程間的數據交換。

參考自:http://blog.csdn.net/goldlevi/article/details/7705180

具體leader的作用在分析take方法時再詳細介紹。

offer方法

既然是阻塞隊列,入隊的操作如add和put方法都調用了offer方法,下面查看一下offer方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // queue是一個RunnableScheduledFuture類型的數組,如果容量不夠需要擴容
        if (i >= queue.length)
            grow();
        size = i + 1;
        // i == 0 說明堆中還沒有數據
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
        // i != 0 時,需要對堆進行重新排序
            siftUp(i, e);
        }
        // 如果傳入的任務已經是隊列的第一個節點了,這時available需要發出信號
        if (queue[0] == e) {
            // leader設置為null為了使在take方法中的線程在通過available.signal();后會執行available.awaitNanos(delay);
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

  

有關Condition的介紹請參考深入理解AbstractQueuedSynchronizer(三)

這里的重點是siftUp方法。

siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        // 找到父節點的索引
        int parent = (k - 1) >>> 1;
        // 獲取父節點
        RunnableScheduledFuture<?> e = queue[parent];
        // 如果key節點的執行時間大於父節點的執行時間,不需要再排序了
        if (key.compareTo(e) >= 0)
            break;
        // 如果key.compareTo(e) < 0,說明key節點的執行時間小於父節點的執行時間,需要把父節點移到后面
        queue[k] = e;
        // 設置索引為k
        setIndex(e, k);
        k = parent;
    }
    // key設置為排序后的位置中
    queue[k] = key;
    setIndex(key, k);
}

  

代碼很好理解,就是循環的根據key節點與它的父節點來判斷,如果key節點的執行時間小於父節點,則將兩個節點交換,使執行時間靠前的節點排列在隊列的前面。

假設新入隊的節點的延遲時間(調用getDelay()方法獲得)是5,執行過程如下:

  1. 先將新的節點添加到數組的尾部,這時新節點的索引k為7:

  2. 計算新父節點的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的時間間隔值為8,因為 5 < 8 ,將執行queue[7] = queue[3]:

  3.這時將k設置為3,繼續循環,再次計算parent為1,queue[1]的時間間隔為3,因為 5 > 3 ,這時退出循環,最終k為3: 

 

 

可見,每次新增節點時,只是根據父節點來判斷,而不會影響兄弟節點。

另外,setIndex方法只是設置了ScheduledFutureTask中的heapIndex屬性:

private void setIndex(RunnableScheduledFuture<?> f, int idx) {
    if (f instanceof ScheduledFutureTask)
        ((ScheduledFutureTask)f).heapIndex = idx;
}

  

take方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                // 計算當前時間到執行時間的時間間隔
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // leader不為空,阻塞線程
                if (leader != null)
                    available.await();
                else {
                    // leader為空,則把leader設置為當前線程,
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 阻塞到執行時間
                        available.awaitNanos(delay);
                    } finally {
                        // 設置leader = null,讓其他線程執行available.awaitNanos(delay);
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 如果leader不為空,則說明leader的線程正在執行available.awaitNanos(delay);
        // 如果queue[0] == null,說明隊列為空
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  

ake方法是什么時候調用的呢?在深入理解Java線程池:ThreadPoolExecutor中,介紹了getTask方法,工作線程會循環地從workQueue中取任務。但定時任務卻不同,因為如果一旦getTask方法取出了任務就開始執行了,而這時可能還沒有到執行的時間,所以在take方法中,要保證只有在到指定的執行時間的時候任務才可以被取走。

再來說一下leader的作用,這里的leader是為了減少不必要的定時等待,當一個線程成為leader時,它只等待下一個節點的時間間隔,但其它線程無限期等待。 leader線程必須在從take()或poll()返回之前signal其它線程,除非其他線程成為了leader。

舉例來說,如果沒有leader,那么在執行take時,都要執行available.awaitNanos(delay),假設當前線程執行了該段代碼,這時還沒有signal,第二個線程也執行了該段代碼,則第二個線程也要被阻塞。多個這時執行該段代碼是沒有作用的,因為只能有一個線程會從take中返回queue[0](因為有lock),其他線程這時再返回for循環執行時取的queue[0],已經不是之前的queue[0]了,然后又要繼續阻塞。

所以,為了不讓多個線程頻繁的做無用的定時等待,這里增加了leader,如果leader不為空,則說明隊列中第一個節點已經在等待出隊,這時其它的線程會一直阻塞,減少了無用的阻塞(注意,在finally中調用了signal()來喚醒一個線程,而不是signalAll())。

poll方法

下面看下poll方法,與take類似,但這里要提供超時功能:

 

public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                // 如果delay <= 0,說明已經到了任務執行的時間,返回。
                if (delay <= 0)
                    return finishPoll(first);
                // 如果nanos <= 0,說明已經超時,返回null
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                // nanos < delay 說明需要等待的時間小於任務要執行的延遲時間
                // leader != null 說明有其它線程正在對任務進行阻塞
                // 這時阻塞當前線程nanos納秒
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 這里的timeLeft表示delay減去實際的等待時間
                        long timeLeft = available.awaitNanos(delay);
                        // 計算剩余的等待時間
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

  

finishPoll方法

當調用了take或者poll方法能夠獲取到任務時,會調用該方法進行返回:

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 數組長度-1
    int s = --size;
    // 取出最后一個節點
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    // 長度不為0,則從第一個元素開始排序,目的是要把最后一個節點放到合適的位置上
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

  

siftDown方法

siftDown方法使堆從k開始向下調整:

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    // 根據二叉樹的特性,數組長度除以2,表示取有子節點的索引
    int half = size >>> 1;
    // 判斷索引為k的節點是否有子節點
    while (k < half) {
        // 左子節點的索引
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        // 右子節點的索引
        int right = child + 1;
        // 如果有右子節點並且左子節點的時間間隔大於右子節點,取時間間隔最小的節點
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        // 如果key的時間間隔小於等於c的時間間隔,跳出循環
        if (key.compareTo(c) <= 0)
            break;
        // 設置要移除索引的節點為其子節點
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    // 將key放入索引為k的位置
    queue[k] = key;
    setIndex(key, k);
}

  

siftDown方法執行時包含兩種情況,一種是沒有子節點,一種是有子節點(根據half判斷)。例如:

沒有子節點的情況:

假設初始的堆如下:

 

 

假設 k = 3 ,那么 k = half ,沒有子節點,在執行siftDown方法時直接把索引為3的節點設置為數組的最后一個節點:

 

有子節點的情況:

假設 k = 0 ,那么執行以下步驟:

  1. 獲取左子節點,child = 1 ,獲取右子節點, right = 2 :
  2. 由於 right < size ,這時比較左子節點和右子節點時間間隔的大小,這里 3 < 7 ,所以 c = queue[child] ;
  3. 比較key的時間間隔是否小於c的時間間隔,這里不滿足,繼續執行,把索引為k的節點設置為c,然后將k設置為child,;

   4.因為 half = 3 ,k = 1 ,繼續執行循環,這時的索引變為:

 

  5.這時再經過如上判斷后,將k的值為3,最終的結果如下:

  6.最后,如果在finishPoll方法中調用的話,會把索引為0的節點的索引設置為-1,表示已經刪除了該節點,並且size也減了1,最后的結果如下:

 

可見,siftdown方法在執行完並不是有序的,但可以發現,子節點的下次執行時間一定比父節點的下次執行時間要大,由於每次都會取左子節點和右子節點中下次執行時間最小的節點,所以還是可以保證在take和poll時出隊是有序的。

remove方法

 

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = indexOf(x);
        if (i < 0)
            return false;
        setIndex(queue[i], -1);
        int s = --size;
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        if (s != i) {
            // 從i開始向下調整
            siftDown(i, replacement);
            // 如果queue[i] == replacement,說明i是葉子節點
            // 如果是這種情況,不能保證子節點的下次執行時間比父節點的大
            // 這時需要進行一次向上調整
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

  

假設初始的堆結構如下:

 這時要刪除8的節點,那么這時 k = 1,key為最后一個節點:

這時通過上文對siftDown方法的分析,siftDown方法執行后的結果如下:

這時會發現,最后一個節點的值比父節點還要小,所以這里要執行一次siftUp方法來保證子節點的下次執行時間要比父節點的大,所以最終結果如下:

 

總結

本文詳細分析了ScheduedThreadPoolExecutor的實現,主要介紹了以下方面:

  • 與Timer執行定時任務的比較,相比Timer,ScheduedThreadPoolExecutor有什么優點;
  • ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在於,自己實現了優先工作隊列DelayedWorkQueue;
  • ScheduedThreadPoolExecutor實現了ScheduledExecutorService,所以就有了任務調度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同時注意他們之間的區別;
  • 內部類ScheduledFutureTask繼承自FutureTask,實現了任務的異步執行並且可以獲取返回結果。同時也實現了Delayed接口,可以通過getDelay方法獲取將要執行的時間間隔;
  • 周期任務的執行其實是調用了FutureTask類中的runAndReset方法,每次執行完不設置結果和狀態。參考FutureTask源碼解析
  • 詳細分析了DelayedWorkQueue的數據結構,它是一個基於最小堆結構的優先隊列,並且每次出隊時能夠保證取出的任務是當前隊列中下次執行時間最小的任務。同時注意一下優先隊列中堆的順序,堆中的順序並不是絕對的,但要保證子節點的值要比父節點的值要大,這樣就不會影響出隊的順序。

總體來說,ScheduedThreadPoolExecutor的重點是要理解下次執行時間的計算,以及優先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。

原文地址:http://www.ideabuffer.cn/2017/04/14/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AScheduledThreadPoolExecutor/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM