Java調度線程池ScheduleExecutorService



如果在一個ScheduleExecutorService中提交一個任務,這個任務的調度周期設置
的時間比任務本身執行的時間短的話會出現什么情況?也就是在線程調度時間已經到了
但是上次的任務還沒有做完的情況下,ScheduleExecutorService是怎么處理的?

這個問題曾經困擾了我很久,我們都知道,ScheduleExecutorService是一個支持周期調度的線程池,我們可以設置調度的周期period,ScheduleExecutorService會按照設定好的周期調度我們的任務,如果我們設定的調度周期小於任務運行時間,那么很好理解,比如說我們設置的調度周期為1秒,而任務實際只需要10毫秒就可以執行完成一次,那么執行完成之后放到調度隊列即可,下次調度時間到了再次調度執行。那么,如果我們的任務執行時間大於我們設定的調度時間會怎么樣?比如我們設定的調度周期為1秒,但是我們的任務每次需要執行2秒,這個情況是不是很奇怪呢?

對於ScheduleExecutorService來說,你給我設定的調度周期是1秒,那么我當然1秒就會去運行一次你,但是運行1秒后發現你還在運行,那我是再次運行你還是等你運行完成再調度你運行?

當然,這都是我的主觀臆斷來猜測ScheduleExecutorService的原理,ScheduleExecutorService的真正原理需要去閱讀源碼來理解,下面帶着這個問題,以解決這個問題為目標去看一下ScheduleExecutorService的源碼吧。

首先,我們使用下面的代碼作為測試:


    private static Runnable blockRunner = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("one round:" + new Date()); } catch (InterruptedException e) { e.printStackTrace(); } }; private static ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); public static void main(String ... args) { scheduledExecutorService .scheduleAtFixedRate(blockRunner, 0, 100, TimeUnit.MILLISECONDS); } 

我們設定了調度周期為100毫秒,但是blockRunner實際上需要執行2秒才能返回。關於java的線程池,已經在前面的文章中寫到了,可以參考下面的文章:

先來看一下scheduleAtFixedRate這個方法:


    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } 

我們的任務command被包裝了兩次,一次變成了一個ScheduledFutureTask類型的對象,然后又變成了RunnableScheduledFuture類型的對象。然后執行了一個方法delayedExecute,這個方法字面意思上看起來像是延時執行的意思,看一下它的代碼:


    private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } 

它的執行邏輯是:如果線程池被關閉了,那么拒絕提交的任務,否則,將該任務添加隊列中去。這個隊列就是ThreadPoolExecutor中的workQueue,而這個workQueue是在ThreadPoolExecutor的構造函數中被初始化的,也就是下面這關鍵的一句:

    public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } 

也就是說,我們的任務被添加到了一個DelayedWorkQueue隊列中去了,而DelayedWorkQueue我們在Java阻塞隊列詳解中已經分析過,它是一個可以延遲消費的阻塞隊列。而延時的時間是通過接口Delayed的getDelay方法來獲得的,我們最后找到ScheduledFutureTask實現了Delayed的getDelay方法。


        public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } 

time變量是什么?原來是delay,好像和period無關啊!!分析了這么久,發現這是第一次執行任務的邏輯啊,我想知道的是第二次、第三次以后和初始的delay無關之后的周期調度的情況啊,繼續找吧!

然后發現了ScheduledFutureTask的run方法,很明顯這就是任務調度被執行的關鍵所在,看下代碼:


        public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } } 

最為關鍵的地方在於:


            else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } 

首先是:runAndReset()這個方法,然后是setNextRunTime()這個方法,然后是reExecutePeriodic(outerTask)這個方法。
第一個方法runAndReset()貌似是執行我們的提交的任務的,我們看下代碼:


    protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; } 

關鍵的地方是c.call()這一句,這個c就是我們提交的任務。
第二個方法setNextRunTime()的意思是設置下次執行的時間,下面是他的代碼細節:


        private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } 

我們只需要看p>0這個分支就可以了,其實這是兩種策略。我們的示例對應了第一個分支的策略,所以很顯然,time這個變量會被加p,而p則是我們設定好的period。下面我們找一下這個time是在哪里初始化的,回憶一下scheduleAtFixedRate這個方法的內,我們說我們的任務被包裝了兩次,而time就是在這里被初始化的:


    /** * Returns the trigger time of a delayed action. */ private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } /** * Returns the trigger time of a delayed action. */ long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } 

無論如何,我們知道一個任務會被運行完一次之后再次設置時間,然后線程池會獲取任務來執行,而任務隊列是一個延時阻塞隊列,所以也就造成了周期性運行的假象。可以看下下面獲取任務的take方法:


  public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } } 

可以看到,如果delay小於等於0,那么就是說需要被立即調度,否則延時delay這樣一段時間。也就是延時消費。

結論就是,一個任務會被重復添加到一個延時任務隊列,所以同一時間任務隊列中會有多個任務待調度,線程池會首先獲取優先級高的任務執行。如果我們的任務運行時間大於設置的調度時間,那么效果就是任務運行多長時間,調度時間就會變為多久,因為添加到任務隊列的任務的延時時間每次都是負數,所以會被立刻執行。



作者:一字馬胡
鏈接:https://www.jianshu.com/p/8c4c160ebdf7
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。
轉載自:https://www.jianshu.com/p/8c4c160ebdf7


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM