ScheduledThreadPoolExecutor中定時周期任務的實現源碼分析


ScheduledThreadPoolExecutor是一個定時任務線程池,相比於ThreadPoolExecutor最大的不同在於其阻塞隊列的實現

首先看一下其構造方法:

1 public ScheduledThreadPoolExecutor(int corePoolSize,
2                                    ThreadFactory threadFactory,
3                                    RejectedExecutionHandler handler) {
4     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
5           new DelayedWorkQueue(), threadFactory, handler);
6 }

ScheduledThreadPoolExecutor是繼承自ThreadPoolExecutor的,可以看到這里實際上調用了ThreadPoolExecutor的構造方法,而最大的不同在於這里使用了默認的DelayedWorkQueue“阻塞隊列”,這是后續能夠實現定時任務的關鍵


在ScheduledThreadPoolExecutor中使用scheduleWithFixedDelay或者scheduleAtFixedRate方法來完成定時周期任務

以scheduleWithFixedDelay為例
scheduleWithFixedDelay方法:

 1 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 2                                                  long initialDelay,
 3                                                  long delay,
 4                                                  TimeUnit unit) {
 5     if (command == null || unit == null)
 6         throw new NullPointerException();
 7     if (delay <= 0)
 8         throw new IllegalArgumentException();
 9     ScheduledFutureTask<Void> sft =
10         new ScheduledFutureTask<Void>(command,
11                                       null,
12                                       triggerTime(initialDelay, unit),
13                                       unit.toNanos(-delay));
14     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
15     sft.outerTask = t;
16     delayedExecute(t);
17     return t;
18 }

這里首先會將我們的任務包裝成ScheduledFutureTask
(這里的delay在傳入ScheduledFutureTask的構造方法時變為了負的,這是和scheduleAtFixedRate方法唯一不一樣的地方)


ScheduledFutureTask方法:

 1 private void delayedExecute(RunnableScheduledFuture<?> task) {
 2     if (isShutdown())
 3         reject(task);
 4     else {
 5         super.getQueue().add(task);
 6         if (isShutdown() &&
 7             !canRunInCurrentRunState(task.isPeriodic()) &&
 8             remove(task))
 9             task.cancel(false);
10         else
11             ensurePrestart();
12     }
13 }

這里不同於ThreadPoolExecutor中的處理,並沒有考慮coreSize和maxSize和任務之間的關系,而是直接將任務提交到阻塞隊列DelayedWorkQueue中


DelayedWorkQueue的add方法:

 1 public boolean add(Runnable e) {
 2     return offer(e);
 3 }
 4 
 5 public boolean offer(Runnable x) {
 6     if (x == null)
 7         throw new NullPointerException();
 8     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 9     final ReentrantLock lock = this.lock;
10     lock.lock();
11     try {
12         int i = size;
13         if (i >= queue.length)
14             grow();
15         size = i + 1;
16         if (i == 0) {
17             queue[0] = e;
18             setIndex(e, 0);
19         } else {
20             siftUp(i, e);
21         }
22         if (queue[0] == e) {
23             leader = null;
24             available.signal();
25         }
26     } finally {
27         lock.unlock();
28     }
29     return true;
30 }

實際上調用了offer方法,從這里就可以看出這個“阻塞隊列”的不同之處


DelayedWorkQueue中有這些成員:

1 private static final int INITIAL_CAPACITY = 16;
2 private RunnableScheduledFuture<?>[] queue =
3     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
4 private int size = 0;
5 private Thread leader = null;

在DelayedWorkQueue內部維護的是queue這個初始大小16的數組,其實就是一個小根堆


回到offer方法
由於是在多線程環境,這里的操作使用了重入鎖保證原子性
若是在size大於數組的長度情況下,就需要調用grow方法來擴容

grow方法:

1 private void grow() {
2     int oldCapacity = queue.length;
3     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
4     if (newCapacity < 0) // overflow
5         newCapacity = Integer.MAX_VALUE;
6     queue = Arrays.copyOf(queue, newCapacity);
7 }

可以看到這是一個非常簡單的擴容機制,申請一個1.5倍大小的新數組,再將原來的數據copy上去

回到offer方法,在調整完容量后,就需要進行數據的插入,使其形成一個小根堆
可以看到,在if-else判斷中,首先檢查是不是第一個元素,若是第一個,則直接放入數組,同時調用
setIndex方法,和任務關聯


setIndex方法:

1 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
2     if (f instanceof ScheduledFutureTask)
3         ((ScheduledFutureTask)f).heapIndex = idx;
4 }

這個方法很簡單,將下標關聯到之前包裝好的任務ScheduledFutureTask中


若不是第一個元素,則需要調用siftUp,進行小根堆的調整
siftUp方法:

 1 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 2    while (k > 0) {
 3         int parent = (k - 1) >>> 1;
 4         RunnableScheduledFuture<?> e = queue[parent];
 5         if (key.compareTo(e) >= 0)
 6             break;
 7         queue[k] = e;
 8         setIndex(e, k);
 9         k = parent;
10     }
11     queue[k] = key;
12     setIndex(key, k);
13 }

因為小根堆實際上就是一個二叉樹,利用二叉樹的性質根據當前要插入節點的下標,得到其父節點的下標parent ,再和父節點的RunnableScheduledFuture對象進行compareTo的比較(RunnableScheduledFuture繼承了Comparable接口)

compareTo的實現:

 1 public int compareTo(Delayed other) {
 2     if (other == this) // compare zero if same object
 3         return 0;
 4     if (other instanceof ScheduledFutureTask) {
 5         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
 6         long diff = time - x.time;
 7         if (diff < 0)
 8             return -1;
 9         else if (diff > 0)
10             return 1;
11         else if (sequenceNumber < x.sequenceNumber)
12             return -1;
13         else
14             return 1;
15     }
16     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
17     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
18 }

這里的邏輯比較簡單,只需要看第二個if

在前面ScheduledFutureTask包裝我們的任務的時候,其構造方法如下:

1 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
2     super(r, result);
3     this.time = ns;
4     this.period = period;
5     this.sequenceNumber = sequencer.getAndIncrement();
6 }

這里的time 也就是initialDelay,period 就是-delay,sequenceNumber 是一個全局自增的序列號

那么在上面的compareTo方法中,就首先根據子節點的initialDelay和父節點的initialDelay比較
若是子節點小於父節點,返回-1,子節點大於父節點返回1
若是相等,則根據序列號比較,序列號小的返回-1


回到siftUp方法,通過compareTo方法,若是大於等於0,就說明子節點大於父節點,不需要做調整,結束循環
若是小於0,說明子節點小於父節點,那么就需要將父節點先交換到當前位置,再將k變成parent,在下一次循環時,就會找parent的parent,重復上述操作,直至構成小根堆
最后將要插入的節點放入queue中合適的位置

那么在后續的任務添加中,就會根據任務的initialDelay,以及創建時間,構建一個小根堆

回到offer方法,在小根堆中插入完節點后,若是第一次插入, 將leader(Thread對象)置為null,利用available(Condition對象)喚醒Lock 的AQS上的阻塞


DelayedWorkQueue的add到此結束,回到delayedExecute方法中,在完成向阻塞隊列添加任務后,發現線程池中並沒有一個worker在工作,接下來的工作就由ThreadPoolExecutor的ensurePrestart方法實現:

1 void ensurePrestart() {
2     int wc = workerCountOf(ctl.get());
3     if (wc < corePoolSize)
4         addWorker(null, true);
5     else if (wc == 0)
6         addWorker(null, false);
7 }

可以看到這里根據ctl的取值,與corePoolSize比較,調用了線程池的addWorker方法,那么實際上也就是通過這里開啟了線程池的worker來進行工作


來看看在worker的輪詢中發生了什么:

 1 final void runWorker(Worker w) {
 2     Thread wt = Thread.currentThread();
 3     Runnable task = w.firstTask;
 4     w.firstTask = null;
 5     w.unlock(); // allow interrupts
 6     boolean completedAbruptly = true;
 7     try {
 8         while (task != null || (task = getTask()) != null) {
 9             w.lock();
10             // If pool is stopping, ensure thread is interrupted;
11             // if not, ensure thread is not interrupted.  This
12             // requires a recheck in second case to deal with
13             // shutdownNow race while clearing interrupt
14             if ((runStateAtLeast(ctl.get(), STOP) ||
15                  (Thread.interrupted() &&
16                   runStateAtLeast(ctl.get(), STOP))) &&
17                 !wt.isInterrupted())
18                 wt.interrupt();
19             try {
20                 beforeExecute(wt, task);
21                 Throwable thrown = null;
22                 try {
23                     task.run();
24                 } catch (RuntimeException x) {
25                     thrown = x; throw x;
26                 } catch (Error x) {
27                     thrown = x; throw x;
28                 } catch (Throwable x) {
29                     thrown = x; throw new Error(x);
30                 } finally {
31                     afterExecute(task, thrown);
32                 }
33             } finally {
34                 task = null;
35                 w.completedTasks++;
36                 w.unlock();
37             }
38         }
39         completedAbruptly = false;
40     } finally {
41         processWorkerExit(w, completedAbruptly);
42     }
43 }

可以看到在ThreadPoolExecutor的worker輪詢線程中,會通過getTask方法,不斷地從阻塞隊列中獲取任務


getTask方法:

 1 private Runnable getTask() {
 2     boolean timedOut = false; // Did the last poll() time out?
 3 
 4     for (;;) {
 5         int c = ctl.get();
 6         int rs = runStateOf(c);
 7 
 8         // Check if queue empty only if necessary.
 9         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
10             decrementWorkerCount();
11             return null;
12         }
13 
14         int wc = workerCountOf(c);
15 
16         // Are workers subject to culling?
17         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
18 
19         if ((wc > maximumPoolSize || (timed && timedOut))
20             && (wc > 1 || workQueue.isEmpty())) {
21             if (compareAndDecrementWorkerCount(c))
22                 return null;
23             continue;
24         }
25 
26         try {
27             Runnable r = timed ?
28                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
29                 workQueue.take();
30             if (r != null)
31                 return r;
32             timedOut = true;
33         } catch (InterruptedException retry) {
34             timedOut = false;
35         }
36     }
37 }

可以看到在這個方法中,在一系列的參數檢查並設置完畢后,會通過workQueue的poll或者take方法來獲取所需的任務

其中poll方法是在設置了超時時間的情況下進行獲取,take則不帶有超時時間


以take為例
DelayedWorkQueue的take方法:

 1 public RunnableScheduledFuture<?> take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         for (;;) {
 6             RunnableScheduledFuture<?> first = queue[0];
 7             if (first == null)
 8                 available.await();
 9             else {
10                 long delay = first.getDelay(NANOSECONDS);
11                 if (delay <= 0)
12                     return finishPoll(first);
13                 first = null; // don't retain ref while waiting
14                 if (leader != null)
15                     available.await();
16                 else {
17                     Thread thisThread = Thread.currentThread();
18                     leader = thisThread;
19                     try {
20                         available.awaitNanos(delay);
21                     } finally {
22                         if (leader == thisThread)
23                             leader = null;
24                     }
25                 }
26             }
27         }
28     } finally {
29         if (leader == null && queue[0] != null)
30             available.signal();
31         lock.unlock();
32     }
33 }

在for循環中首先取出數組中的第一個元素,也就是生成的小根堆中最小的那一個
得到first后,若是first為null,則說明當前沒有可執行的任務,則使用available這個Condition對象,將AQS阻塞起來,等待下次任務創建時再通過前面提到的available喚醒阻塞
若是first存在,則通過getDelay方法獲取時間間隔

getDelay方法:

1 public long getDelay(TimeUnit unit) {
2     return unit.convert(time - now(), NANOSECONDS);
3 }

這個方法就是用time減去當前時間now,得到的一個納秒級時間差值

而time是在ScheduledFutureTask執行構造方法時,通過triggerTime方法,使用initialDelay進行計算出來的

triggerTime方法:

 1 private long triggerTime(long delay, TimeUnit unit) {
 2     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
 3 }
 4 
 5 long triggerTime(long delay) {
 6     return now() +
 7         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
 8 }
 9 
10 private long overflowFree(long delay) {
11     Delayed head = (Delayed) super.getQueue().peek();
12     if (head != null) {
13         long headDelay = head.getDelay(NANOSECONDS);
14         if (headDelay < 0 && (delay - headDelay < 0))
15             delay = Long.MAX_VALUE + headDelay;
16     }
17     return delay;
18 }

可以看到time在這里實際上就是通過initialDelay加上當時設置的納秒級時間組成的

其中overflowFree是為了防止Long類型的溢出做了一次計算,后邊再說

所以take方法中,通過getDelay方法得到的是一個時間差,若是時間差小於等於0,則說明任務到了該執行的時候了,此時調用finishPoll

finishPoll方法:

1 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
2     int s = --size;
3     RunnableScheduledFuture<?> x = queue[s];
4     queue[s] = null;
5     if (s != 0)
6         siftDown(0, x);
7     setIndex(f, -1);
8     return f;
9 }

這個方法的邏輯還是比較簡單的,就是一個簡單的小根堆重新調整的操作,由於f需要被取出,此時利用最后一個元素,完成一次自上向下的調整(生成時是自下向上)

siftDown方法和siftUp類似:

 1 private void siftDown(int k, RunnableScheduledFuture<?> key) {
 2     int half = size >>> 1;
 3     while (k < half) {
 4         int child = (k << 1) + 1;
 5         RunnableScheduledFuture<?> c = queue[child];
 6         int right = child + 1;
 7         if (right < size && c.compareTo(queue[right]) > 0)
 8             c = queue[child = right];
 9         if (key.compareTo(c) <= 0)
10             break;
11         queue[k] = c;
12         setIndex(c, k);
13         k = child;
14     }
15     queue[k] = key;
16     setIndex(key, k);
17 }

由二叉樹性質half 保證只操作到倒數第二層
在循環中,首先根據k(當前也就是根節點),得到其左右孩子的下標
若是右孩子存在,那么就用左孩子和右孩子比較,選出最下的哪一個作為child
若是右孩子不存在,則直接使用左孩子作為child

當選出child后,再和待插入的元素key比較
若是key小,則結束循環,直接將key插入k所在位置
若不是,則將當前child所在元素放在k所在位置,然后從child位置繼續開始向下尋找,直到找到一個大於key或者遍歷完畢

這樣自上向下的將當前堆又調整成了小根堆,以后的定時周期任務都是以這種方式來調用的


看到這ScheduledThreadPoolExecutor的定時周期任務已經基本理解了,只不過還存在一個問題,當執行周期任務,會從小根堆取出,那么該任務下一次的執行時間何時更新到小根堆?


回到ThreadPoolExecutor的worker的runWorker方法中,在調用完getTask方法后,在進行完一系列完全檢查后,會直接調用task的run方法,而此時的task是經過之前ScheduledFutureTask包裝的

ScheduledFutureTask的run方法:

 1 public void run() {
 2     boolean periodic = isPeriodic();
 3     if (!canRunInCurrentRunState(periodic))
 4         cancel(false);
 5     else if (!periodic)
 6         ScheduledFutureTask.super.run();
 7     else if (ScheduledFutureTask.super.runAndReset()) {
 8         setNextRunTime();
 9         reExecutePeriodic(outerTask);
10     }
11 }

若是設置了周期任務(period不為0),那么isPeriodic方法為true
邏輯上就會執行runAndReset方法,這個方法內部就會調用我們傳入的Runnable的run方法,從而真正地執行我們的任務
在執行完畢后,可以看到調用了setNextRunTime方法


setNextRunTime方法:

1 private void setNextRunTime() {
2     long p = period;
3     if (p > 0)
4         time += p;
5     else
6         time = triggerTime(-p);
7 }

這里就很簡單,利用當前time和period計算出下一次的time
由於scheduleWithFixedDelay和scheduleAtFixedRate之前所說的不一樣之處,在這里就得到了體現

因為scheduleAtFixedRate的period是大於0的,所以scheduleAtFixedRate計算出來的時間間隔就是initialDelay + n*period的這種形式,那么其執行就會有固定的時間點,不過這還是要取決於任務的執行時間,若是任務的執行時間大於時間間隔,那么當上一次任務執行完畢,就會立刻執行,而不是等到時間點到了,若是任務的執行時間小於時間間隔,那么毫無疑問就需要等到時間點到了才執行下一次的任務

由於scheduleWithFixedDelay的period是小於0的,所以需要執行triggerTime
triggerTime方法:

1 long triggerTime(long delay) {
2     return now() +
3         ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
4 }

可以看到若是不存在Long類型的溢出問題,那么下一次的時間就等於當前時間加時間間隔,所以說scheduleWithFixedDelay的不同之處在於其算上了任務的實際執行時間

若是存在Long類型的溢出問題時
在overflowFree中:

1 private long overflowFree(long delay) {
2     Delayed head = (Delayed) super.getQueue().peek();
3     if (head != null) {
4         long headDelay = head.getDelay(NANOSECONDS);
5         if (headDelay < 0 && (delay - headDelay < 0))
6             delay = Long.MAX_VALUE + headDelay;
7     }
8     return delay;
9 }

首先通過peek得到隊列中的第一個元素,若是不存在,則直接返回delay
若是存在,通過getDelay得到headDelay
這里就會存在兩情況
任務還沒達到執行時間,則headDelay 大於零
任務達到執行時間,但卻由於之前的任務還沒執行完畢,遭到了延時,headDelay 小於0
所以這次的計算就是將headDelay這部分超時時間減去,以防止后續影響compareTo的比較,從而引起offer順序的錯誤
(只不過這種情況正常不會遇見。。。)

在計算完成下一次的運行時間后
調用reExecutePeriodic方法:

1 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
2     if (canRunInCurrentRunState(true)) {
3         super.getQueue().add(task);
4         if (!canRunInCurrentRunState(true) && remove(task))
5             task.cancel(false);
6         else
7             ensurePrestart();
8     }
9 }

其中傳入的這個task(outerTask)其實就是當前執行完畢的這個任務,
可以看到這里canRunInCurrentRunState成立的情況下,就會通過
getQueue得到阻塞隊列,再次通過DelayedWorkQueue的add方法將其加入到小根堆中,只不過這時的time發生了變化
若是情況正常,則繼續通過ThreadPoolExecutor的ensurePrestart方法,調度worker的工作

這樣定時周期任務就能正常執行


ScheduledThreadPoolExecutor分析到此結束


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM