需要在理解線程池原理的基礎上學習定時任務:Java並發(二十一):線程池實現原理
一、先做總結
通過一個簡單示例總結:
public static void main(String[] args) { ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3); scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(); } }, 10, 30, TimeUnit.MILLISECONDS); }
1、概述
new一個線程池,等待隊列是DelayedWorkQueue,將Runable放入隊列中,到時間會被線程池取出執行
2、如何實現任務到時間被自動取出?
延時隊列DelayedWorkQueue:
DelayedWorkQueue為ScheduledThreadPoolExecutor中的內部類(類似DelayQueue)
DelayedWorkQueue中的任務是按照延遲時間從短到長來進行排序的(插入時排序)
只有在延遲期滿時才能從中提取元素,其列頭是延遲期滿后保存時間最長的Delayed元素
DelayedWorkQueue原理:
put()/offer():將ScheduledFutureTask放入隊列時,進行排序,時間短的在前(ScheduledFutureTask有觸發時間time屬性)
take():取出ScheduledFutureTask時,quene[0]的時間到了就返回;
quene[0]的時間沒到,就將take線程掛起delay時間。時間到了自動喚醒(Unsafe實現),再次取quene[0]。
3、周期任務如何實現?
任務被取出來run之后,將time+period又放入DelayedWorkQueue隊列
4、四個定時任務及區別:
(1)schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的 ScheduledFuture。
(2)schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的一次性操作。
(3)scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。
(4)scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
區別:
第三個方法(scheduleAtFixedRate)是周期固定,也就說它是不會受到這個延遲的影響的,每個線程的調度周期在初始化時就已經絕對了,是什么時候調度就是什么時候調度,它不會因為上一個線程的調度失效延遲而受到影響。
但是第四個方法(scheduleWithFixedDelay),則不一樣,它是每個線程的調度間隔固定,也就是說第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。如果第二線程推遲了那么后面所有的線程調度都會推遲。
scheduleAtFixedRate與scheduleWithFixedDelay區別原理:
任務被取出來run之后,將time+period又放入DelayedWorkQueue隊列
細節一:構造ScheduledFutureTask時,scheduleAtFixedRate傳入period(>0),scheduleWithFixedDelay傳入-delay(<0)
細節二:setNextRunTime時,scheduleAtFixedRate.time=time+period;scheduleWithFixedDelay.time=now()+period
細節一:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
細節二:
private void setNextRunTime() { long p = period; if (p > 0) time += p;// scheduleAtFixedRate:在上次開始執行的時間+周期時間 else time = triggerTime(-p);// scheduleWithFixedDelay:執行完上一個線程的時間+周期時間 } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay: overflowFree(delay)); }
二、四個定時任務方法
ScheduledThreadPoolExecutor提供了如下四個方法,也就是四個調度器:
- schedule(Callable callable, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的 ScheduledFuture。
- schedule(Runnable command, long delay, TimeUnit unit) :創建並執行在給定延遲后啟用的一次性操作。
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,后續操作具有給定的周期;也就是將在 initialDelay 后開始執行,然后在 initialDelay+period 后執行,接着在 initialDelay + 2 * period 后執行,依此類推。
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :創建並執行一個在給定初始延遲后首次啟用的定期操作,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
第一、二個方法差不多,都是一次性操作,只不過參數一個是Callable,一個是Runnable。
稍微分析下第三(scheduleAtFixedRate)、四個(scheduleWithFixedDelay)方法,加入initialDelay = 5,period/delay = 3,unit為秒。
如果每個線程都是都運行非常良好不存在延遲的問題,那么這兩個方法線程運行周期是5、8、11、14、17…….,但是如果存在延遲呢?比如第三個線程用了5秒鍾,那么這兩個方法的處理策略是怎樣的?第三個方法(scheduleAtFixedRate)是周期固定,也就說它是不會受到這個延遲的影響的,每個線程的調度周期在初始化時就已經絕對了,是什么時候調度就是什么時候調度,它不會因為上一個線程的調度失效延遲而受到影響。但是第四個方法(scheduleWithFixedDelay),則不一樣,它是每個線程的調度間隔固定,也就是說第一個線程與第二線程之間間隔delay,第二個與第三個間隔delay,以此類推。如果第二線程推遲了那么后面所有的線程調度都會推遲,例如,上面第二線程推遲了2秒,那么第三個就不再是11秒執行了,而是13秒執行。
三、ScheduledFutureTask
ScheduledFutureTask是ScheduledThreadPoolExecutor的內部類,線程池將Runable任務封裝成ScheduledFutureTask來提交
ScheduledFutureTask內部繼承FutureTask,實現RunnableScheduledFuture接口,它內部定義了三個比較重要的變量:
/** 任務被添加到ScheduledThreadPoolExecutor中的序號 */ private final long sequenceNumber; /** 任務要執行的具體時間 */ private long time; /** 任務的間隔周期 / private final long period;
構造函數:
ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
compareTo()方法:
提供一個排序算法,該算法規則是:首先按照time排序,time小的排在前面,大的排在后面,如果time相同,則使用sequenceNumber排序,小的排在前面,大的排在后面。
為什么在這個類里面提供compareTo()方法呢?
在前面就介紹過ScheduledThreadPoolExecutor在構造方法中提供的是DelayedWorkQueue()隊列中,也就是說ScheduledThreadPoolExecutor是把任務添加到DelayedWorkQueue中的,而DelayedWorkQueue則是類似於DelayQueue,內部維護着一個以時間為先后順序的隊列,所以compareTo()方法使用與DelayedWorkQueue隊列對其元素ScheduledThreadPoolExecutor task進行排序的算法。
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
run()方法:
ScheduledThreadPoolExecutor通過run()方法對task任務進行調度和延遲
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
(1)調用isPeriodic()獲取該線程是否為周期性任務標志,然后調用canRunInCurrentRunState()方法判斷該線程是否可以執行,如果不可以執行則調用cancel()取消任務。
(2)如果當線程已經到達了執行點,則調用run()方法執行task,該run()方法是在FutureTask中定義的。
(3)否則調用runAndReset()方法運行並充值,調用setNextRunTime()方法計算任務下次的執行時間,重新把任務添加到隊列中,讓該任務可以重復執行。
四、延時隊列DelayedWorkQueue
使用優先級隊列DelayedWorkQueue,保證添加到隊列中的任務會按照任務的延時時間進行排序,延時時間少的任務首先被獲取。
重要屬性:
// 初始時,數組長度大小。 private static final int INITIAL_CAPACITY = 16; // 使用數組來儲存隊列中的元素。 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 使用lock來保證多線程並發安全問題。 private final ReentrantLock lock = new ReentrantLock(); // 隊列中儲存元素的大小 private int size = 0; //特指隊列頭任務所在線程 private Thread leader = null; // 當隊列頭的任務延時時間到了,或者有新的任務變成隊列頭時,用來喚醒等待線程 private final Condition available = lock.newCondition();
offer()方法插入元素:
public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x; // 使用lock保證並發操作安全 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 如果要超過數組長度,就要進行數組擴容 if (i >= queue.length) // 數組擴容 grow(); // 將隊列中元素個數加一 size = i + 1; // 如果是第一個元素,那么就不需要排序,直接賦值就行了 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 調用siftUp方法,使插入的元素變得有序。 siftUp(i, e); } // 表示新插入的元素是隊列頭,更換了隊列頭, // 那么就要喚醒正在等待獲取任務的線程。 if (queue[0] == e) { leader = null; // 喚醒正在等待等待獲取任務的線程 available.signal(); } } finally { lock.unlock(); } return true; }
主要是三步:
(1)元素個數超過數組長度,就會調用grow()方法,進行數組擴容。
(2)將新元素e添加到優先級隊列中對應的位置,通過siftUp方法,保證按照元素的優先級排序。
(3)如果新插入的元素是隊列頭,即更換了隊列頭,那么就要喚醒正在等待獲取任務的線程。這些線程可能是因為原隊列頭元素的延時時間沒到,而等待的。
siftUp方法:按照元素的優先級插入元素
private void siftUp(int k, RunnableScheduledFuture<?> key) { // 當k==0時,就到了堆二叉樹的根節點了,跳出循環 while (k > 0) { // 父節點位置坐標, 相當於(k - 1) / 2 int parent = (k - 1) >>> 1; // 獲取父節點位置元素 RunnableScheduledFuture<?> e = queue[parent]; // 如果key元素大於父節點位置元素,滿足條件,那么跳出循環 // 因為是從小到大排序的。 if (key.compareTo(e) >= 0) break; // 否則就將父節點元素存放到k位置 queue[k] = e; // 這個只有當元素是ScheduledFutureTask對象實例才有用,用來快速取消任務。 setIndex(e, k); // 重新賦值k,尋找元素key應該插入到堆二叉樹的那個節點 k = parent; } // 循環結束,k就是元素key應該插入的節點位置 queue[k] = key; setIndex(key, k); }
take()方法取元素:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 如果沒有任務,就讓線程在available條件下等待。 if (first == null) available.await(); else { // 獲取任務的剩余延時時間 long delay = first.getDelay(NANOSECONDS); // 如果延時時間到了,就返回這個任務,用來執行。 if (delay <= 0) return finishPoll(first); // 將first設置為null,當線程等待時,不持有first的引用 first = null; // don't retain ref while waiting // 如果還是原來那個等待隊列頭任務的線程, // 說明隊列頭任務正在執行。 if (leader != null) available.await(); else { // 記錄一下當前等待隊列頭任務的線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 當任務的延時時間到了時,能夠自動超時喚醒。 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) // 喚醒等待任務的線程 available.signal(); lock.unlock(); } }
如果隊列中沒有任務,那么就讓當前線程在available條件下等待。如果隊列頭任務的剩余延時時間delay大於0,那么就讓當前線程在available條件下等待delay時間。
五、源碼解析定時任務過程
以一個簡單的示例來分析:
public static void main(String[] args) { ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3); scheduled.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(); } }, 10, 30, TimeUnit.MILLISECONDS); }
new線程池:
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(3); // new一個等待隊列是DelayedWorkQueue的線程池
// Executors.newScheduledThreadPool(3); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // super父類即線程池類ThreadPoolExecutor public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
任務提交:
// ScheduledThreadPoolExecutor.scheduleAtFixedRate public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 封裝成ScheduledFutureTask提交 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); // 提交 return t; } // ScheduledThreadPoolExecutor.delayedExecute(RunnableScheduledFuture<?>) private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); // 任務插入到延時隊列DelayedWorkQueue中 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); // 啟動一個線程 } } // ScheduledThreadPoolExecutor.DelayedWorkQueue public boolean add(Runnable e) { return offer(e); // 按時間排序,插入延時隊列(上文分析過了) } // ThreadPoolExecutor.ensurePrestart() void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) // 線程池啟動一個沒有任務的線程,while循環到延時隊列中取任務,調用DelayedWorkQueue.take()取 // addWorker(null, true)方法不做詳細介紹,前一篇線程池文章中分析過了 addWorker(null, true); else if (wc == 0) addWorker(null, false); } // DelayedWorkQueue.take() public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); // 如果沒有任務,就讓線程在available條件下等待。 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); // 如果延時時間到了,就返回這個任務,用來執行。 first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 如果第一個任務延時時間沒到,就掛起delay時間,到延時時間自動喚醒 // 此處是循環,自動喚醒之后再取出任務去執行 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } // 任務是封裝成ScheduledFutureTask的,任務執行會調用ScheduledFutureTask的 run方法 public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); // 執行任務 else if (ScheduledFutureTask.super.runAndReset()) { // 設置下一次循環的任務 setNextRunTime(); reExecutePeriodic(outerTask); } } // 循環 void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
參考資料 / 相關推薦:
【死磕Java並發】—–J.U.C之線程池:ScheduledThreadPoolExecutor