鏈接
Java線程池詳解(一)
Java線程池詳解(二)
Java調度線程池ScheduleExecutorService
上面列出了最近寫的關於java線程池ScheduleExecutorService的內容,可以作為參考,本文是對ScheduleExecutorService學習和總結的一個收尾,對java線程池技術更為深入的學習和總結將在未來適宜的時候進行。
向一個ScheduleExecutorService提交一個死循環任務
本文依然延續上一篇文章Java調度線程池ScheduleExecutorService,首先提出一個問題,如果向一個調度線程池提交一個死循環任務會發生什么?為了內容的完整性,本文會提到一些在上面列出的文章中已經涉及到的內容。
比如我們運行下面的代碼:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); private static Runnable loopRunner = () -> { for(;;){ } }; scheduledExecutorService .scheduleAtFixedRate(loopRunner, 0, 100, TimeUnit.MILLISECONDS);
loopRunner里面只有一個死循環什么也不做,當然這是極端情況,更為一般的情況為在for(;;)里面做一些某種驅動類型的工作,比如Netty的EventLoop一樣,那樣的循環更有意義,但是本文只是為了學習當向一個調度線程池提交了一個死循環任務之后的運行情況。
下面我們就分析一下scheduleAtFixedRate方法的調用鏈路:
1、將loopRunner包裝成一個ScheduledFutureTask對象,ScheduledFutureTask這個類對於調度線程池至關重要
2、再次包裝變為RunnableScheduledFuture對象
3、delayedExecute方法運行,確保任務被正確處理,如果線程池已經被關閉了,那么拒絕任務的提交,否則將任務添加到一個延時隊列(workQueue)中去,這是一個具有延時功能的阻塞隊列,初始容量為16,每次擴容增加50%的容量,最大容量為Integer.MAX_VALUE
4、運行方法ensurePrestart,確保線程池已經開始工作了,如果線程池里面的線程數量還沒有達到設定的corePoolSize,那么就添加一個新的Worker,然后讓這個Worker去延時隊列去獲取任務來執行
5、方法addWorker執行,添加一個Worker,然后讓他執行我們提交的任務,下面摘取一段addWorker的方法內容:
/** * 完整代碼見源碼,下面只是摘取了部分,去除了一些不影響閱讀的部分 */ private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //添加一個新的worker int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //如果添加新的Worker成功,那么就啟動它來執行我們提交的任務 t.start(); workerStarted = true; } } } return workerStarted; }
6、第五步中最為重要的一句話就是t.start(),這句話的執行會發生什么?首先看這個t是什么東西:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
而this就是Worker自身,而Worker是實現了Runnable的,也就是說,t.start()這句話會執行worker自身的run方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
7、我們已經知道現在會執行Worker的run方法,下面是run方法的內容:
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
首先從Worker中獲取其負責的task,如果task為空,那么就去延時隊列獲取任務,如果沒有獲取到任務那么線程就可以休息了,如果獲取到,那么繼續執行下面的內容。主要的就是一句:task.run(),那這句話會發生什么呢?
8、想要知道task.run()之后會發生什么,就需要知道task是個什么東西,第二步的時候說過,也就是我們的任務,只是被包裝成了一個RunnableScheduledFuture<Void>對象,那現在就去看RunnableScheduledFuture這個方法里面的run會發生什么,下面展示了其run方法的具體細節:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
如果這不是一個周期性的任務,那么就執行super的run方法,否則執行runAndReset方法,介於本文是問題導向的文章,所以在此不對super的run方法和runAndReset方法做分析,只要知道這就是執行我們實際提交的任務就好了。也就是說,走到這一步,我們的任務開始運行起來了,也就是我們的那個loopRunner開始無限循環了,下面的代碼將永遠得不到執行。所以,到這一步就可以解決問題了,向一個調度線程池提交一個死循環的任務,那么這個任務會霸占一個線程一直不會釋放,如果很不幸線程池里面只允許有一個線程的話,那么其他提交的任務都將得不到調度執行。
9、為了走通整個流程,我們假設我們提交的不是一個死循環任務,那么提交的任務總是會被執行完的,線程總是會被釋放的,那么就會執行setNextRunTime這個方法,下面是這個方法的細節:
private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
p > 0代表的是scheduleAtFixedRate,p < 0代表的是scheduleWithFixedDelay,兩者的區別在於前者總是按照設定的軌跡來設定下次應該調度的時間,而后者總是在任務執行完成之后再根據周期設定下一次應該執行的時間。我們只分析前者。對於第一次提交的任務,time等於當前時間 + 首次延時執行的時間,對於delay等於0的情況下,首次提交任務的time就是當前時間,然后 + p代表的是下一次應該被調度的時間。
10、我們發現,每個任務都是在執行完一次之后再設定下次執行任務的時間的,這也特別關鍵。設定好下次調度的時間,那么就要開始去准備執行它吧,也就是reExecutePeriodic方法會執行,下面是reExecutePeriodic方法的內容:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
這個方法只是將任務重新提交到了延時隊列而已,一次完整的流程到底也就結束了,為了內容的完整性,再來分析一下一個Worker從延時隊列獲取任務時的情況。回到第七步,我們有一個方法沒有提到,那就是getTask():
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
我們主要來看兩個方法: poll/take,這兩個方法都是從延時隊列獲取一個任務,下面是poll的代碼,take會阻塞一直到獲取到內容,而poll則不會阻塞,take的代碼就不粘了:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null; first = null; // don't retain ref while waiting if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
poll的代碼最為核心的內容就是,獲取隊列首部的任務,然后獲取其延時時間,這個時間是我們在完成一次調度之后設置的下次調度時間,如果任務的運行時間大於我們設定的周期的話,這個延時時間就是負數的,那么就會被立即執行,否則會等到設定的時間,時間到了再返回給Worker執行。
最后把getDelay方法的細節粘出來,這樣內容就完整了,其中的time是我們設定的:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
作者:一字馬胡
鏈接:https://www.jianshu.com/p/5b3ee98f0a0e
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。
轉載自:https://www.jianshu.com/p/5b3ee98f0a0e