Java線程池概述
線程池技術在並發時經常會使用到,java中的線程池的使用是通過調用ThreadPoolExecutor來實現的。
ThreadPoolExecutor提供了四個構造函數,最后都會歸結於下面這個構造方法:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
這些參數的意義如下:
corePoolSize:該線程池中核心線程數最大值
maximumPoolSize: 該線程池中線程總數最大值
keepAliveTime:該線程池中非核心線程閑置超時時長
unit:keepAliveTime的單位
workQueue:阻塞隊列BlockingQueue,維護着等待執行的Runnable對象
threadFactory:創建線程的接口,需要實現他的Thread newThread(Runnable r)方法。
RejectedExecutionHandler:飽和策略,最大線程和工作隊列容量且已經飽和時execute方法都將調用RejectedExecutionHandler
ThreadPoolExecutor工作流程
大致過程陳述為:
1. 向線程池中添加任務,當任務數量少於corePoolSize時,會自動創建thead來處理這些任務;
2. 當添加任務數大於corePoolSize且少於maximmPoolSize時,不在創建線程,而是將這些任務放到阻塞隊列中,等待被執行;
3. 接上面2的條件,且當阻塞隊列滿了之后,繼續創建thread,從而加速處理阻塞隊列;
4. 當添加任務大於maximmPoolSize時,根據飽和策略決定是否容許繼續向線程池中添加任務,默認的飽和策略是AbortPolicy(直接丟棄)。
線程池中使用的阻塞隊列
ArrayBlockingQueue:基於數組結構的有界阻塞隊列,構造函數一定要傳大小,FIFO(先進先出);
LinkedBlockingQueue:無界,默認大小65536(Integer.MAX_VALUE),當大量請求任務時,容易造成內存耗盡。
SynchronousQueue:同步隊列,是一個特殊的BlockingQueue,它沒有容量(這是因為在SynchronousQueue中,插入將等待另一個線程的刪除操作,反之亦然)。具體可以參考:《Java SynchronousQueue Examples(譯)》
PriorityBlockingQueue: 優先隊列,無界。DelayedWorkQueue:這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務
阻塞隊列常見的方法如下表所示:

常見四種線程池
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
newScheduledThreadPool

它們通過Executors以靜態方法的方式直接調用,實質上是它們最終調用的是ThreadPoolExecutor的構造方法,也就是本文最前面那段代碼。
注:KeepAliveTime=0的話,表示不等待
摘自阿里巴巴開發手冊:
【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣 的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors 返回的線程池對象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool: 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool: 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。
應用樣例,更多請參看我的github
package multiThread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy()); System.out.println("getQueue:" + threadPool.getQueue().size()); System.out.println("remainingCapacity:" + threadPool.getQueue().remainingCapacity()); threadPool.execute(() -> { try { int count = 0; Thread.currentThread().setName("aa"); while (count <= 10) { System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size()); System.out.println(Thread.currentThread().getName() + System.currentTimeMillis()); Thread.sleep(1000); count++; } } catch (Exception e) { e.printStackTrace(); } }); threadPool.execute(() -> { try { int count = 0; Thread.currentThread().setName("bbb"); while (count <= 100) { System.out.println(Thread.currentThread().getName() + "getQueue:" + threadPool.getQueue().size()); System.out.println(Thread.currentThread().getName() + System.currentTimeMillis()); Thread.sleep(1000); count++; } } catch (Exception e) { e.printStackTrace(); } }); } }