池化技術——自定義線程池
1、為什么要使用線程池?
池化技術
1.1、池化技術的特點:
-
程序的運行,本質:占用系統的資源! 優化資源的使用!=>池化技術
-
線程池、連接池、內存池、對象池///..... 創建、銷毀。十分浪費資源
-
池化技術:事先准備好一些資源,有人要用,就來我這里拿,用完之后還給我。
1.2、線程池的好處:
- 降低資源的消耗
- 降低資源的消耗
- 方便管理。
核心:線程復用、可以控制最大並發數、管理線程
1.3、如何自定義一個線程池
牢記:三大方法、7大參數、4種拒絕策略
2、三大方法
三大方法
在java的JDK中提夠了Executors開啟JDK默認線程池的類,其中有三個方法可以用來開啟線程池。
2.1、單個線程的線程池方法
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個線程的線程池
該方法開啟的線程池,故名思義該池中只有一個線程。
2.2、固定的線程池的大小的方法
ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定的線程池的大小
其中方法中傳遞的int類型的參數,就是池中的線程數量
2.3、可伸縮的線程池的方法
ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸縮
該方法創建的線程池是不固定大小的,可以根據需求動態的在池子里創建線程,遇強則強。
2.4、完整的測試代碼為:
package com.xgp.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 工具類,三大方法
*/
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個線程的線程池
// ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定的線程池的大小
// ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸縮
try{
for(int i = 0;i < 10;i++) {
//使用線程池去創建
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//關閉線程池
threadPool.shutdown();
}
}
}
將三行注釋部分依次打開的運行結果為:
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
上述運行結果為單個線程的線程池的結果:可以看出的確只有一條線程在執行。
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
上述運行結果為固定線程的線程池的結果:因為固定的大小為5,可以看出的確有5條線程在執行。
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
pool-1-thread-7 OK
pool-1-thread-9 OK
pool-1-thread-10 OK
pool-1-thread-8 OK
pool-1-thread-6 OK
上述運行結果為彈性線程的線程池的結果:可以看出的確有10條線程在執行。
3、為什么要自定義線程池?三大方法創建線程池的弊端分析
-
在單個線程池和固定大小的線程池中,因為處理的線程有限,當大量的請求進來時,都會在阻塞隊列中等候,而允許請求的隊列長度為Integet.MAX_VALUE,整數的最大值約為21億,會導致JVM內存溢出。
-
在彈性伸縮的線程池中,允許創建的線程數量為Integet.MAX_VALUE,可能會創建大量的線程,使得Jvm內存溢出。
對於上述的兩點,其數值會在后面分析源碼的環節看到,關於這一點,在阿里巴巴開發手冊中有着詳細的說明,並極力推薦采用自定義線程池,而不使用這三大方法。
4、七大參數
七大參數
源碼分析:我們要指定義自己的線程池,先從源碼的角度看一看JDK現有的三個線程池是如何編寫的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從三大方法的源代碼中可以看出,三種線程池都是new 了一個 ThreadPoolExecutor 對象,點擊源碼中看看。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
這里調用了一個 this() 方法,在點擊進去一看。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
到這里就可以很明顯的看出本節想要講述的7大參數,是哪7大了吧。根據英文意思,可以很容易的說明這七大參數的意思了。
public ThreadPoolExecutor(int corePoolSize, //核心線程數
int maximumPoolSize, //最大線程數
long keepAliveTime, //超時等待
TimeUnit unit, //超時等待的單位
BlockingQueue<Runnable> workQueue, //阻塞隊列
ThreadFactory threadFactory, //線程池工廠
RejectedExecutionHandler handler) { //拒絕策略
在阿里巴巴開發手冊中,推薦的也是使用 ThreadPoolExecutor 來進行創建線程池的。
這里可以用一張在銀行辦理業務的圖來生動的說明這七大參數。
這里,解釋下這張圖對應的含義:
-
銀行在人很少的時候也只開放兩個窗口,並且什么時候都不會進行關閉。——核心線程數
-
當人數大於2又小於5人時,后來的三人就在候客區等候辦理業務。——阻塞隊列
-
當人數大於5人又小於8人時,另外三個正在關閉的窗口需要開放進行辦理業務,於是乎就有了5個窗口在進行辦理業務。——最大線程數
-
將開啟其他三個窗口,需要領導將這三個窗口的員工叫回。——線程池工廠
-
當人數實在太多時,銀行都擠不下了,此時就會把門關了,不接受新的服務了。——拒絕策略
-
當銀行人數又變少時,另外的三個非核心窗口太久沒又生意,為了節約成本,則又會進行關閉。——超時等待
通過對上述7大參數的分析,同學們也能夠更加理解JDK自帶的三大方法的弊端,以及為什么是整數的最大值的這個數值。
5、如何手動的去創建一個線程池
手動創建連接池
七大參數的說明也都講了,於是乎我們可以仿照這七大參數,來定義一個自己的線程池。對於其中的線程工廠,我們也一般采用默認的工廠,而其中的拒絕策略我們可以通過源碼分析,先使用三大方法使用的拒絕策略。
點擊進入 defaultHandler 的源碼中可以看到。
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
其中的 new AbortPolicy(); 就是三大方法使用的拒絕策略,我們先仿照銀行的例子,自定義一個線程池。代碼如下:
package com.xgp.pool;
import java.util.concurrent.*;
/**
* 自定義線程池
*/
public class Demo02 {
/* public ThreadPoolExecutor(int corePoolSize, //核心線程數
int maximumPoolSize, //最大線程數
long keepAliveTime, //超時等待
TimeUnit unit, //超時等待的單位
BlockingQueue<Runnable> workQueue, //阻塞隊列
ThreadFactory threadFactory, //線程池工廠
RejectedExecutionHandler handler) {*/ //拒絕策略
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //會拋出異常的實現類
// new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪里
// new ThreadPoolExecutor.DiscardOldestPolicy() //不會拋出異常,會丟掉任務
// new ThreadPoolExecutor.AbortPolicy() //嘗試會和第一個競爭
);
try{
for(int i = 0;i < 8;i++) {
//使用線程池去創建
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//關閉線程池
pool.shutdown();
}
}
}
於是乎,我們完成了一個自定義的線程池,核心線程數為2,最大線程數為5,超時等待的秒數為3s,阻塞隊列的長度為3。
6、四種拒絕策略
四種拒絕策略
通過分析源碼,可以知道三大方法默認的拒絕策略在ThreadPoolExecutor這個類中,由於該類較為復雜,尋找起來不方便,於是我們可以采用IDEA的代碼提示功能,非常明顯的提示出了四種拒絕策略,也就是上面自定義線程池中的被注釋部分。
將上面自定義線程池的代碼注釋一一打開,我們來進行測試:
6.1、會拋出異常的拒絕策略
new ThreadPoolExecutor.AbortPolicy() //會拋出異常的實現類
該拒絕策略運行的結果為:
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-5 OK
java.util.concurrent.RejectedExecutionException: Task com.xgp.pool.Demo02$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.xgp.pool.Demo02.main(Demo02.java:40)
該策略就是當最大線程數+阻塞隊列數都不滿足請求數時,系統將拋出異常來進行解決。
6.2、哪來的去哪里拒絕策略
new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪里
該拒絕策略運行的結果為:
pool-1-thread-1 OK
main OK
main OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
可以看出,該拒絕策略當線程池的線程數不能夠滿足需求時,會將不能服務的任務打道回府,即交給main線程來解決,該拒絕策略適用於原來的線程能夠解決問題的情況。
6.3、丟掉任務拒絕策略
new ThreadPoolExecutor.DiscardOldestPolicy() //不會拋出異常,會丟掉任務
該拒絕策略運行的結果為:
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
數一數,一共是10個任務,但根據執行的情況只處理了8個任務,該拒絕策略將不能夠分配線程執行的任務全部丟棄了,會造成數據的丟失。
6.4、嘗試競爭拒絕策略
new ThreadPoolExecutor.DiscardPolicy() //嘗試會和第一個競爭
該拒絕策略運行的結果為:
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
數一數,一共是10個任務,但根據執行的情況只處理了9個任務,其中競爭成功了一個,失敗了一個。該策略將會於最早進來的線程進行競爭,類似於操作系統中的搶占式短作業優先算法,該拒絕策略同樣會造成數據的丟失。
7、關於最大線程數應該如何確定
7.1、CPU密集型
CPU密集型
對於有多核Cpu的電腦,應該讓cpu充分忙碌起來,不要低於Cpu的核數,並且不應該在代碼中寫死,而是應該能夠自動的獲取當前機器的Cpu核數,獲取的代碼如下:
System.out.println(Runtime.getRuntime().availableProcessors());
7.2、IO密集型
IO密集型
對於系統中有大量IO任務時,應該要預留出足夠的線程來處理IO任務,因為IO任務極度耗時。如果判斷出系統中的IO密集的任務有10個,則定義的線程數量需要大於10。
7.3、公式總結
最大線程數 = 機器核素*2 + IO密集型任務數
對於上述該公式,只是網上的一種總結,作者也沒有進行深入的測試於了解,讀者應該根據自己的業務需要進行合理的調整。