一、阻塞隊列
1.介紹
阻塞隊列會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素后,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。
2.實現
ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
二、線程池
1.介紹
使用線程池的好處有:1.創建/銷毀線程伴隨着系統開銷,過於頻繁的創建/銷毀線程,會很大程度上影響處理效率 2.線程並發數量過多,搶占系統資源從而導致阻塞 3.線程池可以對線程進行一些簡單的管理
ThreadPoolExecutor類是線程池中最核心的一個類, 在ThreadPoolExecutor類中提供了四個構造方法,其中 三個構造器都是調用的第四個構造器進行的初始化工作。
2.構造方法為:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
七個參數的含義:
* corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
* maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
* keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
* unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;
//天
TimeUnit.HOURS;
//小時
TimeUnit.MINUTES;
//分鍾
TimeUnit.SECONDS;
//秒
TimeUnit.MILLISECONDS;
//毫秒
TimeUnit.MICROSECONDS;
//微妙
TimeUnit.NANOSECONDS;
//納秒
|
* workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
|
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
* threadFactory:線程工廠,主要用來創建線程;
* handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
|
3.線程池的執行策略
* 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
* 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
* 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
* 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
4..測試
public
class
Test {
public
static
void
main(String[] args) {
ThreadPoolExecutor executor =
new
ThreadPoolExecutor(
5
,
10
,
200
, TimeUnit.MILLISECONDS,
new
ArrayBlockingQueue<Runnable>(
5
));
for
(
int
i=
0
;i<
15
;i++){
MyTask myTask =
new
MyTask(i);
executor.execute(myTask);
System.out.println(
"線程池中線程數目:"
+executor.getPoolSize()+
",隊列中等待執行的任務數目:"
+
executor.getQueue().size()+
",已執行玩別的任務數目:"
+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class
MyTask
implements
Runnable {
private
int
taskNum;
public
MyTask(
int
num) {
this
.taskNum = num;
}
@Override
public
void
run() {
System.out.println(
"正在執行task "
+taskNum);
try
{
Thread.currentThread().sleep(
4000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
System.out.println(
"task "
+taskNum+
"執行完畢"
);
}
}
|
這個程序中當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。
5.使用Executors類中提供的幾個靜態方法來創建線程池
Executors.newCachedThreadPool();
//創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor();
//創建容量為1的緩沖池
Executors.newFixedThreadPool(
int
);
//創建固定容量大小的緩沖池(定長)
|
newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;1. 有且僅有一個工作線程執行任務2. 所有任務按照指定順序執行,即遵循隊列的入隊出隊規則
newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
三、Callable
創建線程在上一篇wiki中說明了兩種方法,分別是繼承Thread,重寫run方法和實現Runnable接口,重新run方法。現在介紹第三種實現Callable接口,重寫call方法。
區別:
-
Callable可以在任務結束的時候提供一個返回值Future對象,Runnable無法提供這個功能
-
Callable的call方法分可以拋出異常,而Runnable的run方法不能拋出異常。
-
Callable規定的方法是call(),而Runnable規定的方法是run().
測試:
/*
* FileName: TestCallable
* Author: aiguo.sun
* Date: 2019/3/28 20:06
* Description: 測試callable方法
*/
package
JavaTest;
import
java.util.concurrent.Callable;
import
java.util.concurrent.ExecutionException;
import
java.util.concurrent.FutureTask;
/**
* 一、創建執行線程的方式三:實現 Callable 接口。 相較於實現 Runnable 接口的方式,方法可以有返回值,並且可以拋出異常。
*
* 二、執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。 FutureTask 是 Future 接口的實現類
*/
public
class
TestCallable {
public
static
void
main(String[] args) {
ThreadDemo td =
new
ThreadDemo();
//1.執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。
FutureTask<Integer> result =
new
FutureTask<>(td);
new
Thread(result).start();
//2.接收線程運算后的結果
try
{
//判斷是否完成
if
(!result.isDone())
{
System.out.println(
"-sorry-----------------------"
);;
}
//FutureTask 可用於 閉鎖 類似於CountDownLatch的作用,在所有的線程沒有執行完成之后這里是不會執行的
Integer sum = result.get();
System.out.println(sum);
System.out.println(
"------------------------------------"
);
}
catch
(InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class
ThreadDemo
implements
Callable<Integer> {
@Override
public
Integer call()
throws
Exception {
int
sum =
0
;
for
(
int
i =
0
; i <=
100000000
; i++) {
sum += i;
}
return
sum;
}
}
|