ScheduledThreadPoolExecutor的scheduleAtFixedRate方法探究


ScheduledThreadPoolExecutor除了具有ThreadPoolExecutor的所有功能外,還可以延遲執行任務或者周期性的執 行某個任務。scheduleWithFixedDelay和scheduleAtFixedRate就是用來完成這個功能的。平常使用 scheduleAtFixedRate這個方法時並沒有多想,但是這幾天在實現一個功能的時候,需要考慮scheduleAtFixedRate所執行 的task是否會影響任務的周期性,比如scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS),那么 這個command的執行會不會影響這個10秒的周期性。因此特意仔細看了下ScheduledThreadPoolExecutor的源代碼,這里記錄 一下,以便以后查看。

    scheduleAtFixedRate有兩個時間參數,initialDelay和period,對應該方法的兩個主要功能,即延遲運行任務和周期性執行任務。

 

Java代碼   收藏代碼
  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
  2.                                               long initialDelay,  
  3.                                               long period,  
  4.                                               TimeUnit unit) {  
  5.     if (command == null || unit == null)  
  6.         throw new NullPointerException();  
  7.     if (period <= 0)  
  8.         throw new IllegalArgumentException();  
  9.     RunnableScheduledFuture<?> t = decorateTask(command,  
  10.         new ScheduledFutureTask<Object>(command,  
  11.                                         null,  
  12.                                         triggerTime(initialDelay, unit),  
  13.                                         unit.toNanos(period)));  
  14.     delayedExecute(t);  
  15.     return t;  
  16. }  
  17.   
  18. /** 
  19.  * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. 
  20.  */  
  21. private void delayedExecute(Runnable command) {  
  22.     if (isShutdown()) {  
  23.         reject(command);  
  24.         return;  
  25.     }  
  26.     // Prestart a thread if necessary. We cannot prestart it  
  27.     // running the task because the task (probably) shouldn't be  
  28.     // run yet, so thread will just idle until delay elapses.  
  29.     if (getPoolSize() < getCorePoolSize())  
  30.         prestartCoreThread();  
  31.   
  32.     super.getQueue().add(command);  
  33. }  

    首先創建一個ScheduledFutureTask,然后通過delayedExecute執行這個task。在delayedExecute中,首先 預先啟動一個線程,這里要注意的是這個這里用來啟動一個新線程的firstTask參數是null,所以新啟動的線程是idle狀態的,然后把這個 task加入到workQueue。ScheduledThreadPoolExecutor里的workQueue用的是 DelayedWorkQueue,這個DelayedWorkQueue就是實現delay的關鍵。DelayedWorkQueue內部使用的是 DelayQueue,DelayQueue實現task delay的關鍵就在於其Offer(E e)和Take.下面,通過分析這兩個方法和結合ThreadPoolExecutor的運行原理來說明delay操作是如何實現的

 

Java代碼   收藏代碼
  1. public boolean offer(E e) {  
  2.     final ReentrantLock lock = this.lock;  
  3.     lock.lock();  
  4.     try {  
  5.         E first = q.peek();  
  6.         q.offer(e);  
  7.         if (first == null || e.compareTo(first) < 0)  
  8.             available.signalAll();  
  9.         return true;  
  10.     } finally {  
  11.         lock.unlock();  
  12.     }  
  13. }  
  14.   
  15. public E take() throws InterruptedException {  
  16.     final ReentrantLock lock = this.lock;  
  17.     lock.lockInterruptibly();  
  18.     try {  
  19.         for (;;) {  
  20.             E first = q.peek();  
  21.             if (first == null) {  
  22.                 available.await();  
  23.             } else {  
  24.                 long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
  25.                 if (delay > 0) {  
  26.                     long tl = available.awaitNanos(delay);  
  27.                 } else {  
  28.                     E x = q.poll();  
  29.                     assert x != null;  
  30.                     if (q.size() != 0)  
  31.                         available.signalAll(); // wake up other takers  
  32.                     return x;  
  33.   
  34.                 }  
  35.             }  
  36.         }  
  37.     } finally {  
  38.         lock.unlock();  
  39.     }  
  40. }  

      ScheduledThreadPoolExecutor執行task是通過工作線程Work來承擔的,Work的Run方法如下:

 

Java代碼   收藏代碼
  1. public void run() {  
  2.     try {  
  3.         Runnable task = firstTask;  
  4.         firstTask = null;  
  5.         while (task != null || (task = getTask()) != null) {  
  6.             runTask(task);  
  7.             task = null;  
  8.         }  
  9.     } finally {  
  10.         workerDone(this);  
  11.     }  
  12. }  

     因為前面在delayedExecute方法里面創建work線程的firstTask參數為null,所以就通過getTask去從workQueue 里面獲取task,getTask在正常情況下(即線程池沒有關閉,線程數量沒有超過corePoolSize等)是通過 workQueue.take()從workQueue里獲取任務。根據上面的貼出來的take方法的代碼,如果queue是空的,則take方法會阻塞 住,直到有新task被add進來。而在上面的delayedExecute方法的最后,會把創建的scheduledFutureTask加入到 workQueue,這樣take方法中的available.await()就被喚醒;在take方法里面,如果workQueue不為空,則執行 task.getDelay()方法獲取task的delay

Java代碼   收藏代碼
  1. public long getDelay(TimeUnit unit) {  
  2.     return unit.convert(time - now(), TimeUnit.NANOSECONDS);  
  3. }  

   這里的time是通過兩個方法把initialDelay變成一個triggerTime

Java代碼   收藏代碼
  1. /** 
  2.  * Returns the trigger time of a delayed action. 
  3.  */  
  4. private long triggerTime(long delay, TimeUnit unit) {  
  5.      return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));  
  6. }  
  7.   
  8. /** 
  9.  * Returns the trigger time of a delayed action. 
  10.  */  
  11. long triggerTime(long delay) {  
  12.      return now() +  
  13.          ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));  
  14. }  

注意看這個方法,這里返回的delay不是固定不變的,從task被放入workQueue起,不同的時間調用getDelay方法會得出不同的 delay。如果放入workQueue的task的initialDelay是5秒,那么根據take方法的代碼,如果在放入workQueue5秒 后,就可以從delayQueue中拿到5秒前put進去的task,這樣就實現了delay的功能。

 

   在本文的最前面提到scheduleAtFixedRate能夠周期性地執行一項任務,那么這個是如何實現的呢?在 scheduleAtFixedRate方法里創建了一個ScheduledFutureTask,這個ScheduledFutureTask包裝了 command,最后周期性執行的是ScheduledFutureTask的run方法。

Java代碼   收藏代碼
  1. private void runPeriodic() {  
  2.     boolean ok = ScheduledFutureTask.super.runAndReset();  
  3.     boolean down = isShutdown();  
  4.     // Reschedule if not cancelled and not shutdown or policy allows  
  5.     if (ok && (!down ||  
  6.                (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&  
  7.                 !isStopped()))) {  
  8.         long p = period;  
  9.         if (p > 0)  
  10.             time += p;  
  11.         else  
  12.             time = triggerTime(-p);  
  13.         ScheduledThreadPoolExecutor.super.getQueue().add(this);  
  14.     }  
  15.     // This might have been the final executed delayed  
  16.     // task.  Wake up threads to check.  
  17.     else if (down)  
  18.         interruptIdleWorkers();  
  19. }  
  20.   
  21. /** 
  22.  * Overrides FutureTask version so as to reset/requeue if periodic. 
  23.  */  
  24. public void run() {  
  25.     if (isPeriodic())  
  26.         runPeriodic();  
  27.     else  
  28.         ScheduledFutureTask.super.run();  
  29. }  

     由上面的代碼可以看出,scheduleAtFixedRate(command,5,10,TimeUnit.SECONDS)這個方法的周期性會受 command的影響,如果command方法的執行時間是10秒,那么執行command的周期其實是20秒,即 scheduleAtFixedRate這個方法要等一個完整的command方法執行完成后才繼續周期性地執行command方法,其實這樣的設計也是 符合常理的。

 

     以上就是對ScheduledThreadPoolExecutor的一點小理解。

 

來自:http://olylakers.iteye.com/blog/1218243


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM