前言
之前的例子中基本上都用到了線程池,一般我們都是把任務初始化好之后直接丟到線程池就可以了,使用起來非常簡單方便。
主體概要
- 線程池與new Thread對比
- 線程池的幾個類介紹
- 線程池的幾種狀態
- ThreadPoolExecutor的幾個方法
- 線程池的合理配置
主體內容
一、線程池與new Thread對比
new Thread弊端
1.每次new Thread都要新建一個對象,性能差。
2.線程缺少統一管理,可能無限制的新建線程,相互競爭,有可能占用過多系統資源導致死機或者OOM。
3.缺少更多功能,如更多執行、定期執行、線程中斷。
所以,我們一般不常用Thread,這里就不再細講了。
線程池好處
1.重用存在的線程,減少對象創建,消亡的開銷,性能佳。
2.可有效的控制最大並發線程數,提高系統資源利用率,同時可以避免過多資源競爭,避免阻塞。
3.提供定時執行、定期執行、單線程、並發數控制等功能。
二、線程池的幾個類介紹
1.ThreadPoolExecutor
我們來看看ThreadPoolExecutor可以接收的幾個參數來做初始化。
(1)corePoolSize:核心線程數量
(2)maximumPoolSize:線程最大線程數
(3)workQueue:阻塞隊列,存儲等待執行的任務,很重要,會對線程池運行過程產生重大影響
(4)keepAliveTime:線程沒有任務執行時最多保持多久時間終止(當線程池的線程數量大於corePoolSize,如果這時沒有新的任務提交,核心線程不會立即銷毀,而是讓他等待,直到超過這里的keepAliveTime)
(5)unit:keepAliveTime的時間單位
(6)ThreadFactoryL:線程工廠,用來創建線程
(7)rejectHandler:當拒絕處理任務時的策略
如果運行的線程數少於我們的corePoolSize,直接創建新線程來處理任務,即使線程池中的其他線程是空閑的;
如果運行的線程數大於我們的corePoolSize,且小於我們的maximumPoolSize,則只有當workQueue滿了的時候,才創建新的線程去處理任務;
如果我們設置的corePoolSize和maximumPoolSize相同的話,那么創建的線程池的大小是固定的,這個時候如果有新任務提交,並且workQueue沒滿的時候,就把請求放到workQueue里面,等待有空閑的線程來這個隊列取出任務;
如果運行的線程數大於maximumPoolSize的時候,這時workQueue也已經滿了,那么它就要指定我們后面要講的指定策略參數來處理。
接下來我們詳細介紹一下workQueue隊列:
它是保存等待執行的任務的一個阻塞隊列,當我們提交一個新的任務到線程池的時候,線程池會根據當前線程池中正在運行着的數量來決定該任務的處理方式,處理方式總共有三種:直接切換,無界隊列,有界隊列。直接切換就是之前提到的SynchronousQueue,使用的無界隊列一般是使用鏈表隊列-LinkedBlockingQueue,如果采用無界隊列,線程池中能創建的最大線程數就是corePoolSize。我們這里介紹的workQueue的有界隊列,一般是ArrayBlockingQueue,使用這種方式呢我們可以把線程池最大線程數目限制為maximumPoolSize。
詳細介紹一下rejectHandler的四種策略:
如果workQueue阻塞隊列滿了,並且沒有空閑的線程池,此時,繼續提交任務,需要采取一種策略來處理這個任務。
線程池總共提供了四種策略:
- 直接拋出異常,這也是默認的策略。實現類為AbortPolicy。
- 用調用者所在的線程來執行任務。實現類為CallerRunsPolicy。、
- 丟棄隊列中最靠前的任務並執行當前任務。實現類為DiscardOldestPolicy。
- 直接丟棄當前任務。實現類為DiscardPolicy。
三、線程池的幾種狀態
如圖,線程池的5種狀態轉換如下:
分別來介紹一下這5中狀態:
1、RUNNING
(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對阻塞隊列中已添加的任務進行處理。
(2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處於RUNNING狀態,並且線程池中的任務數為0!
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
2、 SHUTDOWN
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理阻塞隊列中已添加的任務。
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理阻塞隊列種已添加的任務,並且會中斷正在處理的任務。
(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 狀態說明:當所有的任務已終止,ctl(ctl
是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount))記錄的”任務數量”為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鈎子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空並且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
5、 TERMINATED
(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。
四、ThreadPoolExecutor的幾個方法
1.基礎方法
- execute():提交任務,交給線程池執行
- submit():提交任務,能夠返回執行結果。(execute+Future)
- shutdown():關閉線程池,等待任務都執行完
- shutdownNow():關閉線程池,不等待任務執行完
2.監控方法
- getTaskCount():線程池已執行和未執行的任務總數
- getCompletedTaskCount():已完成的任務數量
- getPoolSize():線程池當前的線程數量
- getActiveCount():當前線程池中正在執行任務的線程數量
3.線程池類圖
如圖所示,J.U.C中有三個Executor接口:
- Executor:一個運行新任務的簡單接口;
- ExecutorService:擴展了Executor接口。添加了一些用來管理執行器生命周期和任務生命周期的方法;
- ScheduledExecutorService:擴展了ExecutorService。支持Future和定期執行任務。
而我們的ThreadPoolExecutor是功能最強大的,因為它可以自定義參數。
4.J.U.C框架是極其強大的,他還為我們提供了許多額外的方法
- Executors.newCachedThreadPool():可以創建一個可緩存的線程池,如果線程池長度超過了處理的需要,可以靈活回收空閑線程;如果沒有可以回收的,那么就新建線程。
- Executors.newFixedThreadPool():它創建的是一個定長的線程池,可以控制線程的最大並發數,超出的線程會在隊列中等待。
- Executors.newScheduledThreadPool():它創建的也是一個定長的線程池,支持定時以及周期性的任務執行。
- Executors.newSingleThreadPool():它創建的是一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指令順序執行。
5 接下來我們分別對這幾個方法做一個了解
(1)這是newCachedThreadPool的基本方法
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
還有一個可以傳入指定的ThreadFactory參數
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
(2)這是newFixedThreadPool的基本方法
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
同樣還有一個可以傳入指定的ThreadFactory參數
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
(3)這是newScheduledThreadPool的基本方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
同樣的,可以傳入指定的threadFactory參數
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }
(4)這是newSingleThreadExecutor的基本方法
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
同樣的,也可以傳入指定的threadFactory參數
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
需要注意的是,他們返回值都是ExecutorService對象,而非ThreadPoolExecutor對象,因此缺少監控方法和部分基本方法,只有shutdown(),submit(),shutdownNow()基本方法,這是ExecutorService的局限性。
6.最后,分別演示一下代碼例子
(1)newCachedThreadPool
mport lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class ThreadPoolExample1 { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for(int i=0;i<10;i++){ final int index = i; executorService.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } executorService.shutdown(); } }
結果:
21:52:22.217 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample1 - task:1 21:52:22.217 [pool-1-thread-4] INFO com.practice.aqs.ThreadPoolExample1 - task:3 21:52:22.217 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample1 - task:0 21:52:22.217 [pool-1-thread-8] INFO com.practice.aqs.ThreadPoolExample1 - task:7 21:52:22.217 [pool-1-thread-6] INFO com.practice.aqs.ThreadPoolExample1 - task:5 21:52:22.217 [pool-1-thread-5] INFO com.practice.aqs.ThreadPoolExample1 - task:4 21:52:22.217 [pool-1-thread-7] INFO com.practice.aqs.ThreadPoolExample1 - task:6 21:52:22.217 [pool-1-thread-10] INFO com.practice.aqs.ThreadPoolExample1 - task:9 21:52:22.217 [pool-1-thread-9] INFO com.practice.aqs.ThreadPoolExample1 - task:8 21:52:22.217 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample1 - task:2
(2)
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class ThreadPoolExample2 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for(int i=0;i<10;i++){ final int index = i; executorService.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } executorService.shutdown(); } }
結果:
21:55:17.350 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:0 21:55:17.350 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:1 21:55:17.350 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:2 21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:5 21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:4 21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:6 21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:8 21:55:17.354 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample2 - task:7 21:55:17.354 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample2 - task:3 21:55:17.354 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample2 - task:9
(3)newSingleThreadExecutor
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class ThreadPoolExample3 { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for(int i=0;i<10;i++){ final int index = i; executorService.execute(new Runnable() { @Override public void run() { log.info("task:{}",index); } }); } executorService.shutdown(); } }
結果:
21:57:01.913 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:0 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:1 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:2 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:3 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:4 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:5 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:6 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:7 21:57:01.916 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:8 21:57:01.917 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample3 - task:9
(4)newScheduledThreadPool(注意:返回值與以上不同)
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ThreadPoolExample4 {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
log.info("schedule run");
}
},3, TimeUnit.SECONDS);//延時3秒執行該輸出任務
scheduledExecutorService.shutdown();
}
}
結果(這個結果是延時3秒出來的):
22:03:22.303 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
Process finished with exit code 0
除了schedule()方法,他還有scheduleAtFixedRate(以指定的速率去執行任務),scheduleWithFixedDelay(以指定的一個延遲執行任務)
先說scheduleAtFixedRate
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @Slf4j public class ThreadPoolExample4 { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { log.info("schedule run"); } },1,3,TimeUnit.SECONDS);//延遲一秒,每隔3秒執行一次任務 } }
結果(注意觀察時間間隔):
22:10:22.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run 22:10:25.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run 22:10:28.168 [pool-1-thread-2] INFO com.practice.aqs.ThreadPoolExample4 - schedule run 22:10:31.168 [pool-1-thread-1] INFO com.practice.aqs.ThreadPoolExample4 - schedule run 22:10:34.167 [pool-1-thread-3] INFO com.practice.aqs.ThreadPoolExample4 - schedule run
五、線程池的合理配置
1.CPU密集型
盡量使用較小的線程池,一般Cpu核心數+1
因為CPU密集型任務CPU的使用率很高,若開過多的線程,只能增加線程上下文的切換次數,帶來額外的開銷
2.IO密集型
方法一:可以使用較大的線程池,一般CPU核心數 * 2
IO密集型CPU使用率不高,可以讓CPU等待IO的時候處理別的任務,充分利用cpu時間
下面舉個例子:
比如平均每個線程CPU運行時間為0.5s,而線程等待時間(非CPU運行時間,比如IO)為1.5s,CPU核心數為8,那么根據上面這個公式估算得到:((0.5+1.5)/0.5)*8=32。這個公式進一步轉化為:
最佳線程數目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數目
可以將任務分為CPU密集型和IO密集型,然后分別使用不同的線程池去處理,按情況而定。