目錄
相信像我一樣的很多同學,沒事刷刷面經,就會發現多線程在面試中出現很頻繁,對於Java選手來說,線程池的知識肯定必不可少,今天我們就來詳細了解Java線程池的七大參數,積累面試經驗。
JDK1.8線程池參數源代碼:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
RejectedExecutionHandler handler)
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
Executors.defaultThreadFactory(), handler);
}
一、corePoolSize
指的是核心線程大小,線程池中維護一個最小的線程數量,即使這些線程處於空閑狀態,也一直存在池中,除非設置了核心線程超時時間。
這也是源碼中的注釋說明。
/** @param corePoolSize the number of threads to keep in the pool, * even if they are idle, unless {@code allowCoreThreadTimeOut} is set. */
二、maximunPoolSize
指的是線程池中允許的最大線程數量。當線程池中核心線程都處理執行狀態,有新請求的任務:
1、工作隊列未滿:新請求的任務加入工作隊列
2、工作隊列已滿:線程池會創建新線程,來執行這個任務。當然,創建新線程不是無限制的,因為會受到maximumPoolSize最大線程數量的限制。
三、keepAliveTime
指的是空閑線程存活時間。具體說,當線程數大於核心線程數時,空閑線程在等待新任務到達的最大時間,如果超過這個時間還沒有任務請求,該空閑線程就會被銷毀。
可見官方注釋:
/** @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. */
四、unit
是指空閑線程存活時間的單位。keepAliveTime的計量單位。枚舉類型TimeUnit類。
五、workQueue
1、ArrayBlockingQueue
基於數組的有界阻塞隊列,特點FIFO(先進先出)。
當線程池中已經存在最大數量的線程時候,再請求新的任務,這時就會將任務加入工作隊列的隊尾,一旦有空閑線程,就會取出隊頭執行任務。因為是基於數組的有界阻塞隊列,所以可以避免系統資源的耗盡。
那么如果出現有界隊列已滿,最大數量的所有線程都處於執行狀態,這時又有新的任務請求,怎么辦呢?
這時候會采用Handler拒絕策略,對請求的任務進行處理。后面會詳細介紹。
2、LinkedBlockingQueue
基於鏈表的無界阻塞隊列,默認最大容量Integer.MAX_VALUE( ),可認為是無限隊列,特點FIFO。
關於maximumPoolSize參數在工作隊列為LinkedBlockingQueue時候,是否起作用這個問題,我們需要視情況而定!
情況①:如果指定了工作隊列大小,比如core=2,max=3,workQueue=2,任務數task=5,這種情況的最大線程數量的限制是有效的。
情況②:如果工作隊列大小默認
,這時maximumPoolSize不起作用,因為新請求的任務一直可以加到隊列中。
3、PriorityBlockingQueue
優先級無界阻塞隊列,前面兩種工作隊列特點都是FIFO,而優先級阻塞隊列可以通過參數Comparator實現對任務進行排序,不按照FIFO執行。
4、SynchronousQueue
不緩存任務的阻塞隊列,它實際上不是真正的隊列,因為它沒有提供存儲任務的空間。生產者一個任務請求到來,會直接執行,也就是說這種隊列在消費者充足的情況下更加適合。因為這種隊列沒有存儲能力,所以只有當另一個線程(消費者)准備好工作,put(入隊)和take(出隊)方法才不會是阻塞狀態。
以上四種工作隊列,跟線程池結合就是一種生產者-消費者 設計模式。生產者把新任務加入工作隊列,消費者從隊列取出任務消費,BlockingQueue可以使用任意數量的生產者和消費者,這樣實現了解耦,簡化了設計。
六、threadFactory
線程工廠,創建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等。
守護線程(Daemon Thread) 在Java中有兩類線程:用戶線程 (User Thread)、守護線程 (Daemon
Thread)。所謂守護
線程,是指在程序運行的時候在后台提供一種通用服務的線程,比如垃圾回收線程就是一個很稱職的守護者,並且這種線程並不屬於程序中不可或缺的部分。因此,當所有的非守護線程結束時,程序也就終止了,同時會殺死進程中的所有守護線程。反過來說,只要任何非守護線程還在運行,程序就不會終止。用戶線程和守護線程兩者幾乎沒有區別,唯一的不同之處就在於虛擬機的離開:如果用戶線程已經全部退出運行了,只剩下守護線程存在了,虛擬機也就退出了。
因為沒有了被守護者,守護線程也就沒有工作可做了,也就沒有繼續運行程序的必要了。將線程轉換為守護線程可以通過調用Thread對象的setDaemon(true)方法來實現。在使用守護線程時需要注意一下幾點:
(1)
thread.setDaemon(true)必須在thread.start()之前設置,否則會跑出一個IllegalThreadStateException異常。你不能把正在運行的常規線程設置為守護線程。(2) 在Daemon線程中產生的新線程也是Daemon的。
(3) 守護線程應該永遠不去訪問固有資源,如文件、數據庫,因為它會在任何時候甚至在一個操作的中間發生中斷。
官方使用默認的線程工廠源碼如下:
/** * The default thread factory */
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t
}
}
七、handler
Java 並發超出線程數和工作隊列時候的任務請求處理策略,使用了策略設計模式。
策略1:ThreadPoolExecutor.AbortPolicy(默認)
在默認的處理策略。該處理在拒絕時拋出RejectedExecutionException,拒絕執行。
public static class AbortPolicy implements RejectedExecutionHandler {
/** * Creates an {@code AbortPolicy}. */
public AbortPolicy() { }
/** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
策略2:ThreadPoolExecutor.CallerRunsPolicy
調用 execute 方法的線程本身運行任務。這提供了一個簡單的反饋控制機制,可以降低新任務提交的速度。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/** * Creates a {@code CallerRunsPolicy}. */
public CallerRunsPolicy() { }
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
if (!e.isShutdown()) {
r.run();
}
}
}
策略3:ThreadPoolExecutor.DiscardOldestPolicy
如果執行程序未關閉,則刪除工作隊列頭部的任務,然后重試執行(可能再次失敗,導致重復執行)。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/** * Creates a {@code DiscardOldestPolicy} for the given executor. */
public DiscardOldestPolicy() { }
/** * Obtains and ignores the next task that the executor * * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
策略4:ThreadPoolExecutor.DiscardPolicy
無法執行的任務被簡單地刪除,將會丟棄當前任務,通過源碼可以看出,該策略不會執行任務操作。
public static class DiscardPolicy implements RejectedExecutionHandler {
/** * Creates a {@code DiscardPolicy}. */
public DiscardPolicy() { }
/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
} }
八 ThreadPoolExecutor線程池參數設置技巧
一、ThreadPoolExecutor的重要參數
- corePoolSize:核心線程數
-
- 核心線程會一直存活,及時沒有任務需要執行
- 當線程數小於核心線程數時,即使有線程空閑,線程池也會優先創建新線程處理
- 設置allowCoreThreadTimeout=true(默認false)時,核心線程會超時關閉
- queueCapacity:任務隊列容量(阻塞隊列)
-
- 當核心線程數達到最大時,新任務會放在隊列中排隊等待執行
- maxPoolSize:最大線程數
-
- 當線程數>=corePoolSize,且任務隊列已滿時。線程池會創建新線程來處理任務
- 當線程數=maxPoolSize,且任務隊列已滿時,線程池會拒絕處理任務而拋出異常
- keepAliveTime:線程空閑時間
-
- 當線程空閑時間達到keepAliveTime時,線程會退出,直到線程數量=corePoolSize
- 如果allowCoreThreadTimeout=true,則會直到線程數量=0
- allowCoreThreadTimeout:允許核心線程超時
- rejectedExecutionHandler:任務拒絕處理器
-
- 兩種情況會拒絕處理任務:
-
- 當線程數已經達到maxPoolSize,切隊列已滿,會拒絕新任務
- 當線程池被調用shutdown()后,會等待線程池里的任務執行完畢,再shutdown。如果在調用shutdown()和線程池真正shutdown之間提交任務,會拒絕新任務
- 線程池會調用rejectedExecutionHandler來處理這個任務。如果沒有設置默認是AbortPolicy,會拋出異常
- ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
-
- AbortPolicy 丟棄任務,拋運行時異常
- CallerRunsPolicy 執行任務
- DiscardPolicy 忽視,什么都不會發生
- DiscardOldestPolicy 從隊列中踢出最先進入隊列(最后一個執行)的任務
- 實現RejectedExecutionHandler接口,可自定義處理器
二、ThreadPoolExecutor執行順序:
線程池按以下行為執行任務
- 當線程數小於核心線程數時,創建線程。
- 當線程數大於等於核心線程數,且任務隊列未滿時,將任務放入任務隊列。
- 當線程數大於等於核心線程數,且任務隊列已滿
- 若線程數小於最大線程數,創建線程
- 若線程數等於最大線程數,拋出異常,拒絕任務
三、如何設置參數
- 默認值
-
- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()
- 如何來設置
-
- 需要根據幾個值來決定
-
- tasks :每秒的任務數,假設為500~1000
- taskcost:每個任務花費時間,假設為0.1s
- responsetime:系統允許容忍的最大響應時間,假設為1s
- 做幾個計算
-
- corePoolSize = 每秒需要多少個線程處理?
-
- threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個線程。corePoolSize設置應該大於50
- 根據8020原則,如果80%的每秒任務數小於800,那么corePoolSize設置為80即可
- queueCapacity = (coreSizePool/taskcost)*responsetime
-
- 計算可得 queueCapacity = 80/0.1*1 = 80。意思是隊列里的線程可以等待1s,超過了的需要新開線程來執行
- 切記不能設置為Integer.MAX_VALUE,這樣隊列會很大,線程數只會保持在corePoolSize大小,當任務陡增時,不能新開線程來執行,響應時間會隨之陡增。
- maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
-
- 計算可得 maxPoolSize = (1000-80)/10 = 92
- (最大任務數-隊列容量)/每個線程每秒處理能力 = 最大線程數
- rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩沖機制來處理
- keepAliveTime和allowCoreThreadTimeout采用默認通常能滿足
- 以上都是理想值,實際情況下要根據機器性能來決定。如果在未達到最大線程數的情況機器cpu load已經滿了,則需要通過升級硬件(呵呵)和優化代碼,降低taskcost來處理。
九 真實環境實踐
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/** * 線程池配置 * * @author oldlu **/
@Configuration
public class ThreadPoolConfig {
// 核心線程池大小
@Value("${threadPoolConfig.corePoolSize}")
private int corePoolSize;
// 最大可創建的線程數
@Value("${threadPoolConfig.maxPoolSize}")
private int maxPoolSize;
// 隊列最大長度
@Value("${threadPoolConfig.queueCapacity}")
private int queueCapacity;
// 線程池維護線程所允許的空閑時間
@Value("${threadPoolConfig.keepAliveSeconds}")
private int keepAliveSeconds;
// 線程池對拒絕任務(無線程可用)的處理策略
@Value("${threadPoolConfig.rejectedExecutionHandler}")
private String rejectedExecutionHandler;
@Bean(name = "threadPoolTaskExecutor")
@ConditionalOnProperty(prefix = "threadPoolTaskExecutor", name = "enabled", havingValue = "true")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
RejectedExecutionHandler handler;
if (rejectedExecutionHandler.equals("CallerRunsPolicy")) {
handler = new ThreadPoolExecutor.CallerRunsPolicy();
} else if (rejectedExecutionHandler.equals("DiscardOldestPolicy")) {
handler = new ThreadPoolExecutor.DiscardOldestPolicy();
} else if (rejectedExecutionHandler.equals("DiscardPolicy")) {
handler = new ThreadPoolExecutor.DiscardPolicy();
} else {
handler = new ThreadPoolExecutor.AbortPolicy();
}
executor.setRejectedExecutionHandler(handler);
return executor;
}
/** * 執行周期性或定時任務 */
@Bean(name = "scheduledExecutorService")
protected ScheduledExecutorService scheduledExecutorService() {
return new ScheduledThreadPoolExecutor(corePoolSize,
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Threads.printException(r, t);
}
};
}
}
十 個人總結
Java-如何合理的設置線程池大小
想要合理配置線程池線程數的大小,需要分析任務的類型,任務類型不同,線程池大小配置也不同。
配置線程池的大小可根據以下幾個維度進行分析來配置合理的線程數:
任務性質可分為:CPU密集型任務,IO密集型任務,混合型任務。
任務的執行時長。
任務是否有依賴——依賴其他系統資源,如數據庫連接等。
CPU密集型任務
盡量使用較小的線程池,一般為CPU核心數+1。
因為CPU密集型任務使得CPU使用率很高,若開過多的線程數,只能增加上下文切換的次數,因此會帶來額外的開銷。
IO密集型任務
可以使用稍大的線程池,一般為2*CPU核心數+1。
因為IO操作不占用CPU,不要讓CPU閑下來,應加大線程數量,因此可以讓CPU在等待IO的時候去處理別的任務,充分利用CPU時間。
混合型任務
可以將任務分成IO密集型和CPU密集型任務,然后分別用不同的線程池去處理。
只要分完之后兩個任務的執行時間相差不大,那么就會比串行執行來的高效。
因為如果划分之后兩個任務執行時間相差甚遠,那么先執行完的任務就要等后執行完的任務,最終的時間仍然取決於后執行完的任務,而且還要加上任務拆分與合並的開銷,得不償失
依賴其他資源
如某個任務依賴數據庫的連接返回的結果,這時候等待的時間越長,則CPU空閑的時間越長,那么線程數量應設置得越大,才能更好的利用CPU。
借鑒別人的文章 對線程池大小的估算公式:
最佳線程數目 = ((線程等待時間+線程CPU時間)/線程CPU時間 )* CPU數目
比如平均每個線程CPU運行時間為0.5s,而線程等待時間(非CPU運行時間,比如IO)為1.5s,CPU核心數為8,那么根據上面這個公式估算得到:((0.5+1.5)/0.5)*8=32。
可以得出一個結論:
線程等待時間所占比例越高,需要越多線程。線程CPU時間所占比例越高,需要越少線程。