一、線程池作用
在上一篇隨筆中有提到多線程具有同一時刻處理多個任務的特點,即並行工作,因此多線程的用途非常廣泛,特別在性能優化上顯得尤為重要。然而,多線程處理消耗的時間包括創建線程時間T1、工作時間T2、銷毀線程時間T3,創建和銷毀線程需要消耗一定的時間和資源,如果能夠減少這部分的時間消耗,性能將會進一步提高,線程池就能夠很好解決問題。線程池在初始化時會創建一定數量的線程,當需要線程執行任務時,從線程池取出線程,當任務執行完成后,線程置回線程池成為空閑線程,等待下一次任務。JDK1.5提供了一個Executors工廠類來產生線程池,該工廠類提供5種靜態方法來創建線程池,詳細請參見:http://www.cnblogs.com/firstsheng618/p/3861097.html。
二、認識隊列
隊列具有先進先出(FIFO)的特點,不同於堆的后進先出(LIFO),隊列(Queue)是只允許在一端進行插入,而在另一端進行刪除的運算受限的線性表。線程池就是通過隊列的方式實現任務的調用。下面介紹幾個常用的隊列:
1、ArrayBlockingQueue:一個有數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序,隊列的頭部 是在隊列中存在時間最長的元素,隊列的尾部 是在隊列中存在時間最短的元素,新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。
2、ConcurrentLinkedQueue:一個基於鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序,隊列的頭部 是隊列中時間最長的元素,隊列的尾部 是隊列中時間最短的元素,新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素,當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇,此隊列不允許使用 null 元素。
3、LinkedBlockingQueue:一個基於已鏈接節點的、范圍任意的BlockingQueue。此隊列按 FIFO(先進先出)排序元素,隊列的頭部 是在隊列中時間最長的元素,隊列的尾部 是在隊列中時間最短的元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素,鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。
4、SynchronousQueue:一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。
三、隊列排隊策略
1、ThreadPoolExecutor.AbortPolicy:用於被拒絕任務的處理程序,它將拋出 RejectedExecutionException,線程池默認被拒絕任務的處理策略。
2、ThreadPoolExecutor.CallerRunsPolicy:用於被拒絕任務的處理程序,它直接在 execute 方法的調用線程(上一層線程)中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務。
3、ThreadPoolExecutor.DiscardOldestPolicy:用於被拒絕任務的處理程序,它放棄最舊的未處理請求,然后重試 execute;如果執行程序已關閉,則會丟棄該任務。
4、ThreadPoolExecutor.DiscardPolicy:用於被拒絕任務的處理程序,默認情況下它將丟棄被拒絕的任務。
四、自定義線程
線程池工作策略:A. 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。B. 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。C. 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
ArrayBlockingQueue和LinkedBlockingQueue是兩個最常用的阻塞隊列,一般情況下足以處理多線程間的生產者和消費者問題,LinkedBlockingQueue內部分別采用獨立鎖來控制數據同步,實現生產者端和消費者端並行工作,高效的處理並發數據。
基於ThreadPoolExecutor自定義線程池如下:

public class LinkedBqThreadPool extends ThreadPoolExecutor { protected Logger log = Logger.getLogger(getClass()); /** * 正在執行任務數量 */ private AtomicInteger taskNum = new AtomicInteger(0); /** * 構建線程池 * @param corePoolSize 池中所保存的核心線程數 * @param maximumPoolSize 池中允許的最大線程數 * @param keepActiveTime 非核心線程空閑等待新任務的最長時間 * @param timeunit keepActiveTime參數的時間單位 * @param blockingqueue 任務隊列 */ public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, TimeUnit timeunit, BlockingQueue<Runnable> blockingqueue) { super(corePoolSize, maximumPoolSize, keepActiveTime, timeunit, blockingqueue); } /** * 構建線程池 * @param corePoolSize 池中所保存的核心線程數 * @param maximumPoolSize 池中允許的最大線程數 * @param keepActiveTime 非核心線程空閑等待新任務的最長時間(單位:秒) * @param blockingqueue 任務隊列 */ public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, BlockingQueue<Runnable> blockingqueue) { this(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, blockingqueue); } /** * 構建線程池 * @param corePoolSize 池中所保存的核心線程數 * @param maximumPoolSize 池中允許的最大線程數 * @param keepActiveTime 非核心線程空閑等待新任務的最長時間(單位:秒) */ public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime) { this(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 構建單線程的線程池 */ public LinkedBqThreadPool() { this(1, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 任務執行,以原子方式將當前值加 1 */ public void execute(Runnable task) { taskNum.getAndIncrement(); super.execute(task); } /** * 任務執行之后 */ public void afterExecute(Runnable task, Throwable throwable) { taskNum.decrementAndGet(); log.debug("task : " + task.getClass().getSimpleName() + " completed,Throwable:" + throwable + ",taskNum:" + getTaskNum()); synchronized(this) { notifyAll(); } } /** * 掛起當前線程,直到所有任務執行完成 */ public void waitComplete() { try { synchronized(this){ while(getTaskNum() > 0){ wait(500); } } } catch (InterruptedException e) { log.error(e + ", taskNum:" + getTaskNum()); } } /** * @return 未執行的任務數 */ public int getTaskNum() { return taskNum.get(); } /** * @param time 非核心線程空閑等待新任務的最長時間(單位:秒) */ public void setKeepAliveTime(int time) { super.setKeepAliveTime(time, TimeUnit.SECONDS); } /** * @param size 池中所保存的核心線程數 */ public void setCorePoolSize(int size) { super.setCorePoolSize(size); } /** * @param size 池中允許的最大線程數 */ public void setMaximumPoolSize(int size) { super.setMaximumPoolSize(size); } }

public class ArrayBqThreadPool extends ThreadPoolExecutor { protected Logger log = Logger.getLogger(getClass()); /** * 待執行任務數量 */ private AtomicInteger taskNum = new AtomicInteger(0); /** * 構建線程池 * @param corePoolSize 池中所保存的核心線程數 * @param maximumPoolSize 池中允許的最大線程數 * @param keepActiveTime 非核心線程空閑等待新任務的最長時間(單位:秒) * @param queueCapacity 隊列容量,即等待執行任務數 */ public ArrayBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, int queueCapacity) { super(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity)); } /** * 構建單線程的線程池 */ public ArrayBqThreadPool(int queueCapacity) { this(1, 1, 1, queueCapacity); } /** * 任務執行,以原子方式將當前值加 1 */ public void execute(Runnable task) { taskNum.getAndIncrement(); super.execute(task); } /** * 任務執行之后 */ public void afterExecute(Runnable task, Throwable throwable) { taskNum.decrementAndGet(); log.debug("task : " + task.getClass().getSimpleName() + " completed,Throwable:" + throwable + ",taskNum:" + getTaskNum()); synchronized(this) { notifyAll(); } } /** * 掛起當前線程,直到所有任務執行完成 */ public void waitComplete() { try { synchronized(this){ while(getTaskNum() > 0){ wait(500); } } } catch (InterruptedException e) { log.error(e + ", taskNum:" + getTaskNum()); } } /** * @return 待執行的任務數 */ public int getTaskNum() { return taskNum.get(); } /** * @param time 非核心線程空閑等待新任務的最長時間(單位:秒) */ public void setKeepAliveTime(int time) { super.setKeepAliveTime(time, TimeUnit.SECONDS); } /** * @param size 池中所保存的核心線程數 */ public void setCorePoolSize(int size) { super.setCorePoolSize(size); } /** * @param size 池中允許的最大線程數 */ public void setMaximumPoolSize(int size) { super.setMaximumPoolSize(size); } }