JDK 為我們內置了四種常見線程池的實現,均可以使用 Executors 工廠類創建。
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
可以看到,FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。
此外 keepAliveTime 為 0,也就是多余的空余線程會被立即終止(由於這里沒有多余線程,這個參數也沒什么意義了)。
而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限。
因此這個線程池執行任務的流程如下:
- 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務
- 線程數等於核心線程數后,將任務加入阻塞隊列
- 由於隊列容量非常大,可以一直加加加
- 執行完任務的線程反復去隊列中取任務執行
FixedThreadPool 用於負載比較重的服務器,為了資源的合理利用,需要限制當前線程數量。
2.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
從參數可以看出來,SingleThreadExecutor 相當於特殊的 FixedThreadPool,它的執行流程如下:
- 線程池中沒有線程時,新建一個線程執行任務
- 有一個線程以后,將任務加入阻塞隊列,不停加加加
- 唯一的這一個線程不停地去隊列里取任務執行
聽起來很可憐的樣子 - -。
SingleThreadExecutor 用於串行執行任務的場景,每個任務必須按順序執行,不需要並發執行。
3.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
可以看到,CachedThreadPool 沒有核心線程,非核心線程數無上限,也就是全部使用外包,但是每個外包空閑的時間只有 60 秒,超過后就會被回收。
CachedThreadPool 使用的隊列是 SynchronousQueue,這個隊列的作用就是傳遞任務,並不會保存。
因此當提交任務的速度大於處理任務的速度時,每次提交一個任務,就會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。
它的執行流程如下:
- 沒有核心線程,直接向
SynchronousQueue中提交任務 - 如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個
- 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就拜拜
由於空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
CachedThreadPool 用於並發執行大量短期的小任務,或者是負載較輕的服務器。
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor, 最多線程數為 Integer.MAX_VALUE ,使用 DelayedWorkQueue 作為任務隊列。
ScheduledThreadPoolExecutor 添加任務和執行任務的機制與ThreadPoolExecutor 有所不同。
ScheduledThreadPoolExecutor 添加任務提供了另外兩個方法:
scheduleAtFixedRate():按某種速率周期執行scheduleWithFixedDelay():在某個延遲后執行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;
可以看到,這兩種方法都是創建了一個 ScheduledFutureTask 對象,調用 decorateTask() 方法轉成 RunnableScheduledFuture 對象,然后添加到隊列中
看下 ScheduledFutureTask 的主要屬性:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { //添加到隊列中的順序 private final long sequenceNumber; //何時執行這個任務 private volatile long time; //執行的間隔周期 private final long period; //實際被添加到隊列中的 task RunnableScheduledFuture<V> outerTask = this; //在 delay queue 中的索引,便於取消時快速查找 int heapIndex; //...
DelayQueue 中封裝了一個優先級隊列,這個隊列會對隊列中的 ScheduledFutureTask 進行排序,兩個任務的執行 time 不同時,time 小的先執行;否則比較添加到隊列中的順序 sequenceNumber ,先提交的先執行。
ScheduledThreadPoolExecutor 的執行流程如下:
- 調用上面兩個方法添加一個任務
- 線程池中的線程從 DelayQueue 中取任務
- 然后執行任務
具體執行任務的步驟也比較復雜:
- 線程從 DelayQueue 中獲取 time 大於等於當前時間的 ScheduledFutureTask
DelayQueue.take()
- 執行完后修改這個 task 的 time 為下次被執行的時間
- 然后再把這個 task 放回隊列中
DelayQueue.add()
ScheduledThreadPoolExecutor 用於需要多個后台線程執行周期任務,同時需要限制線程數量的場景。
兩種提交任務的方法
ExecutorService 提供了兩種提交任務的方法:
execute
execute():提交不需要返回值的任務
submit():提交需要返回值的任務
void execute(Runnable command)
execute() 的參數是一個 Runnable,也沒有返回值。因此提交后無法判斷該任務是否被線程池執行成功。
ExecutorService executor = Executors.newCachedThreadPool(); executor.execute(new Runnable() { @Override public void run() { //do something } });
submit
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
submit() 有三種重載,參數可以是 Callable 也可以是 Runnable。
同時它會返回一個 Funture 對象,通過它我們可以判斷任務是否執行成功。
獲得執行結果調用 Future.get() 方法,這個方法會阻塞當前線程直到任務完成。
提交一個 Callable 任務時,需要使用 FutureTask 包一層:
FutureTask futureTask = new FutureTask(new Callable<String>() { //創建 Callable 任務 @Override public String call() throws Exception { String result = ""; //do something return result; } }); Future<?> submit = executor.submit(futureTask); //提交到線程池 try { Object result = submit.get(); //獲取結果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
如何合理地選擇或者配置
了解 JDK 提供的幾種線程池實現,在實際開發中如何選擇呢?
根據任務類型決定。
前面已經介紹了,這里再小節一下:
CachedThreadPool用於並發執行大量短期的小任務,或者是負載較輕的服務器。FixedThreadPool用於負載比較重的服務器,為了資源的合理利用,需要限制當前線程數量。SingleThreadExecutor用於串行執行任務的場景,每個任務必須按順序執行,不需要並發執行。ScheduledThreadPoolExecutor用於需要多個后台線程執行周期任務,同時需要限制線程數量的場景。
自定義線程池時,如果任務是 CPU 密集型(需要進行大量計算、處理),則應該配置盡量少的線程,比如 CPU 個數 + 1,這樣可以避免出現每個線程都需要使用很長時間但是有太多線程爭搶資源的情況;
如果任務是 IO密集型(主要時間都在 I/O,CPU 空閑時間比較多),則應該配置多一些線程,比如 CPU 數的兩倍,這樣可以更高地壓榨 CPU。
為了錯誤避免創建過多線程導致系統奔潰,建議使用有界隊列。因為它在無法添加更多任務時會拒絕任務,這樣可以提前預警,避免影響整個系統。
執行時間、順序有要求的話可以選擇優先級隊列,同時也要保證低優先級的任務有機會被執行。
總結
這篇文章簡單介紹了 Java 中線程池的工作原理和一些常見線程池的使用,在實際開發中最好使用線程池來統一管理異步任務,而不是直接 new 一個線程執行任務。
