1、線程池的好處
- 降低資源消耗(重復利用已創建的線程減少創建和銷毀線程的開銷)
- 提高響應速度(無須創建線程)
- 提高線程的可管理性
2、相關類圖
JDK5以后將工作單元和執行機制分離開來,工作單元包括Runnable和Callable;執行機制由Executor框架提供,管理線程的生命周期,將任務的提交和如何執行進行解耦。Executors是一個快速得到線程池的工具類,相關的類圖如下所示:
3、Executor框架接口
Executor接口
Executor接口只有一個execute方法,用來替代通常創建或啟動線程的方法。
public interface Executor { void execute(Runnable command); }
ExecutorService接口
ExecutorService接口繼承自Executor接口,加入了關閉方法、submit方法和對Callable、Future的支持。
ScheduledExecutorService接口
ScheduledExecutorService擴展ExecutorService接口並加入了對定時任務的支持。
4、ThreadPoolExecutor分析
ThreadPoolExecutor繼承自AbstractExecutorService,也是實現了ExecutorService接口。
4.1 內部狀態
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是對線程池的運行狀態(高3位)和線程池中有效線程的數量(低29位)進行控制的一個字段。線程池有五種狀態,分別是:
- RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
- SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
- STOP : 1 << COUNT_BITS,即高3位為001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
- TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;
- TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經執行完成。
4.2 構造方法
構造方法有4個,這里只列出其中最基礎的一個。
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
構造方法中參數的含義如下:
- corePoolSize:核心線程數量,線程池中應該常駐的線程數量
- maximumPoolSize:線程池允許的最大線程數,非核心線程在超時之后會被清除
- keepAliveTime:線程沒有任務執行時可以保持的時間
- unit:時間單位
- workQueue:阻塞隊列,存儲等待執行的任務。JDK提供了如下4種阻塞隊列:
- ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
- LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
- SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
- PriorityBlockingQuene:具有優先級的無界阻塞隊列;
- threadFactory:線程工廠,來創建線程
- handler:線程池的飽和策略。如果阻塞隊列滿了並且沒有空閑的線程,這時如果繼續提交任務,就需要采取一種策略處理該任務。線程池提供了4種策略:
- AbortPolicy:直接拋出異常,這是默認策略;
- CallerRunsPolicy:用調用者所在的線程來執行任務;
- DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
- DiscardPolicy:直接丟棄任務。
4.3 execute方法
ThreadPoolExecutor.execute(task)實現了Executor.execute(task),用來提交任務,不能獲取返回值,代碼如下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 /* 26 * workerCountOf方法取出低29位的值,表示當前活動的線程數; 27 * 如果當前活動線程數小於corePoolSize,則新建一個線程放入線程池中; 28 * 並把任務添加到該線程中。 29 */ 30 31 if (workerCountOf(c) < corePoolSize) { 32 /* 33 * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷; 34 * 如果為true,根據corePoolSize來判斷; 35 * 如果為false,則根據maximumPoolSize來判斷 36 */ 37 if (addWorker(command, true)) 38 return; 39 /* 40 * 如果添加失敗,則重新獲取ctl值 41 */ 42 c = ctl.get(); 43 } 44 /* 45 * 線程池處於RUNNING狀態,把提交的任務成功放入阻塞隊列中 46 */ 47 if (isRunning(c) && workQueue.offer(command)) { 48 // 重新獲取ctl值 49 int recheck = ctl.get(); 50 // 再次判斷線程池的運行狀態,如果不是運行狀態,由於之前已經把command添加到workQueue中了, 51 // 這時需要移除該command 52 // 執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回 53 if (! isRunning(recheck) && remove(command)) 54 reject(command); 55 /* 56 * 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法 57 * 這里傳入的參數表示: 58 * 1. 第一個參數為null,表示在線程池中創建一個線程,但不去啟動; 59 * 2. 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷; 60 * 如果判斷workerCount大於0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。 61 */ 62 else if (workerCountOf(recheck) == 0) 63 addWorker(null, false); 64 } 65 /* 66 * 如果執行到這里,有兩種情況: 67 * 1. 線程池已經不是RUNNING狀態; 68 * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。 69 * 這時,再次調用addWorker方法,但第二個參數傳入為false,將線程池的有限線程數量的上限設置為maximumPoolSize; 70 * 如果失敗則拒絕該任務 71 */ 72 else if (!addWorker(command, false)) 73 reject(command); 74 }
如果線程池狀態一直是RUNNING,則執行過程如下:
- 如果workerCount < corePoolSize,則創建並啟動一個線程來執行新提交的任務;
- 如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創建並啟動一個線程來執行新提交的任務;
- 如果workerCount >= maximumPoolSize,並且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
4.4 addWorker方法
從executor的方法實現可以看出,addWorker主要負責創建新的線程並執行任務。線程池創建新線程執行任務時,需要獲取全局鎖:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 // 獲取運行狀態 6 int rs = runStateOf(c); 7 /* 8 * 這個if判斷 9 * 如果rs >= SHUTDOWN,則表示此時不再接收新任務; 10 * 接着判斷以下3個條件,只要有1個不滿足,則返回false: 11 * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞隊列中已保存的任務 12 * 2. firsTask為空 13 * 3. 阻塞隊列不為空 14 * 15 * 首先考慮rs == SHUTDOWN的情況 16 * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false; 17 * 然后,如果firstTask為空,並且workQueue也為空,則返回false, 18 * 因為隊列中已經沒有任務了,不需要再添加線程了 19 */ 20 // Check if queue empty only if necessary. 21 if (rs >= SHUTDOWN && 22 ! (rs == SHUTDOWN && 23 firstTask == null && 24 ! workQueue.isEmpty())) 25 return false; 26 27 for (;;) { 28 // 獲取線程數 29 int wc = workerCountOf(c); 30 // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false; 31 // 這里的core是addWorker方法的第二個參數,如果為true表示根據corePoolSize來比較, 32 // 如果為false則根據maximumPoolSize來比較。 33 if (wc >= CAPACITY || 34 wc >= (core ? corePoolSize : maximumPoolSize)) 35 return false; 36 // 嘗試增加workerCount,如果成功,則跳出第一個for循環 37 if (compareAndIncrementWorkerCount(c)) 38 break retry; 39 // 如果增加workerCount失敗,則重新獲取ctl的值 40 c = ctl.get(); // Re-read ctl 41 // 如果當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行 42 if (runStateOf(c) != rs) 43 continue retry; 44 // else CAS failed due to workerCount change; retry inner loop 45 } 46 } 47 48 boolean workerStarted = false; 49 boolean workerAdded = false; 50 Worker w = null; 51 try { 52 // 根據firstTask來創建Worker對象 53 w = new Worker(firstTask); 54 // 每一個Worker對象都會創建一個線程 55 final Thread t = w.thread; 56 if (t != null) { 57 final ReentrantLock mainLock = this.mainLock; 58 mainLock.lock(); 59 try { 60 // Recheck while holding lock. 61 // Back out on ThreadFactory failure or if 62 // shut down before lock acquired. 63 int rs = runStateOf(ctl.get()); 64 // rs < SHUTDOWN表示是RUNNING狀態; 65 // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向線程池中添加線程。 66 // 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務 67 if (rs < SHUTDOWN || 68 (rs == SHUTDOWN && firstTask == null)) { 69 if (t.isAlive()) // precheck that t is startable 70 throw new IllegalThreadStateException(); 71 // workers是一個HashSet 72 workers.add(w); 73 int s = workers.size(); 74 // largestPoolSize記錄着線程池中出現過的最大線程數量 75 if (s > largestPoolSize) 76 largestPoolSize = s; 77 workerAdded = true; 78 } 79 } finally { 80 mainLock.unlock(); 81 } 82 if (workerAdded) { 83 // 啟動線程,執行任務(Worker.thread(firstTask).start()); 84 //啟動時會調用Worker類中的run方法,Worker本身實現了Runnable接口,所以一個Worker類型的對象也是一個線程。 85 t.start(); 86 workerStarted = true; 87 } 88 } 89 } finally { 90 if (! workerStarted) 91 addWorkerFailed(w); 92 } 93 return workerStarted; 94 }
4.5 Worker類
線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象。Worker類設計如下:
- 繼承了AQS類,用於判斷線程是否空閑以及是否可以被中斷,可以方便的實現工作線程的中止操作;
- 實現了Runnable接口,可以將自身作為一個任務在工作線程中執行;
- 當前提交的任務firstTask作為參數傳入Worker的構造方法;
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 /** 6 * This class will never be serialized, but we provide a 7 * serialVersionUID to suppress a javac warning. 8 */ 9 private static final long serialVersionUID = 6138294804551838833L; 10 11 /** Thread this worker is running in. Null if factory fails. */ 12 final Thread thread; 13 /** Initial task to run. Possibly null. */ 14 Runnable firstTask; 15 /** Per-thread task counter */ 16 volatile long completedTasks; 17 18 /** 19 * Creates with given first task and thread from ThreadFactory. 20 * @param firstTask the first task (null if none) 21 */ 22 Worker(Runnable firstTask) { 23 setState(-1); // inhibit interrupts until runWorker 24 this.firstTask = firstTask; 25 this.thread = getThreadFactory().newThread(this); 26 } 27 28 /** Delegates main run loop to outer runWorker */ 29 public void run() { 30 runWorker(this); 31 } 32 33 // Lock methods 34 // 35 // The value 0 represents the unlocked state. 36 // The value 1 represents the locked state. 37 38 protected boolean isHeldExclusively() { 39 return getState() != 0; 40 } 41 42 protected boolean tryAcquire(int unused) { 43 if (compareAndSetState(0, 1)) { 44 setExclusiveOwnerThread(Thread.currentThread()); 45 return true; 46 } 47 return false; 48 } 49 50 protected boolean tryRelease(int unused) { 51 setExclusiveOwnerThread(null); 52 setState(0); 53 return true; 54 } 55 56 public void lock() { acquire(1); } 57 public boolean tryLock() { return tryAcquire(1); } 58 public void unlock() { release(1); } 59 public boolean isLocked() { return isHeldExclusively(); } 60 61 void interruptIfStarted() { 62 Thread t; 63 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 64 try { 65 t.interrupt(); 66 } catch (SecurityException ignore) { 67 } 68 } 69 } 70 }
4.6 runWorker方法
Worker類中的run方法調用了runWorker方法來執行任務,執行過程如下:
- 線程啟動之后,通過unlock方法釋放鎖,設置AQS的state為0,表示運行可中斷;
- Worker執行firstTask或從workQueue中獲取任務:
- 進行加鎖操作,保證thread不被其他線程中斷(除非線程池被中斷)
- 檢查線程池狀態,倘若線程池處於中斷狀態,當前線程將中斷。
- 執行beforeExecute
- 執行任務的run方法
- 執行afterExecute方法
- 解鎖操作
1 final void runWorker(Worker w) { 2 Thread wt = Thread.currentThread(); 3 // 獲取第一個任務 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 // 允許中斷 7 w.unlock(); // allow interrupts 8 boolean completedAbruptly = true; 9 try { 10 // 如果task為空,則通過getTask來獲取任務 11 while (task != null || (task = getTask()) != null) { 12 w.lock(); 13 // If pool is stopping, ensure thread is interrupted; 14 // if not, ensure thread is not interrupted. This 15 // requires a recheck in second case to deal with 16 // shutdownNow race while clearing interrupt 17 if ((runStateAtLeast(ctl.get(), STOP) || 18 (Thread.interrupted() && 19 runStateAtLeast(ctl.get(), STOP))) && 20 !wt.isInterrupted()) 21 wt.interrupt(); 22 try { 23 beforeExecute(wt, task); 24 Throwable thrown = null; 25 try { 26 task.run(); 27 } catch (RuntimeException x) { 28 thrown = x; throw x; 29 } catch (Error x) { 30 thrown = x; throw x; 31 } catch (Throwable x) { 32 thrown = x; throw new Error(x); 33 } finally { 34 afterExecute(task, thrown); 35 } 36 } finally { 37 task = null; 38 w.completedTasks++; 39 w.unlock(); 40 } 41 } 42 completedAbruptly = false; 43 } finally { 44 processWorkerExit(w, completedAbruptly); 45 } 46 }
4.7 getTask方法
getTask方法用來從阻塞隊列中取等待的任務
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 // Check if queue empty only if necessary. 9 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 10 decrementWorkerCount(); 11 return null; 12 } 13 14 int wc = workerCountOf(c); 15 16 // Are workers subject to culling? 17 // timed變量用於判斷是否需要進行超時控制。 18 // allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時; 19 // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量; 20 // 對於超過核心線程數量的這些線程,需要進行超時控制 21 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 22 23 if ((wc > maximumPoolSize || (timed && timedOut)) 24 && (wc > 1 || workQueue.isEmpty())) { 25 if (compareAndDecrementWorkerCount(c)) 26 return null; 27 continue; 28 } 29 30 try { 31 /* 32 * 根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null; 33 * 否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空。 34 * 35 */ 36 Runnable r = timed ? 37 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 38 workQueue.take(); 39 if (r != null) 40 return r; 41 timedOut = true; 42 } catch (InterruptedException retry) { 43 timedOut = false; 44 } 45 } 46 }
5 任務的提交
- submit任務,等待線程池execute
- 執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點並保存在waiters鏈表中, 並阻塞等待運行結果;
- FutureTask任務執行完成后,通過UNSAFE設置waiters相應的waitNode為null,並通過LockSupport類unpark方法喚醒主線程。
1 public class Test{ 2 3 public static void main(String[] args) { 4 5 ExecutorService es = Executors.newCachedThreadPool(); 6 Future<String> future = es.submit(new Callable<String>() { 7 @Override 8 public String call() throws Exception { 9 try { 10 TimeUnit.SECONDS.sleep(2); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 return "future result"; 15 } 16 }); 17 try { 18 String result = future.get(); 19 System.out.println(result); 20 } catch (Exception e) { 21 e.printStackTrace(); 22 } 23 } 24 }
在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。
- Callable接口類似於Runnable,只是Runnable沒有返回值。
- Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到異步執行任務各種結果;
- Future.get方法會導致主線程阻塞,直到Callable任務執行完成;
5.1 submit方法
AbstractExecutorService.submit()實現了ExecutorService.submit(),可以獲得執行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子類,所以submit方法也是ThreadPoolExecutor的方法。
1 public Future<?> submit(Runnable task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<Void> ftask = newTaskFor(task, null); 4 execute(ftask); 5 return ftask; 6 } 7 public <T> Future<T> submit(Runnable task, T result) { 8 if (task == null) throw new NullPointerException(); 9 RunnableFuture<T> ftask = newTaskFor(task, result); 10 execute(ftask); 11 return ftask; 12 } 13 public <T> Future<T> submit(Callable<T> task) { 14 if (task == null) throw new NullPointerException(); 15 RunnableFuture<T> ftask = newTaskFor(task); 16 execute(ftask); 17 return ftask; 18 }
通過submit方法提交的Callable或者Runnable任務會被封裝成了一個FutureTask對象。通過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法。
5.2 FutureTask對象
類圖
內部狀態
/** *... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
內部狀態的修改通過sun.misc.Unsafe修改。
get方法
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 s = awaitDone(false, 0L); 5 return report(s); 6 }
內部通過awaitDone方法對主線程進行阻塞,具體實現如下:
1 /** 2 * Awaits completion or aborts on interrupt or timeout. 3 * 4 * @param timed true if use timed waits 5 * @param nanos time to wait, if timed 6 * @return state upon completion 7 */ 8 private int awaitDone(boolean timed, long nanos) 9 throws InterruptedException { 10 final long deadline = timed ? System.nanoTime() + nanos : 0L; 11 WaitNode q = null; 12 boolean queued = false; 13 for (;;) { 14 if (Thread.interrupted()) { 15 removeWaiter(q); 16 throw new InterruptedException(); 17 } 18 19 int s = state; 20 if (s > COMPLETING) { 21 if (q != null) 22 q.thread = null; 23 return s; 24 } 25 else if (s == COMPLETING) // cannot time out yet 26 Thread.yield(); 27 else if (q == null) 28 q = new WaitNode(); 29 else if (!queued) 30 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, 31 q.next = waiters, q); 32 else if (timed) { 33 nanos = deadline - System.nanoTime(); 34 if (nanos <= 0L) { 35 removeWaiter(q); 36 return state; 37 } 38 LockSupport.parkNanos(this, nanos); 39 } 40 else 41 LockSupport.park(this); 42 } 43 }
- 如果主線程被中斷,則拋出中斷異常;
- 判斷FutureTask當前的state,如果大於COMPLETING,說明任務已經執行完成,則直接返回;
- 如果當前state等於COMPLETING,說明任務已經執行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;
- 通過WaitNode類封裝當前線程,並通過UNSAFE添加到waiters鏈表;
- 最終通過LockSupport的park或parkNanos掛起線程。
run方法
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 Callable<V> c = callable; 8 if (c != null && state == NEW) { 9 V result; 10 boolean ran; 11 try { 12 result = c.call(); 13 ran = true; 14 } catch (Throwable ex) { 15 result = null; 16 ran = false; 17 setException(ex); 18 } 19 if (ran) 20 set(result); 21 } 22 } finally { 23 // runner must be non-null until state is settled to 24 // prevent concurrent calls to run() 25 runner = null; 26 // state must be re-read after nulling runner to prevent 27 // leaked interrupts 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
FutureTask.run方法是在線程池中被執行的,而非主線程
- 通過執行Callable任務的call方法;
- 如果call執行成功,則通過set方法保存結果;
- 如果call執行有異常,則通過setException保存異常。
6 Executors類
Exectors工廠類提供了線程池的初始化接口,主要有如下幾種:
newFixedThreadPool
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
創建一個固定大小、任務隊列容量無界(Integer.MAX_VALUE)的線程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞隊列為LinkedBlockingQuene。
注意點:
- 線程池的線程數量達corePoolSize后,即使線程池沒有可執行任務時,也不會釋放線程;
- 線程池里的線程數量不超過
corePoolSize
,這導致了maximumPoolSize
和keepAliveTime
將會是個無用參數 ; - 由於使用了無界隊列, 所以FixedThreadPool永遠不會拒絕, 即飽和策略失效。
newSingleThreadExecutor
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
只有一個線程來執行無界任務隊列的單一線程池。如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行。由於使用了無界隊列, 所以SingleThreadPool永遠不會拒絕,即飽和策略失效。與newFixedThreadPool(1)的區別在於單一線程池的大小不能再改變。
newCachedThreadPool
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
創建一個大小無界的緩沖線程池。任務隊列是一個同步隊列。緩沖線程池適用於執行耗時較小的異步任務。池的核心線程數=0 最大線程數=Integer.MAX_VLUE。與前兩種稍微不同的是:
- 任務加入到池中,如果池中有空閑線程,則用空閑線程執行,如無則創建新線程執行。
- 池中的線程空閑超過60秒,將被銷毀釋放。
- 池中的線程數隨任務的多少變化。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
能定時執行任務的線程,池的核心線程數由參數指定。和前面3個線程池基於ThreadPoolExecutor類實現不同的是,它基於ScheduledThreadPoolExecutor實現。
7 線程池的監控
可以使用ThreadPoolExecutor以下方法:
- getTaskCount:線程池已經執行的和未執行的任務總數;
- getCompletedTaskCount:線程池已完成的任務數量,該值小於等於taskCount;
- getLargestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了maximumPoolSize;
- getPoolSize:線程池當前的線程數量;
- getActiveCount:當前線程池中正在執行任務的線程數量。