ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,還可以延遲執行任務或者周期性的執 行某個任務。scheduleWithFixedDelay和scheduleAtFixedRate就是用來完成這個功能的。平常使用 scheduleAtFixedRate這個方法時並沒有多想,但是這幾天在實現一個功能的時候,需要考慮scheduleAtFixedRate所執行 的task是否會影響任務的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么 這個command的執行會不會影響這個10秒的周期性。因此特意仔細看了下ScheduledThreadPoolExecutor的源代碼,這里記錄 一下,以便以后查看。
scheduleAtFixedRate有兩個時間參數,initialDelay和period,對應該方法的兩個主要功能,即延遲運行任務和周期性執行任務。
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- RunnableScheduledFuture<?> t = decorateTask(command,
- new ScheduledFutureTask<Object>(command,
- null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period)));
- delayedExecute(t);
- return t;
- }
- /**
- * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
- */
- private void delayedExecute(Runnable command) {
- if (isShutdown()) {
- reject(command);
- return;
- }
- // Prestart a thread if necessary. We cannot prestart it
- // running the task because the task (probably) shouldn't be
- // run yet, so thread will just idle until delay elapses.
- if (getPoolSize() < getCorePoolSize())
- prestartCoreThread();
- super.getQueue().add(command);
- }
首先創建一個ScheduledFutureTask,然后通過delayedExecute執行這個task。在delayedExecute中,首先 預先啟動一個線程,這里要注意的是這個這里用來啟動一個新線程的firstTask參數是null,所以新啟動的線程是idle狀態的,然后把這個 task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是 DelayedWorkQueue,這個DelayedWorkQueue就是實現delay的關鍵。DelayedWorkQueue內部使用的是 DelayQueue,DelayQueue實現task delay的關鍵就在於其Offer(E e)和Take.下面,通過分析這兩個方法和結合ThreadPoolExecutor的運行原理來說明delay操作是如何實現的
- public boolean offer(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- E first = q.peek();
- q.offer(e);
- if (first == null || e.compareTo(first) < 0)
- available.signalAll();
- return true;
- } finally {
- lock.unlock();
- }
- }
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- E first = q.peek();
- if (first == null) {
- available.await();
- } else {
- long delay = first.getDelay(TimeUnit.NANOSECONDS);
- if (delay > 0) {
- long tl = available.awaitNanos(delay);
- } else {
- E x = q.poll();
- assert x != null;
- if (q.size() != 0)
- available.signalAll(); // wake up other takers
- return x;
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
ScheduledThreadPoolExecutor執行task是通過工作線程Work來承擔的,Work的Run方法如下:
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
因為前面在delayedExecute方法里面創建work線程的firstTask參數為null,所以就通過getTask去從workQueue 里面獲取task,getTask在正常情況下(即線程池沒有關閉,線程數量沒有超過corePoolSize等)是通過 workQueue.take()從workQueue里獲取任務。根據上面的貼出來的take方法的代碼,如果queue是空的,則take方法會阻塞 住,直到有新task被add進來。而在上面的delayedExecute方法的最后,會把創建的scheduledFutureTask加入到 workQueue,這樣take方法中的available.await()就被喚醒;在take方法里面,如果workQueue不為空,則執行 task.getDelay()方法獲取task的delay
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - now(), TimeUnit.NANOSECONDS);
- }
這里的time是通過兩個方法把initialDelay變成一個triggerTime
- /**
- * Returns the trigger time of a delayed action.
- */
- private long triggerTime(long delay, TimeUnit unit) {
- return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
- }
- /**
- * Returns the trigger time of a delayed action.
- */
- long triggerTime(long delay) {
- return now() +
- ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
- }
注意看這個方法,這里返回的delay不是固定不變的,從task被放入workQueue起,不同的時間調用getDelay方法會得出不同的 delay。如果放入workQueue的task的initialDelay是5秒,那么根據take方法的代碼,如果在放入workQueue5秒 后,就可以從delayQueue中拿到5秒前put進去的task,這樣就實現了delay的功能。
在本文的最前面提到scheduleAtFixedRate能夠周期性地執行一項任務,那么這個是如何實現的呢?在 scheduleAtFixedRate方法里創建了一個ScheduledFutureTask,這個ScheduledFutureTask包裝了 command,最后周期性執行的是ScheduledFutureTask的run方法。
- private void runPeriodic() {
- boolean ok = ScheduledFutureTask.super.runAndReset();
- boolean down = isShutdown();
- // Reschedule if not cancelled and not shutdown or policy allows
- if (ok && (!down ||
- (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
- !isStopped()))) {
- long p = period;
- if (p > 0)
- time += p;
- else
- time = triggerTime(-p);
- ScheduledThreadPoolExecutor.super.getQueue().add(this);
- }
- // This might have been the final executed delayed
- // task. Wake up threads to check.
- else if (down)
- interruptIdleWorkers();
- }
- /**
- * Overrides FutureTask version so as to reset/requeue if periodic.
- */
- public void run() {
- if (isPeriodic())
- runPeriodic();
- else
- ScheduledFutureTask.super.run();
- }
由上面的代碼可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)這個方法的周期性會受 command的影響,如果command方法的執行時間是10秒,那么執行command的周期其實是20秒,即 scheduleAtFixedRate這個方法要等一個完整的command方法執行完成后才繼續周期性地執行command方法,其實這樣的設計也是 符合常理的。
以上就是對ScheduledThreadPoolExecutor的一點小理解。
來自:http://olylakers.iteye.com/blog/1218243

