上一篇博文介紹了線程池的實現原理,現在介紹如何使用線程池。
目錄
一、創建線程池
二、向線程池提交任務
三、關閉線程池
四、合理配置線程池
五、線程池的監控
線程池創建規范
一、創建線程池
我們可以通過ThreadPoolExecutor來創建一個線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
timeUnit, runnableTaskQueue, threadFactory, handler);
創建一個線程池時需要輸入以下幾個關鍵參數:
1. corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。
2. maximumPoolSize(線程池最大數量):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用了無界的任務隊列這個參數就沒什么效果。
3. runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
隊列 | 描述 |
ArrayBlockingQueue | 一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。 |
LinkedBlockingQueue | 一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。 |
SynchronousQueue | 一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。 |
PriorityBlockingQueue | 一個具有優先級的無限阻塞隊列。 |
4. threadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設置有意義的名字,代碼如下。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
如果不想使用ThreadFactoryBuilder,也可以自定義線程工廠類:
/** * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); /** * 線程所屬的線程組 */ private final ThreadGroup group; /** * 用AtomicInteger來為線程計數,每次加1 */ private final AtomicInteger threadNumber = new AtomicInteger(1); /** * 線程名稱 */ private final String namePrefix; /** * 初始化線程工廠 * * @param poolName 線程池名稱 */ EventThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 命名線程 namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); // 設置相同的線程優先級,避免線程池里的線程根據優先級爭搶資源,保證任務的正常執行 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
5. RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
策略 | 描述 |
AbortPolicy | 直接拋出異常。 |
CallerRunsPolicy | 只用調用者所在線程來運行任務。 |
DiscardOldestPolicy | 丟棄隊列里最近的一個任務,並執行當前任務。 |
DiscardPolicy | 不處理,丟棄掉。 |
當然,也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務。
6. keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。
7. timeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。
二、向線程池提交任務
可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。
execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()方法輸入的任務是一個Runnable類的實例。
threadsPool.execute(new Runnable() {
@Override
public void run() { // TODO Auto-generated method stub } });
submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。
Future<Object> future = executor.submit(harReturnValuetask);
try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理無法執行任務異常 } finally { // 關閉線程池 executor.shutdown(); }
三、關閉線程池
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。
四、合理配置線程池
要想合理地配置線程池,就必須首先分析任務特性,可以從以下幾個角度來分析。
- 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。
- 任務的優先級:高、中和低。
- 任務的執行時間:長、中和短。
- 任務的依賴性:是否依賴其他系統資源,如數據庫連接。
性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。由於IO密集型任務線程並不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐量將高於串行執行的吞吐量。如果這兩個任務執行時間相差太大,則沒必要進行分解。可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。
優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先執行。
注意 如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。
執行時間不同的任務可以交給不同規模的線程池來處理,或者可以使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,等待的時間越長,則CPU空閑時間就越長,那么線程數應該設置得越大,這樣才能更好地利用CPU。
建議使用有界隊列。有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點兒,比如幾千。有一次,我們系統里后台任務線程池的隊列和線程池全滿了,不斷拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞,任務積壓在線程池里。如果當時我們設置成無界隊列,那么線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。當然,我們的系統所有的任務是用單獨的服務器部署的,我們使用不同規模的線程池完成不同類型的任務,但是出現這樣問題時也會影響到其他任務。
五、線程池的監控
如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根據線程池的使用狀況快速定位問題。可以通過線程池提供的參數進行監控,在監控線程池的時候可以使用以下屬性:
屬性 | 描述 |
getTaskCount() | 線程池需要執行的任務數量。 |
getCompletedTaskCount() | 線程池在運行過程中已完成的任務數量,小於或等於taskCount。 |
getLargestPoolSize() | 線程池里曾經創建過的最大線程數量。通過這個數據可以知道線程池是否曾經滿過。如該數值等於線程池的最大大小,則表示線程池曾經滿過。 |
getPoolSize() | 線程池的線程數量。如果線程池不銷毀的話,線程池里的線程不會自動銷毀,所以這個大小只增不減。 |
getActiveCount() | 獲取活動的線程數。 |
通過擴展線程池進行監控。可以通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。
protected void beforeExecute(Thread t, Runnable r) { }
線程池創建規范
良好的開端是成功的一半。以下內容摘自阿里巴巴的Java開發手冊,對我們排查線程池問題有一定幫助:
1.【強制】線程資源必須通過線程池提供,不允許在應用中自行顯式創建線程。
說明:使用線程池的好處是減少在創建和銷毀線程上所花的時間以及系統資源的開銷,解決資源不足的問題。如果不使用線程池,有可能造成系統創建大量同類線程而導致消耗完內存或者 “過度切換” 的問題。
2. 線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors 各個方法的弊端:
-
newFixedThreadPool 和 newSingleThreadExecutor: 主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至 OOM。
-
newCachedThreadPool 和 newScheduledThreadPool: 主要問題是線程數最大數是 Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至 OOM。
3.【強制】創建線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。
public class TimerTaskThread extends Thread { public TimerTaskThread(){ super.setName("TimerTaskThread"); ... } }