項目中,有時會需要多線程來提高處理速度。
線程重用的核心是,它把Thread.start()給屏蔽起來了(一定不要重復調用),所以要重用Thread,就不能讓Thread執行完一個任務后終止,因此就必須阻塞Thread.run()方法,讓該方法不停地從任務隊列中獲取任務並執行。循環在跑的過程中不斷檢查我們是否有新加入的子Runnable對象,有就調一下我們的run(),其實就一個大run()把其它小run()#1,run()#2,…給串聯起來了,基本原理就這么簡單
spring為我們提供了TaskExecutor的抽象,
spring會默認提供一個taskExecutor的實現,但一般我們需要根據項目的需要來進行自定義。
在Spring發行包中預定義了一些TaskExecutor實現。有了它們,你甚至不需要再自行實現了。
SimpleAsyncTaskExecutor 類
這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。
SyncTaskExecutor類
這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。
ConcurrentTaskExecutor 類
這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。
SimpleThreadPoolTaskExecutor 類
這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。
ThreadPoolTaskExecutor 類
它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。
這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用ConcurrentTaskExecutor來替代。
TimerTaskExecutor類
這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。
WorkManagerTaskExecutor類
CommonJ 是BEA和IBM聯合開發的一套規范。這些規范並非Java EE的標准,但它是BEA和IBM的應用服務器實現的共同標准
這個實現使用了CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口,因此可以直接作為WorkManager使用。
而其中ThreadPoolTaskExecutor是最常用的一種。
1.ThreadPoolTaskExecutor
1.1 先看一個常用的配置項
@Configuration
@EnableAsync//開啟異步任務的支持
public class TaskExecutorConfig {
@Bean("TaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//如果池中的實際線程數小於corePoolSize,無論是否其中有空閑的線程,都會給新的任務產生新的線程
taskExecutor.setCorePoolSize(5);
//連接池中保留的最大連接數。
taskExecutor.setMaxPoolSize(15);
//queueCapacity 線程池所使用的緩沖隊列
taskExecutor.setQueueCapacity(6000);
//強烈建議一定要給線程起一個有意義的名稱前綴,便於分析日志
taskExecutor.setThreadNamePrefix("demo Thread-");
taskExecutor.initialize();
return taskExecutor;
}
}
如果在方法上添加@Async,會自動被注入使用ThreadPoolTaskExecutor作為TaskExecutor(線程池),如果配置了多個ThreadPoolTaskExecutor,可以@Async(“ThreadPoolTaskExecutor1”)來指定。
1.2 重點概念解析
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(比如線程池大小
//、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //線程存貨時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設置存活時間
private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int maximumPoolSize; //線程池最大能容忍的線程數
private volatile int poolSize; //線程池中當前的線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory; //線程工廠,用來創建線程
private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
corePoolSize: 線程池維護線程的最少數量
keepAliveSeconds 線程池維護線程所允許的空閑時間
maxPoolSize 線程池維護線程的最大數量
queueCapacity 線程池所使用的緩沖隊列
當一個任務通過execute(Runnable)方法欲添加到線程池時:
l 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
l 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
l 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
l 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
l 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。 另外MaxPoolSize的設定如果比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常
1.3 拒絕策略解析
當最大線程也滿后,會使用handler來處理被拒絕的任務,默認的四種處理策略為:
- ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 是一個RuntimeException,因此會中斷調用者的處理過程,為java線程池默認的阻塞策略,不執行此任務,而且直接拋出一個運行時異常,切記ThreadPoolExecutor.execute需要try catch,否則程序會直接退出。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面(最舊)的任務,然后重新嘗試執行任務(重復此過程)。
- ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 。
- 另外還可以定義拒絕策略,這里提供一種方式:
taskExecutor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor executor) -> {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
logger.error(e.toString(), e);
Thread.currentThread().interrupt();
}
}
}
);
這里的executor.getQueue()會得到BlockingQueue,
BlockingQueue的核心方法:
放入數據:
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
加入BlockingQueue,則返回失敗。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
直到BlockingQueue里面有空間再繼續.
獲取數據:
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
取不到時返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
BlockingQueue有新的數據被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數),
通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。
我們可以利用put方法來阻塞調用線程,來避免默認拒絕策略的丟棄任務或者拋出異常。
用ThreadPoolExecutor自定義線程池,看線程是的用途,如果任務量不大,可以用無界隊列,如果任務量非常大,要用有界隊列,防止OOM。
如果任務量很大,還要求每個任務都處理成功,要對提交的任務進行阻塞提交,重寫拒絕機制,改為阻塞提交。保證不拋棄一個任務。
最大線程數一般設為2N+1最好,N是CPU核數。
核心線程數,看應用,如果是任務,一天跑一次,設置為0,合適,因為跑完就停掉了,如果是常用線程池,看任務量,是保留一個核心還是幾個核心線程數
源碼分析參考:https://www.cnblogs.com/sessionbest/articles/8689220.html
1.4 提交任務
無返回值的任務使用execute(Runnable)
有返回值的任務使用submit(Runnable)