Executor框架是指java5中引入的一系列並發庫中與executor相關的功能類,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(圖片引用自http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html)
本篇博文分析Executor中幾個比較重要的接口和類。
Executor
1 public interface Executor { 2 void execute(Runnable command); 3 }
Executor接口是Executor框架中最基礎的部分,定義了一個用於執行Runnable的execute方法。它沒有直接的實現類,有一個重要的子接口ExecutorService。
ExecutorService
1 //繼承自Executor接口 2 public interface ExecutorService extends Executor { 3 /** 4 * 關閉方法,調用后執行之前提交的任務,不再接受新的任務 5 */ 6 void shutdown(); 7 /** 8 * 從語義上可以看出是立即停止的意思,將暫停所有等待處理的任務並返回這些任務的列表 9 */ 10 List<Runnable> shutdownNow(); 11 /** 12 * 判斷執行器是否已經關閉 13 */ 14 boolean isShutdown(); 15 /** 16 * 關閉后所有任務是否都已完成 17 */ 18 boolean isTerminated(); 19 /** 20 * 中斷 21 */ 22 boolean awaitTermination(long timeout, TimeUnit unit) 23 throws InterruptedException; 24 /** 25 * 提交一個Callable任務 26 */ 27 <T> Future<T> submit(Callable<T> task); 28 /** 29 * 提交一個Runable任務,result要返回的結果 30 */ 31 <T> Future<T> submit(Runnable task, T result); 32 /** 33 * 提交一個任務 34 */ 35 Future<?> submit(Runnable task); 36 /** 37 * 執行所有給定的任務,當所有任務完成,返回保持任務狀態和結果的Future列表 38 */ 39 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 40 throws InterruptedException; 41 /** 42 * 執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表。 43 */ 44 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 45 long timeout, TimeUnit unit) 46 throws InterruptedException; 47 /** 48 * 執行給定的任務,如果某個任務已成功完成(也就是未拋出異常),則返回其結果。 49 */ 50 <T> T invokeAny(Collection<? extends Callable<T>> tasks) 51 throws InterruptedException, ExecutionException; 52 /** 53 * 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。 54 */ 55 <T> T invokeAny(Collection<? extends Callable<T>> tasks, 56 long timeout, TimeUnit unit) 57 throws InterruptedException, ExecutionException, TimeoutException; 58 }
ExecutorService接口繼承自Executor接口,定義了終止、提交任務、跟蹤任務返回結果等方法。
ExecutorService涉及到Runnable、Callable、Future接口,這些接口的具體內容如下。
1 // 實現Runnable接口的類將被Thread執行,表示一個基本的任務 2 public interface Runnable { 3 // run方法就是它所有的內容,就是實際執行的任務 4 public abstract void run(); 5 } 6 // Callable同樣是任務,與Runnable接口的區別在於它接收泛型,同時它執行任務后帶有返回內容 7 public interface Callable<V> { 8 // 相對於run方法的帶有返回值的call方法 9 V call() throws Exception; 10 }

1 // Future代表異步任務的執行結果 2 public interface Future<V> { 3 4 /** 5 * 嘗試取消一個任務,如果這個任務不能被取消(通常是因為已經執行完了),返回false,否則返回true。 6 */ 7 boolean cancel(boolean mayInterruptIfRunning); 8 9 /** 10 * 返回代表的任務是否在完成之前被取消了 11 */ 12 boolean isCancelled(); 13 14 /** 15 * 如果任務已經完成,返回true 16 */ 17 boolean isDone(); 18 19 /** 20 * 獲取異步任務的執行結果(如果任務沒執行完將等待) 21 */ 22 V get() throws InterruptedException, ExecutionException; 23 24 /** 25 * 獲取異步任務的執行結果(有最常等待時間的限制) 26 * 27 * timeout表示等待的時間,unit是它時間單位 28 */ 29 V get(long timeout, TimeUnit unit) 30 throws InterruptedException, ExecutionException, TimeoutException; 31 }
ExecutorService有一個子接口ScheduledExecutorService和一個抽象實現類AbstractExecutorService。
ScheduledExecutorService
1 // 可以安排指定時間或周期性的執行任務的ExecutorService 2 public interface ScheduledExecutorService extends ExecutorService { 3 /** 4 * 在指定延遲后執行一個任務,只執行一次 5 */ 6 public ScheduledFuture<?> schedule(Runnable command, 7 long delay, TimeUnit unit); 8 /** 9 * 與上面的方法相同,只是接受的是Callable任務 10 */ 11 public <V> ScheduledFuture<V> schedule(Callable<V> callable, 12 long delay, TimeUnit unit); 13 /** 14 * 創建並執行一個周期性的任務,在initialDelay延遲后每間隔period個單位執行一次,時間單位都是unit 15 * 每次執行任務的時間點是initialDelay, initialDelay+period, initialDelay + 2 * period... 16 */ 17 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 18 long initialDelay, 19 long period, 20 TimeUnit unit); 21 /** 22 * 創建並執行一個周期性的任務,在initialDelay延遲后開始執行,在執行結束后再延遲delay個單位開始執行下一次任務,時間單位都是unit 23 * 每次執行任務的時間點是initialDelay, initialDelay+(任務運行時間+delay), initialDelay + 2 * (任務運行時間+delay)... 24 */ 25 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 26 long initialDelay, 27 long delay, 28 TimeUnit unit); 29 }
ScheduledExecutorService定義了四個方法,已經在上面給出基本的解釋。ScheduledExecutorService有兩個實現類,分別是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,將在后面介紹。還需要解釋的是ScheduledFuture。
ScheduledFuture繼承自Future和Delayed接口,自身沒有添加方法。Delayed接口定義了一個獲取剩余延遲的方法。
AbstractExecutorService
1 // 提供ExecutorService的默認實現 2 public abstract class AbstractExecutorService implements ExecutorService { 3 /* 4 * 為指定的Runnable和value構造一個FutureTask,value表示默認被返回的Future 5 */ 6 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 7 return new FutureTask<T>(runnable, value); 8 } 9 /* 10 * 為指定的Callable創建一個FutureTask 11 */ 12 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 13 return new FutureTask<T>(callable); 14 } 15 /* 16 * 提交Runnable任務 17 */ 18 public Future<?> submit(Runnable task) { 19 if (task == null) throw new NullPointerException(); 20 // 通過newTaskFor方法構造RunnableFuture,默認的返回值是null 21 RunnableFuture<Object> ftask = newTaskFor(task, null); 22 // 調用具體實現的execute方法 23 execute(ftask); 24 return ftask; 25 } 26 /* 27 * 提交Runnable任務 28 */ 29 public <T> Future<T> submit(Runnable task, T result) { 30 if (task == null) throw new NullPointerException(); 31 // 通過newTaskFor方法構造RunnableFuture,默認的返回值是result 32 RunnableFuture<T> ftask = newTaskFor(task, result); 33 execute(ftask); 34 return ftask; 35 } 36 /* 37 * 提交Callable任務 38 */ 39 public <T> Future<T> submit(Callable<T> task) { 40 if (task == null) throw new NullPointerException(); 41 RunnableFuture<T> ftask = newTaskFor(task); 42 execute(ftask); 43 return ftask; 44 } 45 46 /* 47 * doInvokeAny的具體實現(核心內容),其它幾個方法都是重載方法,都對這個方法進行調用 48 * tasks 是被執行的任務集,timed標志是否定時的,nanos表示定時的情況下執行任務的限制時間 49 */ 50 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 51 boolean timed, long nanos) 52 throws InterruptedException, ExecutionException, TimeoutException { 53 // tasks空判斷 54 if (tasks == null) 55 throw new NullPointerException(); 56 // 任務數量 57 int ntasks = tasks.size(); 58 if (ntasks == 0) 59 throw new IllegalArgumentException(); 60 // 創建對應數量的Future返回集 61 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); 62 ExecutorCompletionService<T> ecs = 63 new ExecutorCompletionService<T>(this); 64 try { 65 // 執行異常 66 ExecutionException ee = null; 67 // System.nanoTime()根據系統計時器當回當前的納秒值 68 long lastTime = (timed)? System.nanoTime() : 0; 69 // 獲取任務集的遍歷器 70 Iterator<? extends Callable<T>> it = tasks.iterator(); 71 72 // 向執行器ExecutorCompletionService提交一個任務,並將結果加入futures中 73 futures.add(ecs.submit(it.next 74 // 修改任務計數器 75 --ntasks; 76 // 活躍任務計數器 77 int active = 1; 78 for (;;) { 79 // 獲取並移除代表已完成任務的Future,如果不存在,返回null 80 Future<T> f = ecs.poll(); 81 if (f == null) { 82 // 沒有任務完成,且任務集中還有未提交的任務 83 if (ntasks > 0) { 84 // 剩余任務計數器減1 85 --ntasks; 86 // 提交任務並添加結果 87 futures.add(ecs.submit(it.next())); 88 // 活躍任務計數器加1 89 ++active; 90 } 91 // 沒有剩余任務,且沒有活躍任務(所有任務可能都會取消),跳過這一次循環 92 else if (active == 0) 93 break; 94 else if (timed) { 95 // 獲取並移除代表已完成任務的Future,如果不存在,會等待nanos指定的納秒數 96 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 97 if (f == null) 98 throw new TimeoutException(); 99 // 計算剩余可用時間 100 long now = System.nanoTime(); 101 nanos -= now - lastTime; 102 lastTime = now; 103 } 104 else 105 // 獲取並移除表示下一個已完成任務的未來,等待,如果目前不存在。 106 // 執行到這一步說明已經沒有任務任務可以提交,只能等待某一個任務的返回 107 f = ecs.take(); 108 } 109 // f不為空說明有一個任務完成了 110 if (f != null) { 111 // 已完成一個任務,所以活躍任務計數減1 112 --active; 113 try { 114 // 返回該任務的結果 115 return f.get(); 116 } catch (InterruptedException ie) { 117 throw ie; 118 } catch (ExecutionException eex) { 119 ee = eex; 120 } catch (RuntimeException rex) { 121 ee = new ExecutionException(rex); 122 } 123 } 124 } 125 // 如果沒有成功返回結果則拋出異常 126 if (ee == null) 127 ee = new ExecutionException(); 128 throw ee; 129 130 } finally { 131 // 無論執行中發生異常還是順利結束,都將取消剩余未執行的任務 132 for (Future<T> f : futures) 133 f.cancel(true); 134 } 135 } 136 137 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 138 throws InterruptedException, ExecutionException { 139 try { 140 // 非定時任務的doInvokeAny調用 141 return doInvokeAny(tasks, false, 0); 142 } catch (TimeoutException cannotHappen) { 143 assert false; 144 return null; 145 } 146 } 147 // 定時任務的invokeAny調用,timeout表示超時時間,unit表示時間單位 148 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 149 long timeout, TimeUnit unit) 150 throws InterruptedException, ExecutionException, TimeoutException { 151 return doInvokeAny(tasks, true, unit.toNanos(timeout)); 152 } 153 // 無超時設置的invokeAll方法 154 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 155 throws InterruptedException { 156 // 空任務判斷 157 if (tasks == null) 158 throw new NullPointerException(); 159 // 創建大小為任務數量的結果集 160 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 161 // 是否完成所有任務的標記 162 boolean done = false; 163 try { 164 // 遍歷並執行任務 165 for (Callable<T> t : tasks) { 166 RunnableFuture<T> f = newTaskFor(t); 167 futures.add(f); 168 execute(f); 169 } 170 // 遍歷結果集 171 for (Future<T> f : futures) { 172 // 如果某個任務沒完成,通過f調用get()方法 173 if (!f.isDone()) { 174 try { 175 // get方法等待計算完成,然后獲取結果(會等待)。所以調用get后任務就會完成計算,否則會等待 176 f.get(); 177 } catch (CancellationException ignore) { 178 } catch (ExecutionException ignore) { 179 } 180 } 181 } 182 // 標志所有任務執行完成 183 done = true; 184 // 返回結果 185 return futures; 186 } finally { 187 // 假如沒有完成所有任務(可能是發生異常等情況),將任務取消 188 if (!done) 189 for (Future<T> f : futures) 190 f.cancel(true); 191 } 192 } 193 // 超時設置的invokeAll方法 194 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 195 long timeout, TimeUnit unit) 196 throws InterruptedException { 197 // 需要執行的任務集為空或時間單位為空,拋出異常 198 if (tasks == null || unit == null) 199 throw new NullPointerException(); 200 // 將超時時間轉為納秒單位 201 long nanos = unit.toNanos(timeout); 202 // 創建任務結果集 203 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 204 // 是否全部完成的標志 205 boolean done = false; 206 try { 207 // 遍歷tasks,將任務轉為RunnableFuture 208 for (Callable<T> t : tasks) 209 futures.add(newTaskFor(t)); 210 // 記錄當前時間(單位是納秒) 211 long lastTime = System.nanoTime(); 212 // 獲取迭代器 213 Iterator<Future<T>> it = futures.iterator(); 214 // 遍歷 215 while (it.hasNext()) { 216 // 執行任務 217 execute((Runnable)(it.next())); 218 // 記錄當前時間 219 long now = System.nanoTime(); 220 // 計算剩余可用時間 221 nanos -= now - lastTime; 222 // 更新上一次執行時間 223 lastTime = now; 224 // 超時,返回保存任務狀態的結果集 225 if (nanos <= 0) 226 return futures; 227 } 228 229 for (Future<T> f : futures) { 230 // 如果有任務沒完成 231 if (!f.isDone()) { 232 // 時間已經用完,返回保存任務狀態的結果集 233 if (nanos <= 0) 234 return futures; 235 try { 236 // 獲取計算結果,最多等待給定的時間nanos,單位是納秒 237 f.get(nanos, TimeUnit.NANOSECONDS); 238 } catch (CancellationException ignore) { 239 } catch (ExecutionException ignore) { 240 } catch (TimeoutException toe) { 241 return futures; 242 } 243 // 計算可用時間 244 long now = System.nanoTime(); 245 nanos -= now - lastTime; 246 lastTime = now; 247 } 248 } 249 // 修改是否全部完成的標記 250 done = true; 251 // 返回結果集 252 return futures; 253 } finally { 254 // 假如沒有完成所有任務(可能是時間已經用完、發生異常等情況),將任務取消 255 if (!done) 256 for (Future<T> f : futures) 257 f.cancel(true); 258 } 259 } 260 }
AbstractExecutor實現了ExecutorService接口的部分方法。具體代碼的分析在上面已經給出。
AbstractExecutor有兩個子類:DelegatedExecutorService、ThreadPoolExecutor。將在后面介紹。
下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。
RunnableFuture繼承自Future和Runnable,只有一個run()方法(Runnable中已經有一個run方法了,為什么RunnableFuture還要重新寫一個run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable兩個接口的組合。
FutureTask實現了RunnableFuture接口,除了實現了Future和Runnable中的方法外,它還有自己的方法和一個內部類Sync。
ExecutorCompletionService實現了CompletionService接口,將結果從復雜的一部分物種解耦出來。這些內容后續會介紹,不過這里先介紹框架中的其它內容,弄清整體框架。
下面看繼承自AbstractExecutorService的ThreadPoolExecutor。
ThreadPoolExecutor

1 public class ThreadPoolExecutor extends AbstractExecutorService { 2 // 檢查關閉的權限 3 private static final RuntimePermission shutdownPerm = 4 new RuntimePermission("modifyThread"); 5 /* runState提供了主要的生命周期控制,可取值有以下幾個: 6 * RUNNING:接受新的任務,處理隊列中的任務 7 * SHUTDOWN:不再接受新的任務,但是處理隊列中的任務 8 * STOP:不接受新任務,也不處理隊列中的任務,打斷正在處理的任務 9 * TERMINATED:和STOP類似,同時終止所有線程 10 * RUNNING -> SHUTDOWN 11 * On invocation of shutdown(), perhaps implicitly in finalize() 12 * (RUNNING or SHUTDOWN) -> STOP 13 * On invocation of shutdownNow() 14 * SHUTDOWN -> TERMINATED 15 * When both queue and pool are empty 16 * STOP -> TERMINATED 17 * When pool is empty 18 * 19 */ 20 volatile int runState; 21 static final int RUNNING = 0; 22 static final int SHUTDOWN = 1; 23 static final int STOP = 2; 24 static final int TERMINATED = 3; 25 26 // 用於保持任務的隊列 27 private final BlockingQueue<Runnable> workQueue; 28 // poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新鎖 29 private final ReentrantLock mainLock = new ReentrantLock(); 30 // mainLock鎖的一個Condition實例 31 private final Condition termination = mainLock.newCondition(); 32 // 保持線程池中所有的工作線程。只有獲取mainLock鎖后才能訪問。 33 private final HashSet<Worker> workers = new HashSet<Worker>(); 34 // 空閑線程的等待時間,大為是納秒 35 private volatile long keepAliveTime; 36 // 是否允許核心線程“活着” false(默認值)允許,哪怕空閑;true則使用keepAliveTime來控制等待超時時間 37 private volatile boolean allowCoreThreadTimeOut; 38 // 核心線程池的大小 39 private volatile int corePoolSize; 40 // pool size最大值 41 private volatile int maximumPoolSize; 42 // 當前pool大小 43 private volatile int poolSize; 44 // 拒絕執行的處理器 顧名思義,當一個任務被拒絕執行后將同個這個處理器進行處理 45 private volatile RejectedExecutionHandler handler; 46 // 線程工廠,用於創建線程 47 private volatile ThreadFactory threadFactory; 48 // 最終pool size達到的最大值 49 private int largestPoolSize; 50 // 已完成任務計數 51 private long completedTaskCount; 52 // 默認的拒絕執行的處理器 53 private static final RejectedExecutionHandler defaultHandler = 54 new AbortPolicy(); 55 /** 56 * 關於借個size的說明: 57 * 線程池數量poolSize指工作線程Worker對象的集合workers的實際大小,通過workers.size()可直接獲得。 58 * 核心線程池數量corePoolSize,可理解為工作線程Worker對象的集合workers的目標大小。 59 * 如果poolSize > corePoolSize,那么ThreadPoolExecutor就會有機制在適當的時候回收閑置的線程。 60 * 最大線程池數量maxPoolSize,就是工作線程Worker對象的集合workers的大小上限。 61 * 假如說任務隊列滿了,再來新任務時,若poolSize還沒達到maxPoolSize,則繼續創建新的線程來執行新任務, 62 * 若不幸poolSize達到了上限maxPoolSize,那不能再創建新的線程了,只能采取reject策略來拒絕新任務。 63 */ 64 /** 構造方法 開始*/ 65 public ThreadPoolExecutor(int corePoolSize, 66 int maximumPoolSize, 67 long keepAliveTime, 68 TimeUnit unit, 69 BlockingQueue<Runnable> workQueue) { 70 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 71 Executors.defaultThreadFactory(), defaultHandler); 72 } 73 public ThreadPoolExecutor(int corePoolSize, 74 int maximumPoolSize, 75 long keepAliveTime, 76 TimeUnit unit, 77 BlockingQueue<Runnable> workQueue, 78 ThreadFactory threadFactory) { 79 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 80 threadFactory, defaultHandler); 81 } 82 public ThreadPoolExecutor(int corePoolSize, 83 int maximumPoolSize, 84 long keepAliveTime, 85 TimeUnit unit, 86 BlockingQueue<Runnable> workQueue, 87 RejectedExecutionHandler handler) { 88 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 89 Executors.defaultThreadFactory(), handler); 90 } 91 // 主要的構造方法,其它構造方法都是對這個方法的調用 92 public ThreadPoolExecutor(int corePoolSize, 93 int maximumPoolSize, 94 long keepAliveTime, 95 TimeUnit unit, 96 BlockingQueue<Runnable> workQueue, 97 ThreadFactory threadFactory, 98 RejectedExecutionHandler handler) { 99 // 非法輸入(明顯這些值都是不能小於0的) 100 if (corePoolSize < 0 || 101 maximumPoolSize <= 0 || 102 maximumPoolSize < corePoolSize || 103 keepAliveTime < 0) 104 throw new IllegalArgumentException(); 105 // 空判斷 106 if (workQueue == null || threadFactory == null || handler == null) 107 throw new NullPointerException(); 108 this.corePoolSize = corePoolSize; 109 this.maximumPoolSize = maximumPoolSize; 110 this.workQueue = workQueue; 111 this.keepAliveTime = unit.toNanos(keepAliveTime); 112 this.threadFactory = threadFactory; 113 this.handler = handler; 114 } 115 /** 構造方法 結束*/ 116 117 118 // 執行Runnable任務 119 public void execute(Runnable command) { 120 if (command == null) 121 throw new NullPointerException(); 122 /*如果當前線程數量poolSize>=核心線程數量corePoolSize, 123 那當然無法再把當前任務加入到核心線程池中執行了,於是進花括號選擇其他的策略執行; 124 如果poolSize沒有達到corePoolSize,那很自然是把當前任務放到核心線程池執行, 125 也就是執行邏輯或運算符后的方法addIfUnderCorePoolSize(command)。 126 “放到核心線程池執行”是什么意思呢? 127 就是new 一個新工作線程放到workers集合中,讓這個新線程來執行當前的任務command,而這個新線程可以認為是核心線程池中的其中一個線程。*/ 128 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 129 // 線程池狀態時RUNNING且能將任務添加到worker隊列中 130 if (runState == RUNNING && workQueue.offer(command)) { 131 // 加入了隊列以后,只要保證有工作線程就ok了,工作線程會自動去執行任務隊列的。 132 // 所以判斷一下if ( runState != RUNNING || poolSize == 0), 133 // 在這個if為true時候,去保證一下任務隊列有線程會執行,即執行ensureQueuedTaskHandled(command)方法。 134 // 這里有兩種情況,情況一:runState != RUNNING,這種情況在ensureQueuedTaskHandled方法中會把任務丟給reject拒絕策略處理, 135 // 情況二:poolSize == 0,這種情況是new一個新線程加入到工作線程集合workers中。 136 if (runState != RUNNING || poolSize == 0) 137 ensureQueuedTaskHandled(command); 138 } 139 // 程序執行到這個分支,說明當前狀態runState != RUNNING,或者任務隊列workQueue已經滿了。 140 // 先看第一個條件下,前面解釋過runState,除了RUNNING狀態,其他三個狀態都不能接收新任務, 141 // 所以當runState != RUNNING時新任務只能根據reject策略拒絕, 142 // 而這個拒絕的邏輯是在addIfUnderMaximumPoolSize方法中實現的; 143 // 再看第二個條件下,workQueue已經滿,潛在的條件是runState == RUNNING,這種情況怎么處理新任務呢? 144 // 很簡單,若當前線程數量poolSize沒有達到最大線程數量maxPoolSize, 145 // 則創建新的線程去執行這個無法加入任務隊列的新任務, 146 // 否則就根據reject策略拒絕 147 else if (!addIfUnderMaximumPoolSize(command)) 148 reject(command); // is shutdown or saturated 149 } 150 } 151 152 private Thread addThread(Runnable firstTask) { 153 Worker w = new Worker(firstTask); 154 // 創建一個新Thread t 155 Thread t = threadFactory.newThread(w); 156 if (t != null) { 157 w.thread = t; 158 workers.add(w); 159 int nt = ++poolSize; 160 // 跟蹤線程池大小的最大值 161 if (nt > largestPoolSize) 162 largestPoolSize = nt; 163 } 164 return t; 165 } 166 167 // 創建並啟動新線程執行firstTask(在運行線程數小於核心線程池大小的情況且狀態是RUNNING) 168 private boolean addIfUnderCorePoolSize(Runnable firstTask) { 169 Thread t = null; 170 final ReentrantLock mainLock = this.mainLock; 171 // 獲取鎖 172 mainLock.lock(); 173 try { 174 if (poolSize < corePoolSize && runState == RUNNING) 175 // 創建一個新線程 176 t = addThread(firstTask); 177 } finally { 178 // 釋放鎖 179 mainLock.unlock(); 180 } 181 if (t == null) 182 return false; 183 // 啟動線程執行任務 184 t.start(); 185 return true; 186 } 187 188 // 創建並啟動新線程執行firstTask(在運行線程數小於pool size的最大值的情況且狀態是RUNNING) 189 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { 190 Thread t = null; 191 final ReentrantLock mainLock = this.mainLock; 192 mainLock.lock(); 193 try { 194 if (poolSize < maximumPoolSize && runState == RUNNING) 195 t = addThread(firstTask); 196 } finally { 197 mainLock.unlock(); 198 } 199 if (t == null) 200 return false; 201 t.start(); 202 return true; 203 } 204 205 // 確保任務被處理 206 private void ensureQueuedTaskHandled(Runnable command) { 207 final ReentrantLock mainLock = this.mainLock; 208 mainLock.lock(); 209 // 拒絕標記 210 boolean reject = false; 211 Thread t = null; 212 try { 213 int state = runState; 214 // 如果狀態不是RUNNING,能成功從worker隊列中移除,則拒絕這個任務執行 215 if (state != RUNNING && workQueue.remove(command)) 216 reject = true; 217 else if (state < STOP && 218 poolSize < Math.max(corePoolSize, 1) && 219 !workQueue.isEmpty()) 220 t = addThread(null); 221 } finally { 222 mainLock.unlock(); 223 } 224 if (reject) 225 reject(command); 226 else if (t != null) 227 // 不用拒絕任務則啟動線程執行任務 228 t.start(); 229 } 230 231 // 調用RejectedExecutionHandler決絕任務 232 void reject(Runnable command) { 233 handler.rejectedExecution(command, this); 234 } 235 // 工作線程,實現了Runnable接口 236 private final class Worker implements Runnable { 237 // 每個任務執行都必須獲取和釋放runLock。這主要是防止中斷的目的是取消工作線程,而不是中斷正在運行的任務。 238 private final ReentrantLock runLock = new ReentrantLock(); 239 // 要執行的任務 240 private Runnable firstTask; 241 // 每個線程完成任務的計數器,最后會統計到completedTaskCount中 242 volatile long completedTasks; 243 // 用於執行Runnable的線程 244 Thread thread; 245 // 構造方法 246 Worker(Runnable firstTask) { 247 this.firstTask = firstTask; 248 } 249 // 判斷這個線程是否活動 250 boolean isActive() { 251 return runLock.isLocked(); 252 } 253 // 中斷閑置線程 254 void interruptIfIdle() { 255 final ReentrantLock runLock = this.runLock; 256 if (runLock.tryLock()) { 257 try { 258 if (thread != Thread.currentThread()) 259 thread.interrupt(); 260 } finally { 261 runLock.unlock(); 262 } 263 } 264 } 265 // 中斷 266 void interruptNow() { 267 thread.interrupt(); 268 } 269 270 271 private void runTask(Runnable task) { 272 final ReentrantLock runLock = this.runLock; 273 runLock.lock(); 274 try { 275 276 if (runState < STOP && 277 Thread.interrupted() && 278 runState >= STOP) 279 thread.interrupt(); 280 281 boolean ran = false; 282 beforeExecute(thread, task); 283 try { 284 task.run(); 285 ran = true; 286 afterExecute(task, null); 287 ++completedTasks; 288 } catch (RuntimeException ex) { 289 if (!ran) 290 afterExecute(task, ex); 291 throw ex; 292 } 293 } finally { 294 runLock.unlock(); 295 } 296 } 297 298 299 public void run() { 300 try { 301 Runnable task = firstTask; 302 firstTask = null; 303 /** 304 * 注意這段while循環的執行邏輯,每執行完一個核心線程后,就會去線程池 305 * 隊列中取下一個核心線程,如取出的核心線程為null,則當前工作線程終止 306 */ 307 while (task != null || (task = getTask()) != null) { 308 //你所提交的核心線程(任務)的運行邏輯 309 runTask(task); 310 task = null; 311 } 312 } finally { 313 // 當前工作線程退出 314 workerDone(this); 315 } 316 } 317 } 318 319 // 從池隊列中取的核心線程(任務)的方法 320 Runnable getTask() { 321 for (;;) { 322 try { 323 // 獲取運行狀態 324 int state = runState; 325 // 大於SHUTDOWN,即STOP和TERMINATED狀態,沒有任務 326 if (state > SHUTDOWN) 327 return null; 328 Runnable r; 329 // SHUTDOWN狀態 330 if (state == SHUTDOWN) // 幫助清空隊列 331 r = workQueue.poll(); 332 // 狀態時RUNNING,且poolSize > corePoolSize或allowCoreThreadTimeOut 333 else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 334 // 獲取並移除元素(等待指定的時間) 335 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 336 else 337 // 獲取並移除元素(會一直等待知道獲取到有效元素) 338 r = workQueue.take(); 339 // 獲取結果不為空,返回 340 if (r != null) 341 return r; 342 // 檢查一個獲取任務失敗的線程能否退出 343 if (workerCanExit()) { 344 if (runState >= SHUTDOWN) // 中斷其他線程 345 interruptIdleWorkers(); 346 return null; 347 } 348 // Else retry 349 } catch (InterruptedException ie) { 350 // On interruption, re-check runState 351 } 352 } 353 } 354 355 // 檢查一個獲取任務失敗的線程能否退出 356 private boolean workerCanExit() { 357 final ReentrantLock mainLock = this.mainLock; 358 mainLock.lock(); 359 boolean canExit; 360 try { 361 // 可以退出的條件是狀態為STOP或TERMINATED或至少有一個處理非空隊列的線程(在允許超時的情況下) 362 canExit = runState >= STOP || 363 workQueue.isEmpty() || 364 (allowCoreThreadTimeOut && 365 poolSize > Math.max(1, corePoolSize)); 366 } finally { 367 mainLock.unlock(); 368 } 369 return canExit; 370 } 371 372 // 中斷其他線程 373 void interruptIdleWorkers() { 374 final ReentrantLock mainLock = this.mainLock; 375 mainLock.lock(); 376 try { 377 // 遍歷工作線程 378 for (Worker w : workers) 379 // 嘗試中斷閑置線程 380 w.interruptIfIdle(); 381 } finally { 382 mainLock.unlock(); 383 } 384 } 385 // 工作線程退出要處理的邏輯 386 void workerDone(Worker w) { 387 final ReentrantLock mainLock = this.mainLock; 388 mainLock.lock(); 389 try { 390 completedTaskCount += w.completedTasks; 391 workers.remove(w);//從工作線程緩存中刪除 392 if (--poolSize == 0)//poolSize減一,這時其實又可以創建工作線程了 393 tryTerminate();//嘗試終止 394 } finally { 395 mainLock.unlock(); 396 } 397 } 398 399 // 嘗試終止 400 private void tryTerminate() { 401 //終止的前提條件就是線程池里已經沒有工作線程(Worker)了 402 if (poolSize == 0) { 403 int state = runState; 404 /** 405 * 如果當前已經沒有了工作線程(Worker),但是線程隊列里還有等待的線程任務,則創建一個 406 * 工作線程來執行線程隊列中等待的任務 407 */ 408 if (state < STOP && !workQueue.isEmpty()) { 409 state = RUNNING; // disable termination check below 410 Thread t = addThread(null); 411 if (t != null) 412 t.start(); 413 } 414 // 設置池狀態為終止狀態 415 if (state == STOP || state == SHUTDOWN) { 416 runState = TERMINATED; 417 termination.signalAll(); 418 terminated(); 419 } 420 } 421 } 422 // 發起一個有序的關閉在以前已提交任務的執行,但不接受新任務。如果已經關閉,調用不會有其他影響。 423 public void shutdown() { 424 // Gets the system security interface. 425 SecurityManager security = System.getSecurityManager(); 426 if (security != null) 427 // 檢查權限(以拋出異常的形式) 428 security.checkPermission(shutdownPerm); 429 final ReentrantLock mainLock = this.mainLock; 430 mainLock.lock(); 431 try { 432 if (security != null) { // 檢查調用者是否能修改線程 433 for (Worker w : workers) 434 security.checkAccess(w.thread); 435 } 436 // 獲取運行狀態 437 int state = runState; 438 // 小於SHUTDOWN的不就是RUNNING么。。。 439 if (state < SHUTDOWN) 440 runState = SHUTDOWN; 441 442 try { 443 for (Worker w : workers) { 444 // 中斷線程 445 w.interruptIfIdle(); 446 } 447 } catch (SecurityException se) { // Try to back out 448 runState = state; 449 // tryTerminate() here would be a no-op 450 throw se; 451 } 452 // 嘗試終止 453 tryTerminate(); // Terminate now if pool and queue empty 454 } finally { 455 mainLock.unlock(); 456 } 457 } 458 459 460 public List<Runnable> shutdownNow() { 461 SecurityManager security = System.getSecurityManager(); 462 if (security != null) 463 security.checkPermission(shutdownPerm); 464 465 final ReentrantLock mainLock = this.mainLock; 466 mainLock.lock(); 467 try { 468 if (security != null) { // Check if caller can modify our threads 469 for (Worker w : workers) 470 security.checkAccess(w.thread); 471 } 472 473 int state = runState; 474 // 與上一個方法主要區別在於狀態和interruptNow方法 475 if (state < STOP) 476 runState = STOP; 477 478 try { 479 for (Worker w : workers) { 480 w.interruptNow(); 481 } 482 } catch (SecurityException se) { // Try to back out 483 runState = state; 484 // tryTerminate() here would be a no-op 485 throw se; 486 } 487 488 List<Runnable> tasks = drainQueue(); 489 tryTerminate(); // Terminate now if pool and queue empty 490 return tasks; 491 } finally { 492 mainLock.unlock(); 493 } 494 } 495 496 // 清空隊列 497 private List<Runnable> drainQueue() { 498 List<Runnable> taskList = new ArrayList<Runnable>(); 499 // 將隊列中的所有元素一到taskList中 500 workQueue.drainTo(taskList); 501 while (!workQueue.isEmpty()) { 502 Iterator<Runnable> it = workQueue.iterator(); 503 try { 504 if (it.hasNext()) { 505 Runnable r = it.next(); 506 // 從workQueue中移除,並添加到taskList中 507 if (workQueue.remove(r)) 508 taskList.add(r); 509 } 510 } catch (ConcurrentModificationException ignore) { 511 } 512 } 513 return taskList; 514 } 515 516 public boolean isShutdown() { 517 return runState != RUNNING; 518 } 519 520 521 boolean isStopped() { 522 return runState == STOP; 523 } 524 525 526 public boolean isTerminating() { 527 int state = runState; 528 return state == SHUTDOWN || state == STOP; 529 } 530 531 public boolean isTerminated() { 532 return runState == TERMINATED; 533 } 534 535 public boolean awaitTermination(long timeout, TimeUnit unit) 536 throws InterruptedException { 537 long nanos = unit.toNanos(timeout); 538 final ReentrantLock mainLock = this.mainLock; 539 mainLock.lock(); 540 try { 541 for (;;) { 542 if (runState == TERMINATED) 543 return true; 544 if (nanos <= 0) 545 return false; 546 nanos = termination.awaitNanos(nanos); 547 } 548 } finally { 549 mainLock.unlock(); 550 } 551 } 552 553 554 protected void finalize() { 555 shutdown(); 556 } 557 558 559 public void setThreadFactory(ThreadFactory threadFactory) { 560 if (threadFactory == null) 561 throw new NullPointerException(); 562 this.threadFactory = threadFactory; 563 } 564 565 566 public ThreadFactory getThreadFactory() { 567 return threadFactory; 568 } 569 570 571 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 572 if (handler == null) 573 throw new NullPointerException(); 574 this.handler = handler; 575 } 576 577 578 public RejectedExecutionHandler getRejectedExecutionHandler() { 579 return handler; 580 } 581 582 // 設置核心線程數 這里的設置將覆蓋構造方法中的設置 583 // 如果小於構造方法的設置,多余的線程將被閑置 584 // 如果大於構造方法的設置,新線程將被用於執行排隊的任務 585 public void setCorePoolSize(int corePoolSize) { 586 if (corePoolSize < 0) 587 throw new IllegalArgumentException(); 588 final ReentrantLock mainLock = this.mainLock; 589 mainLock.lock(); 590 try { 591 int extra = this.corePoolSize - corePoolSize; 592 this.corePoolSize = corePoolSize; 593 // 大於構造方法的設置 594 if (extra < 0) { 595 int n = workQueue.size(); 596 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) { 597 Thread t = addThread(null); 598 if (t != null) 599 t.start(); 600 else 601 break; 602 } 603 } 604 // 小於構造方法的設置 605 else if (extra > 0 && poolSize > corePoolSize) { 606 try { 607 Iterator<Worker> it = workers.iterator(); 608 while (it.hasNext() && 609 extra-- > 0 && 610 poolSize > corePoolSize && 611 workQueue.remainingCapacity() == 0) 612 it.next().interruptIfIdle(); 613 } catch (SecurityException ignore) { 614 // Not an error; it is OK if the threads stay live 615 } 616 } 617 } finally { 618 mainLock.unlock(); 619 } 620 } 621 622 623 public int getCorePoolSize() { 624 return corePoolSize; 625 } 626 627 628 public boolean prestartCoreThread() { 629 return addIfUnderCorePoolSize(null); 630 } 631 632 633 public int prestartAllCoreThreads() { 634 int n = 0; 635 while (addIfUnderCorePoolSize(null)) 636 ++n; 637 return n; 638 } 639 640 641 public boolean allowsCoreThreadTimeOut() { 642 return allowCoreThreadTimeOut; 643 } 644 645 646 public void allowCoreThreadTimeOut(boolean value) { 647 if (value && keepAliveTime <= 0) 648 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 649 650 allowCoreThreadTimeOut = value; 651 } 652 653 // 設置所允許的最大的線程數。這將覆蓋在構造函數中設置的任何值。如果新值小於當前值,多余的現有線程將被終止時,他們成為閑置。 654 public void setMaximumPoolSize(int maximumPoolSize) { 655 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 656 throw new IllegalArgumentException(); 657 final ReentrantLock mainLock = this.mainLock; 658 mainLock.lock(); 659 try { 660 int extra = this.maximumPoolSize - maximumPoolSize; 661 this.maximumPoolSize = maximumPoolSize; 662 if (extra > 0 && poolSize > maximumPoolSize) { 663 try { 664 Iterator<Worker> it = workers.iterator(); 665 while (it.hasNext() && 666 extra > 0 && 667 poolSize > maximumPoolSize) { 668 it.next().interruptIfIdle(); 669 --extra; 670 } 671 } catch (SecurityException ignore) { 672 // Not an error; it is OK if the threads stay live 673 } 674 } 675 } finally { 676 mainLock.unlock(); 677 } 678 } 679 680 681 public int getMaximumPoolSize() { 682 return maximumPoolSize; 683 } 684 685 686 public void setKeepAliveTime(long time, TimeUnit unit) { 687 if (time < 0) 688 throw new IllegalArgumentException(); 689 if (time == 0 && allowsCoreThreadTimeOut()) 690 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 691 this.keepAliveTime = unit.toNanos(time); 692 } 693 694 695 public long getKeepAliveTime(TimeUnit unit) { 696 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 697 } 698 699 700 public BlockingQueue<Runnable> getQueue() { 701 return workQueue; 702 } 703 704 705 public boolean remove(Runnable task) { 706 return getQueue().remove(task); 707 } 708 709 // 移除所有被取消的任務 710 public void purge() { 711 // Fail if we encounter interference during traversal 712 try { 713 Iterator<Runnable> it = getQueue().iterator(); 714 while (it.hasNext()) { 715 Runnable r = it.next(); 716 if (r instanceof Future<?>) { 717 Future<?> c = (Future<?>)r; 718 if (c.isCancelled()) 719 it.remove(); 720 } 721 } 722 } 723 catch (ConcurrentModificationException ex) { 724 return; 725 } 726 } 727 728 729 public int getPoolSize() { 730 return poolSize; 731 } 732 733 // 獲取活躍線程數 734 public int getActiveCount() { 735 final ReentrantLock mainLock = this.mainLock; 736 mainLock.lock(); 737 try { 738 int n = 0; 739 for (Worker w : workers) { 740 if (w.isActive()) 741 ++n; 742 } 743 return n; 744 } finally { 745 mainLock.unlock(); 746 } 747 } 748 749 750 public int getLargestPoolSize() { 751 final ReentrantLock mainLock = this.mainLock; 752 mainLock.lock(); 753 try { 754 return largestPoolSize; 755 } finally { 756 mainLock.unlock(); 757 } 758 } 759 760 761 public long getTaskCount() { 762 final ReentrantLock mainLock = this.mainLock; 763 mainLock.lock(); 764 try { 765 long n = completedTaskCount; 766 for (Worker w : workers) { 767 // 統計已經完成的任務 768 n += w.completedTasks; 769 // 如果w是活躍線程,說明正在執行一個任務,所以n加一 770 if (w.isActive()) 771 ++n; 772 } 773 // 加上隊列中的任務 774 return n + workQueue.size(); 775 } finally { 776 mainLock.unlock(); 777 } 778 } 779 780 // 獲取已完成的任務數 781 public long getCompletedTaskCount() { 782 final ReentrantLock mainLock = this.mainLock; 783 mainLock.lock(); 784 try { 785 long n = completedTaskCount; 786 for (Worker w : workers) 787 n += w.completedTasks; 788 return n; 789 } finally { 790 mainLock.unlock(); 791 } 792 } 793 794 795 protected void beforeExecute(Thread t, Runnable r) { } 796 797 798 protected void afterExecute(Runnable r, Throwable t) { } 799 800 801 protected void terminated() { } 802 803 // 實現了RejectedExecutionHandler,即是一個拒絕執行的Handler 804 public static class CallerRunsPolicy implements RejectedExecutionHandler { 805 806 public CallerRunsPolicy() { } 807 808 809 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 810 if (!e.isShutdown()) { 811 r.run(); 812 } 813 } 814 } 815 816 817 public static class AbortPolicy implements RejectedExecutionHandler { 818 819 public AbortPolicy() { } 820 821 822 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 823 throw new RejectedExecutionException(); 824 } 825 } 826 827 828 public static class DiscardPolicy implements RejectedExecutionHandler { 829 830 public DiscardPolicy() { } 831 832 833 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 834 } 835 } 836 837 838 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 839 840 public DiscardOldestPolicy() { } 841 842 843 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 844 if (!e.isShutdown()) { 845 e.getQueue().poll(); 846 e.execute(r); 847 } 848 } 849 } 850 }
可以參考http://xtu-xiaoxin.iteye.com/blog/647744
從上面的框架結構圖中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一個工具類,提供一些工廠和實用方法。
下面看ScheduledThreadPoolExecutor,它繼承自ThreadPoolExecutor並實現了ScheduledExecutorService接口。
ScheduledThreadPoolExecutor

// 可以安排指定時間或周期性的執行任務的ExecutorService public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { // 在Shutdown的時候如果要取消或關閉任務,設置為false;true表示繼續執行任務,在Shutdown之后 private volatile boolean continueExistingPeriodicTasksAfterShutdown; // false表示在Shutdown的時候取消Delayed的任務;true表示執行這個任務 private volatile boolean executeExistingDelayedTasksAfterShutdown = true; // 打破調度聯系,進而保證先進先出的順序捆綁項目中的序列號 private static final AtomicLong sequencer = new AtomicLong(0); // 基准時間 private static final long NANO_ORIGIN = System.nanoTime(); // 當前時間(相對於基准時間的值) final long now() { return System.nanoTime() - NANO_ORIGIN; } // RunnableScheduledFuture接口表示是否是周期性的 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; // 預定安排執行的時刻 private long time; // 表示重復任務,0表示不重復,正數表示固定比率,負數表示固定延時 private final long period; // 構造方法,構造一個只執行一次的任務 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 構造方法,構造一個按指定ns開始執行,指定period周期性執行 ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } // 構造方法,構造一個延時執行的任務 ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 按指定單位獲取延時時間 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } // 判斷傳入延時和這個任務延時之間的大小關系 public int compareTo(Delayed other) { // 為什么可以和Delayed比較?因為這個類實現了RunnableScheduledFuture接口,而RunnableScheduledFuture接口繼承自Delayed接口 if (other == this) // compare zero ONLY if same object return 0; // other是ScheduledFutureTask實例 if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; // 比較大小 if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0)? 0 : ((d < 0)? -1 : 1); } // 是否周期性的(包括延時的情況) public boolean isPeriodic() { return period != 0; } // 執行周期性的任務 private void runPeriodic() { // 執行任務 boolean ok = ScheduledFutureTask.super.runAndReset(); // 判斷是否已經shutdown boolean down = isShutdown(); // 重新安排任務(如果沒有shutdown或在沒有關閉且允許在shutdown之后執行已存在的任務) if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) // 計算下一次執行的時間 time += p; else // 計算觸發時間 time = triggerTime(-p); // 將任務添加到隊列中 ScheduledThreadPoolExecutor.super.getQueue().add(this); } else if (down) interruptIdleWorkers(); } // 執行任務,根據是否周期性調用不同的方法 public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); } } // 延遲執行 private void delayedExecute(Runnable command) { // 如果已經shutdown,決絕任務 if (isShutdown()) { reject(command); return; } if (getPoolSize() < getCorePoolSize()) // 預啟動線程 prestartCoreThread(); super.getQueue().add(command); } // 取消和清除關閉政策不允許運行的任務 private void cancelUnwantedTasks() { // 獲取shutdown策略 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) super.getQueue().clear(); else if (keepDelayed || keepPeriodic) { Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; // 根據是否周期性的任務通過制定的值判斷進行取消操作 if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; // 凈化,移除已經取消的任務 purge(); } } public boolean remove(Runnable task) { if (!(task instanceof RunnableScheduledFuture)) return false; return getQueue().remove(task); } protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task) { return task; } protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) { return task; } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } // 避免移除,返回延遲的值 private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(TimeUnit.NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } // 根據執行的延時時間執行任務 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); // ScheduledFutureTask的result為null RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 延時執行 delayedExecute(t); return t; } // 上一個方法的重載形式,接收的是Callable public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } /** * 創建並執行一個周期性的任務,在initialDelay延遲后每間隔period個單位執行一次,時間單位都是unit * 每次執行任務的時間點是initialDelay, initialDelay+period, initialDelay + 2 * period... */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } /** * 創建並執行一個周期性的任務,在initialDelay延遲后開始執行,在執行結束后再延遲delay個單位開始執行下一次任務,時間單位都是unit * 每次執行任務的時間點是initialDelay, initialDelay+(任務運行時間+delay), initialDelay + 2 * (任務運行時間+delay)... */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Boolean>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay))); delayedExecute(t); return t; } // 執行任務 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 立即執行,延時時間是0 schedule(command, 0, TimeUnit.NANOSECONDS); } // 重新 AbstractExecutorService 的方法 public Future<?> submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); } public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { return continueExistingPeriodicTasksAfterShutdown; } public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) cancelUnwantedTasks(); } public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { return executeExistingDelayedTasksAfterShutdown; } // 關閉 public void shutdown() { // 取消任務 cancelUnwantedTasks(); super.shutdown(); } // 立即關閉,調用的是父類立即關閉的方法 public List<Runnable> shutdownNow() { return super.shutdownNow(); } // 返回使用這個執行器的任務隊列 public BlockingQueue<Runnable> getQueue() { return super.getQueue(); } // 將DelayQueue<RunnableScheduledFuture> 包裝為 BlockingQueue<Runnable>的類 // 類似於代理 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> { private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); public Runnable poll() { return dq.poll(); } public Runnable peek() { return dq.peek(); } public Runnable take() throws InterruptedException { return dq.take(); } public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { return dq.poll(timeout, unit); } public boolean add(Runnable x) { return dq.add((RunnableScheduledFuture)x); } public boolean offer(Runnable x) { return dq.offer((RunnableScheduledFuture)x); } public void put(Runnable x) { dq.put((RunnableScheduledFuture)x); } public boolean offer(Runnable x, long timeout, TimeUnit unit) { return dq.offer((RunnableScheduledFuture)x, timeout, unit); } public Runnable remove() { return dq.remove(); } public Runnable element() { return dq.element(); } public void clear() { dq.clear(); } public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); } public int drainTo(Collection<? super Runnable> c, int maxElements) { return dq.drainTo(c, maxElements); } public int remainingCapacity() { return dq.remainingCapacity(); } public boolean remove(Object x) { return dq.remove(x); } public boolean contains(Object x) { return dq.contains(x); } public int size() { return dq.size(); } public boolean isEmpty() { return dq.isEmpty(); } public Object[] toArray() { return dq.toArray(); } public <T> T[] toArray(T[] array) { return dq.toArray(array); } public Iterator<Runnable> iterator() { return new Iterator<Runnable>() { private Iterator<RunnableScheduledFuture> it = dq.iterator(); public boolean hasNext() { return it.hasNext(); } public Runnable next() { return it.next(); } public void remove() { it.remove(); } }; } } }
在代碼中都加了注釋,我想大致能解釋清楚吧。
Executor涉及的類還是比較多的,到此為止剩下的還有Executors
Executors
Executors中所定義的 Executor
、ExecutorService
、ScheduledExecutorService
、ThreadFactory
和 Callable
類的工廠和實用方法。此類支持以下各種方法:
- 創建並返回設置有常用配置字符串的
ExecutorService
的方法。 - 創建並返回設置有常用配置字符串的
ScheduledExecutorService
的方法。 - 創建並返回“包裝的”ExecutorService 方法,它通過使特定於實現的方法不可訪問來禁用重新配置。
- 創建並返回
ThreadFactory
的方法,它可將新創建的線程設置為已知的狀態。 - 創建並返回非閉包形式的
Callable
的方法,這樣可將其用於需要 Callable 的執行方法中。
Executors提供的都是工具形式的方法,所以都是static的,並且這個類也沒有必要實例化,所以它的構造方法時private的。下面主要看一下幾個內部類。
RunnableAdapter
1 static final class RunnableAdapter<T> implements Callable<T> { 2 final Runnable task; 3 final T result; 4 RunnableAdapter(Runnable task, T result) { 5 this.task = task; 6 this.result = result; 7 } 8 public T call() { 9 task.run(); 10 return result; 11 } 12 }
適配器。以Callable的形式執行Runnable並且返回給定的result。
PrivilegedCallable
1 static final class PrivilegedCallable<T> implements Callable<T> { 2 private final AccessControlContext acc; 3 private final Callable<T> task; 4 private T result; 5 private Exception exception; 6 PrivilegedCallable(Callable<T> task) { 7 this.task = task; 8 this.acc = AccessController.getContext(); 9 } 10 11 public T call() throws Exception { 12 AccessController.doPrivileged(new PrivilegedAction<T>() { 13 public T run() { 14 try { 15 result = task.call(); 16 } catch (Exception ex) { 17 exception = ex; 18 } 19 return null; 20 } 21 }, acc); 22 if (exception != null) 23 throw exception; 24 else 25 return result; 26 } 27 }
在訪問控制下運行的Callable。涉及到Java.security包中的內容。
PrivilegedCallableUsingCurrentClassLoader類與上面的PrivilegedCallable類似,只是使用的是CurrentClassLoader。
DefaultThreadFactory
1 static class DefaultThreadFactory implements ThreadFactory { 2 static final AtomicInteger poolNumber = new AtomicInteger(1); 3 final ThreadGroup group; 4 final AtomicInteger threadNumber = new AtomicInteger(1); 5 final String namePrefix; 6 7 DefaultThreadFactory() { 8 SecurityManager s = System.getSecurityManager(); 9 group = (s != null)? s.getThreadGroup() : 10 Thread.currentThread().getThreadGroup(); 11 namePrefix = "pool-" + 12 poolNumber.getAndIncrement() + 13 "-thread-"; 14 } 15 16 public Thread newThread(Runnable r) { 17 // 調用Thread構造方法創建線程 18 Thread t = new Thread(group, r, 19 namePrefix + threadNumber.getAndIncrement(), 20 0); 21 // 取消守護線程設置 22 if (t.isDaemon()) 23 t.setDaemon(false); 24 // 設置默認優先級 25 if (t.getPriority() != Thread.NORM_PRIORITY) 26 t.setPriority(Thread.NORM_PRIORITY); 27 return t; 28 } 29 }
DefaultThreadFactory 是默認的線程工程,提供創建線程的方法。
PrivilegedThreadFactory繼承自DefaultThreadFactory,區別在於線程執行的run方法指定了classLoader並受到權限的控制。
DelegatedExecutorService繼承自AbstractExecutorService,是一個包裝類,暴露ExecutorService的方法。
DelegatedScheduledExecutorService繼承自DelegatedExecutorService,實現了ScheduledExecutorService接口。它也是一個包裝類,公開ScheduledExecutorService方法。