線程池架構
上圖是線程池的架構圖。Java里面線程池的頂級接口是Executor,Executor並不是一個線程
池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。
比較重要的幾個類:
類/接口 | 描述 |
---|---|
ExecutorService | 真正的線程池接口 |
ScheduledExecutorService | 能和Timer/TimerTask類似,解決那些需要任務重復執行的問題 |
ThreadPoolExecutor | ExecutorService的默認實現 |
ScheduledThreadPoolExecutor | 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現 |
要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池。
Java通過Executors工廠類提供四種線程池,分別為:
- newCachedThreadPool :創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,否則新建線程。(線程最大並發數不可控制)
- newFixedThreadPool:創建一個固定大小的線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
- newScheduledThreadPool : 創建一個定時線程池,支持定時及周期性任務執行。
- newSingleThreadExecutor :創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
我們先創建一個統一的線程任務,方便測試四種線程池
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is running...");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
newSingleThreadExecutor
public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("線程任務開始執行");
executorService.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
輸出結果
線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-1 is running...
- 1
- 2
- 3
- 4
- 5
- 6
底層實現
/** * 核心線程池大小=1 * 最大線程池大小為1 * 線程過期時間為0ms * LinkedBlockingQueue作為工作隊列 */
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
從參數可以看出來,SingleThreadExecutor 相當於特殊的 FixedThreadPool,它的執行流程如下:
- 線程池中沒有線程時,新建一個線程執行任務
- 有一個線程以后,將任務加入阻塞隊列,不停的加
- 唯一的這一個線程不停地去隊列里取任務執行
SingleThreadExecutor 用於串行執行任務的場景,每個任務必須按順序執行,不需要並發執行。
newFixedThreadPool
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("線程任務開始執行");
executorService.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
輸出結果
線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-2 is running...
- 1
- 2
- 3
- 4
- 5
- 6
底層實現
/** * 核心線程池大小=傳入參數 * 最大線程池大小為傳入參數 * 線程過期時間為0ms * LinkedBlockingQueue作為工作隊列 */
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
可以看到,FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。
此外 keepAliveTime 為 0,也就是多余的空余線程會被立即終止(由於這里沒有多余線程,這個參數也沒什么意義了)。
而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當於沒有上限。
因此這個線程池執行任務的流程如下:
- 線程數少於核心線程數,也就是設置的線程數時,新建線程執行任務
- 線程數等於核心線程數后,將任務加入阻塞隊列
- 由於隊列容量非常大,可以一直加
- 執行完任務的線程反復去隊列中取任務執行
FixedThreadPool 用於負載比較重的服務器,為了資源的合理利用,需要限制當前線程數量。
newCachedThreadPool
public class CachedThreadPoolTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
executorService.execute(myRunnable);
}
System.out.println("線程任務開始執行");
executorService.shutdown();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
輸出結果
線程任務開始執行
pool-1-thread-1 is running...
pool-1-thread-4 is running...
pool-1-thread-2 is running...
pool-1-thread-5 is running...
pool-1-thread-3 is running...
- 1
- 2
- 3
- 4
- 5
- 6
底層實現
/** * 核心線程池大小=0 * 最大線程池大小為Integer.MAX_VALUE * 線程過期時間為60s * 使用SynchronousQueue作為工作隊列 */
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
可以看到,CachedThreadPool 沒有核心線程,非核心線程數無上限,也就是全部使用外包,但是每個外包空閑的時間只有 60 秒,超過后就會被回收。
CachedThreadPool 使用的隊列是 SynchronousQueue,這個隊列的作用就是傳遞任務,並不會保存。
因此當提交任務的速度大於處理任務的速度時,每次提交一個任務,就會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。
它的執行流程如下:
- 沒有核心線程,直接向 SynchronousQueue 中提交任務
- 如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個
- 執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就拜拜
- 由於空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
CachedThreadPool 用於並發執行大量短期的小任務,或者是負載較輕的服務器。
newScheduledThreadPool
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
MyRunnable myRunnable = new MyRunnable();
for (int i = 0; i < 5; i++) {
// 參數1:目標對象,參數2:隔多長時間開始執行線程,參數3:執行周期,參數4:時間單位
scheduledExecutorService.scheduleAtFixedRate(myRunnable, 1, 2, TimeUnit.SECONDS);
}
System.out.println("線程任務開始執行");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
輸出結果
線程任務開始執行
// 打印【線程任務開始執行】后1秒輸出
pool-1-thread-1 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
// 2秒后輸出
pool-1-thread-1 is running...
pool-1-thread-3 is running...
pool-1-thread-2 is running...
pool-1-thread-1 is running...
pool-1-thread-3 is running...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
底層實現
/** * 核心線程池大小=傳入參數 * 最大線程池大小為Integer.MAX_VALUE * 線程過期時間為0ms * DelayedWorkQueue作為工作隊列 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
ScheduledThreadPoolExecutor 的執行流程如下:
- 添加一個任務
- 線程池中的線程從 DelayQueue 中取任務
- 然后執行任務
具體執行任務的步驟也比較復雜:
-
線程從 DelayQueue 中獲取 time 大於等於當前時間的 ScheduledFutureTask
-
執行完后修改這個 task 的 time 為下次被執行的時間
-
然后再把這個 task 放回隊列中
ScheduledThreadPoolExecutor 用於需要多個后台線程執行周期任務,同時需要限制線程數量的場景。
Executors和ThreaPoolExecutor創建線程池的區別
Executors 各個方法的弊端:
- newFixedThreadPool 和 newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至 OOM。 - newCachedThreadPool 和 newScheduledThreadPool:
主要問題是線程數最大數是 Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至 OOM。
ThreaPoolExecutor
- 創建線程池方式只有一種,就是走它的構造函數,參數自己指定
兩種提交任務的方法
ExecutorService 提供了兩種提交任務的方法:
- execute():提交不需要返回值的任務
- submit():提交需要返回值的任務
execute
void execute(Runnable command);
- 1
execute() 的參數是一個 Runnable,也沒有返回值。因此提交后無法判斷該任務是否被線程池執行成功。
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
submit
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
- 1
- 2
- 3
submit() 有三種重載,參數可以是 Callable 也可以是 Runnable。
同時它會返回一個 Funture 對象,通過它我們可以判斷任務是否執行成功。
獲得執行結果調用 Future.get() 方法,這個方法會阻塞當前線程直到任務完成。
提交一個 Callable 任務時,需要使用 FutureTask 包一層:
FutureTask futureTask = new FutureTask(new Callable<String>() { //創建 Callable 任務
@Override
public String call() throws Exception {
String result = "";
//do something
return result;
}
});
Future<?> submit = executor.submit(futureTask); //提交到線程池
try {
Object result = submit.get(); //獲取結果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16