並發編程(十五)——定時器 ScheduledThreadPoolExecutor 實現原理與源碼深度解析


在上一篇線程池的文章《並發編程(十一)—— Java 線程池 實現原理與源碼深度解析(一)》中從ThreadPoolExecutor源碼分析了其運行機制。限於篇幅,留下了ScheduledThreadPoolExecutor未做分析,因此本文繼續從源代碼出發分析ScheduledThreadPoolExecutor的內部原理。

類聲明

1 public class ScheduledThreadPoolExecutor
2         extends ThreadPoolExecutor
3         implements ScheduledExecutorService {

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,實現了ScheduledExecutorService。因此它具有ThreadPoolExecutor的所有能力。所不同的是它具有定時執行,以周期或間隔循環執行任務等功能。

這里我們先看下ScheduledExecutorService的源碼:

ScheduledExecutorService

 1 //可調度的執行者服務接口
 2 public interface ScheduledExecutorService extends ExecutorService {
 3 
 4     //指定時延后調度執行任務,只執行一次,沒有返回值
 5     public ScheduledFuture<?> schedule(Runnable command,
 6                                        long delay, TimeUnit unit);
 7 
 8     //指定時延后調度執行任務,只執行一次,有返回值
 9     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
10                                            long delay, TimeUnit unit);
11 
12     //指定時延后開始執行任務,以后每隔period的時長再次執行該任務
13     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
14                                                   long initialDelay,
15                                                   long period,
16                                                   TimeUnit unit);
17 
18     //指定時延后開始執行任務,以后任務執行完成后等待delay時長,再次執行任務
19     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
20                                                      long initialDelay,
21                                                      long delay,
22                                                      TimeUnit unit);
23 }

其中schedule方法用於單次調度執行任務。這里主要理解下后面兩個方法。

  • scheduleAtFixedRate:該方法在initialDelay時長后第一次執行任務,以后每隔period時長,再次執行任務。注意,period是從任務開始執行算起的。開始執行任務后,定時器每隔period時長檢查該任務是否完成,如果完成則再次啟動任務,否則等該任務結束后才再次啟動任務,看下圖示例

  • scheduleWithFixDelay:該方法在initialDelay時長后第一次執行任務,以后每當任務執行完成后,等待delay時長,再次執行任務,看下圖示例。

使用例子

1、schedule(Runnable command,long delay, TimeUnit unit)

 1 /**
 2  * @author: ChenHao
 3  * @Date: Created in 14:54 2019/1/11
 4  */
 5 public class Test1 {
 6     public static void main(String[] args) throws ExecutionException, InterruptedException {
 7         // 延遲1s后開始執行,只執行一次,沒有返回值
 8         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
 9         ScheduledFuture<?> result = executorService.schedule(new Runnable() {
10             @Override
11             public void run() {
12                 System.out.println("gh");
13                 try {
14                     Thread.sleep(3000);
15                 } catch (InterruptedException e) {
16                     // TODO Auto-generated catch block
17                     e.printStackTrace();
18                 }
19             }
20         }, 1000, TimeUnit.MILLISECONDS);
21         System.out.println(result.get());
22     }
23 }

運行結果:

 

2、schedule(Callable<V> callable, long delay, TimeUnit unit);

 1 public class Test2 {
 2     public static void main(String[] args) throws ExecutionException, InterruptedException {
 3         // 延遲1s后開始執行,只執行一次,有返回值
 4         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
 5         ScheduledFuture<String> result = executorService.schedule(new Callable<String>() {
 6             @Override
 7             public String call() throws Exception {
 8                 try {
 9                     Thread.sleep(3000);
10                 } catch (InterruptedException e) {
11                     // TODO Auto-generated catch block
12                     e.printStackTrace();
13                 }
14                 return "ghq";
15             }
16         }, 1000, TimeUnit.MILLISECONDS);
17         // 阻塞,直到任務執行完成
18         System.out.print(result.get());
19     }
20 }

運行結果:

 

3、scheduleAtFixedRate

/**
 * @author: ChenHao
 * @Date: Created in 14:54 2019/1/11
 */
public class Test3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
        // 從加入任務開始算1s后開始執行任務,1+2s開始執行,1+2*2s執行,1+n*2s開始執行;
        // 但是如果執行任務時間大於2s則不會並發執行后續任務,當前執行完后,接着執行下次任務。
        ScheduledFuture<?> result = executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis());
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
        
        //一個ScheduledExecutorService里可以同時添加多個定時任務,這樣就是形成堆
        ScheduledFuture<?> result2 = executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis());
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
    }
}

這里可以看到一個ScheduledExecutorService 中可以添加多個定時任務,這是就會形成堆

運行結果:

 

4、scheduleWithFixedDelay

 1 /**
 2  * @author: ChenHao
 3  * @Date: Created in 14:54 2019/1/11
 4  */
 5 public class Test4 {
 6     public static void main(String[] args) throws ExecutionException, InterruptedException {
 7         //任務間以固定時間間隔執行,延遲1s后開始執行任務,任務執行完畢后間隔2s再次執行,任務執行完畢后間隔2s再次執行,依次往復
 8         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10);
 9         ScheduledFuture<?> result = executorService.scheduleWithFixedDelay(new Runnable() {
10             @Override
11             public void run() {
12                 System.out.println(System.currentTimeMillis());
13             }
14         }, 1000, 2000, TimeUnit.MILLISECONDS);
15 
16         // 由於是定時任務,一直不會返回
17         result.get();
18         System.out.println("over");
19     }
20 }

運行結果:

源碼分析

構造器

1 public ScheduledThreadPoolExecutor(int corePoolSize) {
2        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
3              new DelayedWorkQueue());
4 }

 

 內部其實都是調用了父類ThreadPoolExecutor的構造器,因此它具有ThreadPoolExecutor的所有能力。

 通過super方法的參數可知,核心線程的數量即傳入的參數,而線程池的線程數為Integer.MAX_VALUE,幾乎為無上限。
 這里采用了DelayedWorkQueue任務隊列,也是定時任務的核心,是一種優先隊列,時間小的排在前面,所以獲取任務的時候就能先獲取到時間最小的執行,可以看我上篇文章《並發編程(十四)—— ScheduledThreadPoolExecutor 實現原理與源碼深度解析 之 DelayedWorkQueue》。

 由於這里隊列沒有定義大小,所以隊列不會添加滿,因此最大的線程數就是核心線程數,超過核心線程數的任務就放在隊列里,並不重新開啟臨時線程。

我們先來看看幾個入口方法的實現:

 1 public ScheduledFuture<?> schedule(Runnable command,
 2                                    long delay,
 3                                    TimeUnit unit) {
 4     if (command == null || unit == null)
 5         throw new NullPointerException();
 6     RunnableScheduledFuture<?> t = decorateTask(command,
 7         new ScheduledFutureTask<Void>(command, null,
 8                                       triggerTime(delay, unit)));
 9     delayedExecute(t);
10     return t;
11 }
12 
13 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
14                                        long delay,
15                                        TimeUnit unit) {
16     if (callable == null || unit == null)
17         throw new NullPointerException();
18     RunnableScheduledFuture<V> t = decorateTask(callable,
19         new ScheduledFutureTask<V>(callable,
20                                    triggerTime(delay, unit)));
21     delayedExecute(t);
22     return t;
23 }
24 
25 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
26                                               long initialDelay,
27                                               long period,
28                                               TimeUnit unit) {
29     if (command == null || unit == null)
30         throw new NullPointerException();
31     if (period <= 0)
32         throw new IllegalArgumentException();
33     ScheduledFutureTask<Void> sft =
34         new ScheduledFutureTask<Void>(command,
35                                       null,
36                                       triggerTime(initialDelay, unit),
37                                       unit.toNanos(period));
38     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
39     sft.outerTask = t;
40     delayedExecute(t);
41     return t;
42 }
43 
44 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
45                                                  long initialDelay,
46                                                  long delay,
47                                                  TimeUnit unit) {
48     if (command == null || unit == null)
49         throw new NullPointerException();
50     if (delay <= 0)
51         throw new IllegalArgumentException();
52     ScheduledFutureTask<Void> sft =
53         new ScheduledFutureTask<Void>(command,
54                                       null,
55                                       triggerTime(initialDelay, unit),
56                                       unit.toNanos(-delay));
57     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
58     sft.outerTask = t;
59     delayedExecute(t);
60     return t;
61 }

 

 這幾個方法都是將任務封裝成了ScheduledFutureTask,上面做的首先把runnable裝飾為delay隊列所需要的格式的元素,然后把元素加入到阻塞隊列,然后線程池線程會從阻塞隊列獲取超時的元素任務進行處理,下面看下隊列元素如何實現的。

ScheduledFutureTask

ScheduledFutureTask是一個延時定時任務,它可以返回任務剩余延時時間,可以被周期性地執行。

屬性

 1 private class ScheduledFutureTask<V>
 2         extends FutureTask<V> implements RunnableScheduledFuture<V> {
 3         /** 是一個序列,每次創建任務的時候,都會自增。 */
 4         private final long sequenceNumber;
 5 
 6         /** 任務能夠開始執行的時間 */
 7         private long time;
 8 
 9         /**
10          * 任務周期執行的時間
11          * 0表示不是一個周期定時任務
12          * 正數表示固定周期時間去執行任務
13          * 負數表示任務完成之后,延時period時間再去執行任務
14          */
15         private final long period;
16 
17         /** 表示再次執行的任務,在reExecutePeriodic中調用 */
18         RunnableScheduledFuture<V> outerTask = this;
19 
20         /**
21          * 表示在任務隊列中的索引位置,用來支持快速從隊列中刪除任務。
22          */
23         int heapIndex;
24 }

ScheduledFutureTask繼承了 FutureTask 和 RunnableScheduledFuture

屬性說明:

  1. sequenceNumber: 是一個序列,每次創建任務的時候,都會自增。
  2. time: 任務能夠開始執行的時間。
  3. period: 任務周期執行的時間。0表示不是一個周期定時任務。
  4. outerTask: 表示再次執行的任務,在reExecutePeriodic中調用
  5. heapIndex: 表示在任務隊列中的索引位置,用來支持快速從隊列中刪除任務。

構造器

  • 創建延時任務

 1 /**
 2  * 創建延時任務
 3  */
 4 ScheduledFutureTask(Runnable r, V result, long ns) {
 5     // 調用父類的方法
 6     super(r, result);
 7     // 任務開始的時間
 8     this.time = ns;
 9     // period是0,不是一個周期定時任務
10     this.period = 0;
11     // 每次創建任務的時候,sequenceNumber都會自增
12     this.sequenceNumber = sequencer.getAndIncrement();
13 }
14 
15  /**
16  * 創建延時任務
17  */
18 ScheduledFutureTask(Callable<V> callable, long ns) {
19     // 調用父類的方法
20     super(callable);
21     // 任務開始的時間
22     this.time = ns;
23     // period是0,不是一個周期定時任務
24     this.period = 0;
25     // 每次創建任務的時候,sequenceNumber都會自增
26     this.sequenceNumber = sequencer.getAndIncrement();
27 }

 我們看看super(),其實就是FutureTask 里面的構造方法,關於FutureTask 可以看看我之前的文章《Java 多線程(五)—— 線程池基礎 之 FutureTask源碼解析

 1 public FutureTask(Runnable runnable, V result) {
 2     this.callable = Executors.callable(runnable, result);
 3     this.state = NEW;       // ensure visibility of callable
 4 }
 5 public FutureTask(Callable<V> callable) {
 6     if (callable == null)
 7         throw new NullPointerException();
 8     this.callable = callable;
 9     this.state = NEW;       // ensure visibility of callable
10 }
  • 創建延時定時任務
 1 /**
 2  * 創建延時定時任務
 3  */
 4 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
 5     // 調用父類的方法
 6     super(r, result);
 7     // 任務開始的時間
 8     this.time = ns;
 9     // 周期定時時間
10     this.period = period;
11     // 每次創建任務的時候,sequenceNumber都會自增
12     this.sequenceNumber = sequencer.getAndIncrement();
13 }

延時定時任務不同的是設置了period,后面通過判斷period是否為0來確定是否是定時任務。

run()

 1 public void run() {
 2     // 是否是周期任務
 3     boolean periodic = isPeriodic();
 4     // 如果不能在當前狀態下運行,那么就要取消任務
 5     if (!canRunInCurrentRunState(periodic))
 6         cancel(false);
 7     // 如果只是延時任務,那么就調用run方法,運行任務。
 8     else if (!periodic)
 9         ScheduledFutureTask.super.run();
10     // 如果是周期定時任務,調用runAndReset方法,運行任務。
11     // 這個方法不會改變任務的狀態,所以可以反復執行。
12     else if (ScheduledFutureTask.super.runAndReset()) {
13         // 設置周期任務下一次執行的開始時間time
14         setNextRunTime();
15         // 重新執行任務outerTask
16         reExecutePeriodic(outerTask);
17     }
18 }

這個方法會在ThreadPoolExecutor的runWorker方法中調用,而且這個方法調用,說明肯定已經到了任務的開始時間time了。這個方法我們待會會再繼續來回看一下

  1. 先判斷當前線程狀態能不能運行任務,如果不能,就調用cancel()方法取消本任務。
  2. 如果任務只是一個延時任務,那么調用父類的run()運行任務,改變任務的狀態,表示任務已經運行完成了。
  3. 如果任務只是一個周期定時任務,那么就任務必須能夠反復執行,那么就不能調用run()方法,它會改變任務的狀態。而是調用runAndReset()方法,只是簡單地運行任務,而不會改變任務狀態。
  4. 設置周期任務下一次執行的開始時間time,並重新執行任務。

schedule(Runnable command, long delay,TimeUnit unit)

 1 public ScheduledFuture<?> schedule(Runnable command,
 2                                   long delay,
 3                                   TimeUnit unit) {
 4    if (command == null || unit == null)
 5        throw new NullPointerException();
 6 
 7    //裝飾任務,主要實現public long getDelay(TimeUnit unit)和int compareTo(Delayed other)方法
 8    RunnableScheduledFuture<?> t = decorateTask(command,
 9        new ScheduledFutureTask<Void>(command, null,
10                                      triggerTime(delay, unit)));
11    //添加任務到延遲隊列
12    delayedExecute(t);
13    return t;
14 }

 獲取延時執行時間

 1 private long triggerTime(long delay, TimeUnit unit) {
 2     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 3 }
 4 
 5 /**
 6  * Returns the trigger time of a delayed action.
 7  */
 8 long triggerTime(long delay) {
 9     //當前時間加上延時時間
10     return now() +
11         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
12 }

上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,用戶可以根據自己的需要覆寫該方法:

1 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
2     return task;
3 }

schedule的核心是其中的delayedExecute方法:

 1 private void delayedExecute(RunnableScheduledFuture<?> task) {
 2     if (isShutdown())   // 線程池已關閉
 3         reject(task);   // 任務拒絕策略
 4     else {
 5         //將任務添加到任務隊列,會根據任務延時時間進行排序
 6         super.getQueue().add(task);
 7         // 如果線程池狀態改變了,當前狀態不能運行任務,那么就嘗試移除任務,
 8         // 移除成功,就取消任務。
 9         if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
10             task.cancel(false);  // 取消任務
11         else
12             // 預先啟動工作線程,確保線程池中有工作線程。
13             ensurePrestart();
14     }
15 }

這個方法的主要作用就是將任務添加到任務隊列中,因為這里任務隊列是優先級隊列DelayedWorkQueue,它會根據任務的延時時間進行排序。

  • 如果線程池不是RUNNING狀態,不能執行延時任務task,那么調用reject(task)方法,拒絕執行任務task。

  • 將任務添加到任務隊列中,會根據任務的延時時間進行排序。

  • 因為是多線程並發環境,就必須判斷在添加任務的過程中,線程池狀態是否被別的線程更改了,那么就可能要取消任務了。

  • 將任務添加到任務隊列后,還要確保線程池中有工作線程,不然任務也不為執行。所以ensurePrestart()方法預先啟動工作線程,確保線程池中有工作線程。

 1 void ensurePrestart() {
 2     // 線程池中的線程數量
 3     int wc = workerCountOf(ctl.get());
 4     // 如果小於核心池數量,就創建新的工作線程
 5     if (wc < corePoolSize)
 6         addWorker(null, true);
 7     // 說明corePoolSize數量是0,必須創建一個工作線程來執行任務
 8     else if (wc == 0)
 9         addWorker(null, false);
10 }

通過ensurePrestart可以看到,如果核心線程池未滿,則新建的工作線程會被放到核心線程池中。如果核心線程池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去創建歸屬於非核心線程池的工作線程,加入到隊列就完了,等待核心線程執行完任務再拉取隊列里的任務。也就是說,在ScheduledThreadPoolExecutor中,一旦核心線程池滿了,就不會再去創建工作線程。

這里思考一點,什么時候會執行else if (wc == 0)創建一個歸屬於非核心線程池的工作線程?
答案是,當通過setCorePoolSize方法設置核心線程池大小為0時,這里必須要保證任務能夠被執行,所以會創建一個工作線程,放到非核心線程池中。

看到 addWorker(null, true); 並沒有將任務設置進入,而是設置的null, 則說明線程池里線程第一次啟動時, runWorker中取到的 firstTask為null,需要通過 getTask() 從隊列中取任務,這里可以看看我之前寫的關於線程池的文章《並發編程(十一)—— Java 線程池 實現原理與源碼深度解析(一)》。

getTask()中  Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();如果是存在核心線程則調用take(),如果傳入的核心線程為0,則存在一個臨時線程,調用poll(),這兩個方法都會先獲取時間,看看有沒有達到執行時間,沒有達到執行時間則阻塞,可以看看我上一篇文章,達到執行時間,則取到任務,就會執行下面的run方法。

 1 public void run() {
 2     // 是否是周期任務
 3     boolean periodic = isPeriodic();
 4     // 如果不能在當前狀態下運行,那么就要取消任務
 5     if (!canRunInCurrentRunState(periodic))
 6         cancel(false);
 7     // 如果只是延時任務,那么就調用run方法,運行任務。
 8     else if (!periodic)
 9         ScheduledFutureTask.super.run();
10     // 如果是周期定時任務,調用runAndReset方法,運行任務。
11     // 這個方法不會改變任務的狀態,所以可以反復執行。
12     else if (ScheduledFutureTask.super.runAndReset()) {
13         // 設置周期任務下一次執行的開始時間time
14         setNextRunTime();
15         // 重新執行任務outerTask
16         reExecutePeriodic(outerTask);
17     }
18 }
19 
20 public boolean isPeriodic() {
21     return period != 0;
22 }

 schedule不是周期任務,那么調用父類的run()運行任務,改變任務的狀態,表示任務已經運行完成了。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

 1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 2                                              long initialDelay,
 3                                              long period,
 4                                              TimeUnit unit) {
 5    if (command == null || unit == null)
 6        throw new NullPointerException();
 7    if (period <= 0)
 8        throw new IllegalArgumentException();
 9    //裝飾任務類,注意period=period>0,不是負的
10    ScheduledFutureTask<Void> sft =
11        new ScheduledFutureTask<Void>(command,
12                                      null,
13                                      triggerTime(initialDelay, unit),
14                                      unit.toNanos(period));
15    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
16    sft.outerTask = t;
17    //添加任務到隊列
18    delayedExecute(t);
19    return t;
20 }

如果是周期任務則執行上面run()方法中的第12行,調用父類中的runAndReset(),這個方法同run方法比較的區別是call方法執行后不設置結果,因為周期型任務會多次執行,所以為了讓FutureTask支持這個特性除了發生異常不設置結果。

執行完任務后通過setNextRunTime方法計算下一次啟動時間:

 1 private void setNextRunTime() {
 2    long p = period;
 3   //period=delay;
 4    if (p > 0)
 5        time += p;//由於period>0所以執行這里,設置time=time+delay
 6    else
 7        time = triggerTime(-p);
 8 }
 9 
10 long triggerTime(long delay) {
11     return now() +
12         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
13 }

scheduleAtFixedRate會執行到情況一,下一次任務的啟動時間最早為上一次任務的啟動時間加period。
scheduleWithFixedDelay會執行到情況二,這里很巧妙的將period參數設置為負數到達這段代碼塊,在此又將負的period轉為正數。情況二將下一次任務的啟動時間設置為當前時間加period。

然后將任務再次添加到任務隊列:

 1 /**
 2  * 重新執行任務task
 3  */
 4 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
 5     // 判斷當前線程池狀態能不能運行任務
 6     if (canRunInCurrentRunState(true)) {
 7         // 將任務添加到任務隊列,會根據任務延時時間進行排序
 8         super.getQueue().add(task);
 9         // 如果線程池狀態改變了,當前狀態不能運行任務,那么就嘗試移除任務,
10         // 移除成功,就取消任務。
11         if (!canRunInCurrentRunState(true) && remove(task))
12             task.cancel(false);
13         else
14             // 預先啟動工作線程,確保線程池中有工作線程。
15             ensurePrestart();
16     }
17 }

這個方法與delayedExecute方法很像,都是將任務添加到任務隊列中。

  1. 如果當前線程池狀態能夠運行任務,那么任務添加到任務隊列。
  2. 如果在在添加任務的過程中,線程池狀態是否被別的線程更改了,那么就要進行判斷,是否需要取消任務。
  3. 調用ensurePrestart()方法,預先啟動工作線程,確保線程池中有工作線程。

ScheduledFuture的get方法

既然ScheduledFuture的實現是ScheduledFutureTask,而ScheduledFutureTask繼承自FutureTask,所以ScheduledFuture的get方法的實現就是FutureTask的get方法的實現,FutureTask的get方法的實現分析在ThreadPoolExecutor篇已經寫過,這里不再敘述。要注意的是ScheduledFuture的get方法對於非周期任務才是有效的。

ScheduledThreadPoolExecutor總結

  • ScheduledThreadPoolExecutor和ThreadPoolExecutor的區別:

    ThreadPoolExecutor每次addwoker就會將自己的Task傳進新創建的woker中的線程執行,因此woker會第一時間執行當前Task,只有線程數超過了核心線程才會將任務放進隊列里

    ScheduledThreadPoolExecutor是直接入隊列,並且創建woker時傳到woker的是null,說明woker中的線程剛啟動時並沒有任務執行,只能通過getTask去隊列里取任務,取任務時會判斷是否到了執行時間,因此具有了延時執行的特性,並且task執行完了,會將當前任務重新放進堆里,並設置下次執行的時間。

  • ScheduledThreadPoolExecutor是實現自ThreadPoolExecutor的線程池,構造方法中傳入參數n,則最多會有n個核心線程工作,空閑的核心線程不會被自動終止,而是一直阻塞在DelayedWorkQueue的take方法嘗試獲取任務。構造方法傳入的參數為0,ScheduledThreadPoolExecutor將以非核心線程工作,並且最多只會創建一個非核心線程,參考上文中ensurePrestart方法的執行過程。而這個非核心線程以poll方法獲取定時任務之所以不會因為超時就被回收,是因為任務隊列並不為空,只有在任務隊列為空時才會將空閑線程回收,詳見ThreadPoolExecutor篇的runWorker方法,之前我以為空閑的非核心線程超時就會被回收是不正確的,還要具備任務隊列為空這個條件。

  • ScheduledThreadPoolExecutor的定時執行任務依賴於DelayedWorkQueue,其內部用可擴容的數組實現以啟動時間升序的二叉樹。

  • 工作線程嘗試獲取DelayedWorkQueue的任務只有在任務到達指定時間才會成功,否則非核心線程會超時返回null,核心線程一直阻塞。

  • 對於非周期型任務只會執行一次並且可以通過ScheduledFuture的get方法阻塞得到結果,其內部實現依賴於FutureTask的get方法。

  • 周期型任務通過get方法無法獲取有效結果,因為FutureTask對於周期型任務執行的是runAndReset方法,並不會設置結果。周期型任務執行完畢后會重新計算下一次啟動時間並且再次添加到DelayedWorkQueue中,所有的Task會公用一個隊列,如果一個定時器里添加多個任務,此時就會形成堆,如果只是一個定時任務,則每次只有堆頂一個數據,並且也只需要一個核心線程就夠用了,因為只有當前任務執行完才會再將該任務添加到堆里。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM