ThreadPool
線程池的優勢
線程池做的工作主要是控制運行的線程數量,處理過程中將任務放入隊列,然后在線程創建后啟動這些任務,如果線程數量超過了最大數量,超出的線程排隊等候,等待其他線程執行完畢,再從隊列中取出任務來執行
線程池的特點
線程復用、控制最大並發數、管理線程
-
降低資源消耗。重復利用已創建的線程,降低創建和銷毀線程的開銷
-
提高響應速度。當任務到達時,任務可以不需要等待線程創建就能立刻執行
-
提高線程的可管理性。使用線程池可以對線程進行統一的分配、調優和監控
1 線程池的方法
- 執行長期任務性能好,創建一個線程池,一池有N個固定的線程,可以控制線程最大並發數,有固定線程數的線程池
ExecutorService threadPool = Executors.newFixedThreadPool(N);
- 單個任務執行,它只會使用單個工作線程,一池一線程
ExecutorService threadPool = Executors.newSingleThreadExecutor();
- 執行短期異步任務,可緩存線程池,線程池根據需要創建新線程,但在先前構造的線程可以復用,也可靈活回收空閑的線程,可擴容的池
ExecutorService threadPool = Executors.newCachedThreadPool();
- 周期性線程池;支持定時及周期性任務執行
ExecutorService threadPool = Executors.newScheduledThreadPool();
(1) newFixedThreadPool
可以控制線程最大並發數的線程池:
public class FixedThreadPool {
private static AtomicInteger num = new AtomicInteger(0);
private static ExecutorService executorService = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
countSum c= new countSum();
//將coutSum作為Task,submit至線程池
for (int i = 0; i < 2; i++) {
executorService.submit(c);
}
//Task執行完成后關閉
executorService.shutdown();
}
static class countSum implements Runnable{
@Override
public void run() {
for (int i = 0; i < 500; i++) {
try{
System.out.println("Thread - "+Thread.currentThread().getName()+" count= "+ num.getAndIncrement());
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
結果:
(2) newSingleThreadExecutor
只會使用唯一的工作線程執行任務的線程池:
public class SingleThreadExecutor {
private static AtomicInteger num = new AtomicInteger(0);
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
//將coutSum作為Task,submit至線程池
for (int i = 0; i < 2; i++) {
executorService.submit(new countSum());
}
//Task執行完成后關閉
executorService.shutdown();
}
static class countSum implements Runnable{
@Override
public void run() {
for (int i = 0; i < 500; i++) {
try{
System.out.println("Thread - "+Thread.currentThread().getName()+" count= "+ num.getAndIncrement());
Thread.sleep(100);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
結果:
(3) newScheduledThreadPool
傳參值為corePoolSize大小,支持定時及周期性任務執行
延期執行示例:調用schedule方法,三個參數:Task,Delay,TimeUnit
public class ScheduledThreadPool {
// corePoolSize = 2
private static ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
public static void main(String[] args) {
System.out.println("Thread - "+Thread.currentThread().getName()+" BEGIN "+ new Date());
service.schedule(new print(),5, TimeUnit.SECONDS);
service.shutdown();
}
static class print implements Runnable{
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try{
System.out.println("Thread - "+Thread.currentThread().getName()+" Delay 5 second and sleep 2 second "+ new Date());
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
結果:
定時執行示例:調用scheduleAtFixedRate方法,四個參數:Task,initialDelay,Period,TimeUnit
public class ScheduledThreadPool {
// corePoolSize = 1
private static ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
public static void main(String[] args) {
System.out.println("Thread - "+Thread.currentThread().getName()+" BEGIN "+ new Date());
service.scheduleAtFixedRate(new print(),5,3,TimeUnit.SECONDS);
}
static class print implements Runnable{
@Override
public void run() {
System.out.println("Thread - "+Thread.currentThread().getName()+" Delay 5 second and period 3 second "+ new Date());
}
}
}
結果:
(4) newCachedThreadPool
可緩存線程池,如果線程池長度超過處理需要,回收空閑線程,若無可回收,則新建線程。即若前一個任務已完成,則會接着復用該線程:
public class CachedThreadPool {
private static AtomicInteger num = new AtomicInteger(0);
private static ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) {
countSum c = new countSum();
for (int i = 0; i < 3; i++) {
try {
service.submit(c);
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
service.shutdown();
}
static class countSum implements Runnable{
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("Thread - "+Thread.currentThread().getName()+" countSum= "+num.getAndIncrement());
}
}
}
}
結果:Thread.sleep(1000)即sleep一秒,上個任務完成可繼續復用該線程,不需要創建新的線程
若將Tread.sleep(1000)注釋掉,你會發現有3個線程在跑
若感興趣可以去了解一下它們的底層源碼,對於CachedThreadPool而言,可新建線程最大數量為INTEGER.MAXIMUM
2 線程池底層原理
以newFixedThreadPool為例
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
線程池七大參數
-
corePoolSize:線程池中的常駐核心線程數
-
maximumPoolSize:線程池中能夠容納同時執行的最大線程數,必須大於1
-
keepAliveTime:多余的空閑線程的存活時間;當前池中線程數量超過corePoolSize時,當空閑時間達到keepAliveTime時,多余線程會被銷毀
-
unit:keepAliveTime的單位
-
workQueue:任務隊列,被提交但尚未執行的任務
-
threadFactory:表示生成線程池中工作線程的線程工廠,用於創建線程,一般默認
-
handler:拒絕策略,表示當隊列滿了,並且工作線程大於等於線程池的最大線程數時如何來拒絕請求執行的runnable的策略
線程池四大流程
1)創建線程池后,開始等待請求
2)當調用execute()方法添加一個請求任務時,線程池會做以下判斷:
-
如果正在運行的線程數量小於corePoolSize,馬上創建線程執行任務
-
如果正在運行的線程數量大於等於corePoolSize,將該任務放入等待隊列
-
如果等待隊列已滿,但正在運行線程數量小於max,創建非核心線程執行任務
-
如果隊列滿了且正在運行的線程數量大於max,線程池會啟動飽和拒絕策略
3)當一個線程完成任務時,會從等待隊列中取下一個任務來執行
4)當空閑線程超過keepAliveTime定義時間,會判斷:
-
如果當前運行線程大於corePoolSize,該線程銷毀
-
所有線程執行完任務后,線程個數恢復到corePoolSize大小
3 線程池策略及分析
Note:阿里巴巴JAVA開發手冊:線程池不允許使用Executors去創建線程池,而是通過使用ThreadPoolExecutor的方式自定義線程池,規避資源耗盡的風險
Executors返回的線程池對象的弊端:
1)FixedThreadPool和SingleThreadPool:
允許請求隊列長度為Integer.MAX_VALUE,可能會堆積大量請求導致OOM
2)CachedThreadPool和ScheduledThreadPool:
允許創建線程數量為Integer.MAX_VALUE,可能會創建大量的線程導致OOM
拒絕策略
1)AbortPolicy
直接拋出RejectedExecutionException異常阻止系統正常運行
2)CallerRunsPolicy
"調用者運行"的調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量
3)DiscardPolicy
該策略拋棄無法處理的任務,不予任何處理也不拋出異常。如果允許任務丟失,這是最好的一種策略
4)DiscardOldestPolicy
拋棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交當前任務
如何設置maximumPoolSize大小
Runtime.getRuntime().availableProcessors()
方法獲取核數
CPU密集型
maximumPoolSize設為核數+1
IO密集型
maximumPoolSize設為核數/阻塞系數