JUC的線程池架構
1.Executor
Executor是Java異步任務的執行者接口,目標是執行目標任務。Executor作為執行者角色,目的是提供一種將“任務提交者”與“任務執行者”分離的機制。它只有一個函數式方法:
public interface Executor {
void execute(Runnable command);
}
2.ExecutorService
ExecutorService繼承於Executor。它對外提供異步任務的接收服務。ExecutorService提供了“接受異步任務並轉交給執行者”的方法,比如submit、invoke方法等。具體如下:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3.AbstractExecutorService
AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。AbstractExecutorService存在的目的是為ExecutorService中的接口提供默認實現。(模板模式)
4.ThreadPoolExecutor
大名鼎鼎的線程池實現類,繼承於AbstractExecutorService。它是核心實現類,它可以預先提供指定數量的可重用線程,可以對線程進行管理和監控。
5.ScheduledExecutorService
她繼承於ExecutorService。是一個完成延時和周期性任務的接口。
6.Executors
是一個靜態工廠類,內置的靜態工廠方法可以理解為快捷創建線程池的方法。
Executors的4種快捷創建線程池的方法
newSingleThreadExecutor 創建只有一個線程的線程池
newFixedThreadPool 創建固定大小的線程池
newCachedThreadPool 創建一個不限制線程數量的線程池,任何提交的任務都立即執行,空閑線程會及時回收
newScheduledThreadPool 創建一個可定期或延時執行任務的線程池
- newSingleThreadExecutor
public static void main(String[] args) {
final AtomicInteger integer = new AtomicInteger(0);
ExecutorService pool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-1,5,main] :doing-2
Thread[pool-1-thread-1,5,main] :doing-3
Thread[pool-1-thread-1,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5
場景:任務按照提交順序,一個任務一個任務逐個執行。
以上代碼最后調用shutdown來關閉線程池。執行shutdown方法后,線程池狀態變為shutdown,線程池將拒絕新任務,不能再往線程池中添加新任務。此時,線程池不會立刻退出,直到線程池中的任務處理完成后才會退出。還有一個shutdownNow方法,執行這個后,線程狀態變為stop,試圖停止所有正在執行的線程,並且不再處理阻塞隊列中等待的任務,會返回那些未執行的任務。
- newFixedThreadPool
ExecutorService pool = Executors.newFixedThreadPool(3);
Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-3,5,main] :doing-2
Thread[pool-1-thread-2,5,main] :doing-3
Thread[pool-1-thread-3,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5
適用場景:需要任務長期執行的場景。“固定數量的線程池”能穩定的保證一個數,避免頻繁 回收和創建線程,適用於CPU密集型的任務,在CPU被線程長期占用的情況下,能確保少分配線程。
弊端:內部使用無界隊列存放任務,當有大量任務,隊列無限增大,服務器資源迅速耗盡。
newFixedThreadPool工廠方法返回一個ThreadPoolExecutor實例,該線程池實例的corePoolSize數量為參數nThread,其maximumPoolSize數量也為參數nThread,其workQueue屬性的值為LinkedBlockingQueue
- newCachedThreadPool
線程池內的某些線程無事可干成為空閑線程,可以靈活回收這些空閑線程。
ExecutorService pool = Executors.newCachedThreadPool();
Thread[pool-1-thread-5,5,main] :doing-5
Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-2,5,main] :doing-2
Thread[pool-1-thread-3,5,main] :doing-3
Thread[pool-1-thread-4,5,main] :doing-4
特點:在執行任務時,如果池內所有線程忙,則會添加新線程來處理。不會限制線程的大小,完全依賴於操作系統能夠創建的最大線程大小。如果存量線程超過了處理任務數量,就會回收線程。、
適用場景:快速處理突發性強、耗時短的任務場景,如Netty的NIO處理場景、REST API接口的瞬時削峰場景。
弊端:沒有最大線程數量限制,如果大量的異步任務提交,服務器資源可能耗盡。
- newScheduledThreadPool
public static void main(String[] args) {
final AtomicInteger integer = new AtomicInteger(0);
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) {
pool.scheduleAtFixedRate(
() -> {
System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());
}, 0, 500, TimeUnit.MILLISECONDS);
// 0表示首次執行任務的執行時間,500表示每次執行任務的間隔時間
}
// pool.shutdown();
}
因為可以周期性執行任務,所以不shutdown。
適用場景:周期性執行任務的場景。
線程池的標准創建方式
使用ThreadPoolExecutor構造方法創建,一個比較重要呃構造器如下:
public ThreadPoolExecutor(int corePoolSize,核心線程數
int maximumPoolSize, 最大線程數
long keepAliveTime, TimeUnit unit, 空閑時間
BlockingQueue<Runnable> workQueue, 阻塞隊列
ThreadFactory threadFactory, 線程工廠(線程產生方式)
RejectedExecutionHandler handler 拒絕策略) {
...
}
1.核心和最大線程數量
接收新任務時,並且當前工作線程池數少於核心線程數量,即使有工作線程是空閑的,它也會創建新線程處理任務,直到達到核心線程數。
2.BlockingQueue
阻塞隊列用於暫時接收任務。
3.KeepAliveTime
設置線程最大空閑時長,如果超過這個時間,非核心線程會被回收。當然,也可以調用allowCoreThreadTimeOut方法將超時策略應用到核心線程。
線程池的任務調度流程
- 工作線程數量小於核心線程數量,執行新任務時會優先創建線程,而不是獲取空閑線程。
- 任務數量大於核心線程數量,新任務將被加入阻塞隊列中。執行任務時,也是先從阻塞隊列中獲取任務。
- 在核心線程用完,阻塞隊列已滿的情況下,會創建非核心線程處理新任務。
- 在如果線程池總數超過maximumPoolSize,線程池會拒絕接收任務,為新任務執行拒絕策略。
ThreadFactory(線程工廠)
創建線程方式
阻塞隊列
阻塞隊列與普通度列相比:阻塞隊列為空時,會阻塞當前線程的元素獲取操作。當隊列中有元素,被阻塞的線程會被自動喚醒。
BlockingQueue是JUC包的一個超級接口,比較常用的實現類有:
(1)ArrayBlockingQueue:數組隊列
(2)LinkedBlockingQueue:鏈表隊列
(3)PriorityBlockingQueue:優先級隊列
(4)DelayQueue:延遲隊列
(5)SynchronousQueue:同步隊列
調度器的鈎子方法
ThreadPoolExecutor為每個任務執行前后都提供了鈎子方法。
// 任務執行之前的鈎子方法(前鈎子)
protected void beforeExecute(Thread t, Runnable r) { }
// 之后(后鈎子)
protected void afterExecute(Runnable r, Throwable t) { }
// 終止(停止鈎子)
protected void terminated() { }
beforeExecute:可用於重新初始化ThreadLocal線程本地變量實例、更新日志記錄、計時統計等。
afterExecute:更新日志記錄、計時統計等。
terminated:Executor終止時調用。
演示一下前鈎子。
public class TestMain {
public static void main(String[] args) {
final ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
4,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("前鈎子嗷 ~ ~ ~ ");
}
};
for (int i = 0; i < 5; i++) {
pool.execute(() -> {
System.out.println("你誰啊");
});
}
}
}
線程池拒絕策略
任務被拒絕有兩種情況:
- 線程池已經關閉。
- 工作隊列已滿且最大線程數已滿。
拒絕策略有以下實現:
- AbortPolicy:拒絕策略。拋異常。
- DiscardPolicy:拋棄策略。丟棄新來的任務。
- DiscardOldestPolicy:拋棄最老任務策略。因為隊列是隊尾進對頭出,所以每次都是移除隊頭元素后再入隊。
- CallerRunsPolicy:調用者執行策略。提交任務線程自己執行任務,不使用線程池中的線程。
- 自定義策略。實現RejectExecutionHandler接口的rejectedExecution方法。
線程池中執行任務的Worker為什么要繼承AQS?而不是用ReentrantLock
因為R是可重入鎖,Woker通過AQS實現的是不可重入鎖。另外,Worker在執行任務時,會lock住執行邏輯,原因是怕其他線程執行shutdown中斷他的執行。
Re
《Java高並發編程》