如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。那么有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?在Java中可以通過線程池來達到這樣的效果。首先我們從最核心的ThreadPoolExecutor類中的方法講起。
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。
在ThreadPoolExecutor類中提供了四個構造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個參數的含義:
corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
unit:參數keepAliveTime的時間單位
workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
threadFactory:線程工廠,主要用來創建線程;
handler:表示當拒絕處理任務時的策略,默認有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
線程池執行的流程:
當任務提交給ThreadPoolExecutor 線程池中,先檢查核心線程數是否已經全部使用,如果沒有交由核心線程去執行任務,如果核心線程數已經全部占用,則將任務添加到隊列里面,如果隊列已經占滿,比較當前線程池的中線程的數量是不是與超過maximumPoolSize,如果沒有查過則創建線程去執行,也就是說線程池最多可以接受多少任務呢?就是maximumPoolSize+隊列的大小。當線程池中的線程的數量大於corePoolSize數量有空閑線程則執行回收,回收時間是keepAliveTime,單位是unit,都是初始化的時候設置的。
下面通過代碼來說明:
定義一個實現了Runnable接口的類,當作任務類;
public class MyTask implements Runnable {
private int taskId;
private String taskName;
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public MyTask(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("taskId:" + taskId + ",taskName:" + taskName);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
定義如下的線程池:
ThreadPoolExecutor pool = new ThreadPoolExecutor(// 自定義一個線程池
1, // coreSize
2, // maxSize
60, // 60s
TimeUnit.SECONDS, new ArrayBlockingQueue<>(3) // 有界隊列,容量是3個
, Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy()
);
該線程池最多可以放2+3個任務,現在我們放6個任務進去,看看執行的效果:
pool.execute(new MyTask(1, "任務1"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.execute(new MyTask(2, "任務2"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.execute(new MyTask(3, "任務3"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.execute(new MyTask(4, "任務4"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.execute(new MyTask(5, "任務5"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.execute(new MyTask(6, "任務6"));
System.out.println("活躍的線程數:"+pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size());
pool.shutdown();
我們的執行結果:

我們可以看到,首先拋出了異常,大致意思是拒絕了一個線程加入到線程池,因為我線程池最大允許5個線程的加入,當線程池滿了執行的拒絕策略是DiscardPolicy直接拒絕線程的加入,並拋出異常。
接下來,我們看每次添加一個線程打印的活躍的線程數等相關消息。
當任務1加入到線程池中:
活躍的線程數:1,核心線程數:1,線程池大小:1,隊列的大小0,也就是說,任務1加入核心未被占滿,開啟一個核心線程去執行。此時線程的大小也為1.
當任務2加入到線程池中時:
活躍的線程數:1,核心線程數:1,線程池大小:1,隊列的大小1 。也就是說,此時核心已經占滿了,隊列沒有滿,則往隊列里面增加任務。此時線程的大小仍然也為1.因為就一個核心線程在執行任務。
當任務3加入到線程池中時:同任務2.
當任務4加入到線程池中時:同任務2.此時隊列已經滿。
當任務5加入到線程池中時:
活躍的線程數:2,核心線程數:1,線程池大小:2,隊列的大小3,活躍的線程變為2,也就是maximumPoolSize數量,因為任務4加入到線程池時,線程池的隊列已經滿了,此時會檢查活躍的線程是不是大於maximumPoolSize,如果不大於則創建線程去執行任務,到底執行新加入還是隊列里面最老加入的。此時通過下面的執行結果來判斷。

我們看到任務1和任務5最先執行,任務1不用講自然會在最先執行的里面,任務5在最先執行的任務里面,說明,當線程隊列滿了,如果開起了新線程,則會去執行新加入的任務,不是從隊列里面去老的任務。從后面執行來看,當之前的任務直線完成了,線程池會從隊列里面獲取任務去執行。這就是一個線程池的大致執行流程。
當然個人覺得那個原生的拒絕策略都不太實用,比如互聯網任務,我們定義一個線程池,當線程池滿了,我們要合理的處理后續的任務,比如記錄下來下次再去執行,或者告知責任人那些任務沒有處理等等,個人任務這個應該自己定義,當線程滿了,我們可以自由控制。下面定義一個拒絕策略。
public class MyRejected implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
MyTask task = (MyTask) r;
System.out.println("報警信息:"+task.getTaskName()+" 被線程池拒絕,沒有被執行");
//可以往消息隊列中間件里面放 可以發Email等等
}
}
如上,我們實現RejectedExecutionHandler 接口。就可以自定義一個拒絕策略很簡單。
我們需要線程池的定義,使用自己的拒絕策略。
ThreadPoolExecutor pool = new ThreadPoolExecutor(// 自定義一個線程池
1, // coreSize
2, // maxSize
60, // 60s
TimeUnit.SECONDS, new ArrayBlockingQueue<>(3) // 有界隊列,容量是3個
, Executors.defaultThreadFactory()
, new MyRejected()
);
其他的代碼不用修改,執行結果如下:

現在就執行了自己自定義拒絕策略。
以上只是講解的自定義的線程池,當然java本身已經內置了一些線程,比如:
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
這里就不詳細講解了,內部實現都是ThreadPoolExecutor ,個人傾向自定義的線程池,這樣比較靈活。
