線程池簡介
線程過多會帶來額外的開銷,其中包括創建銷毀線程的開銷、調度線程的開銷等等,同時也降低了計算機的整體性能。
線程池(Thread Pool)是一種基於池化思想管理線程的工具,它維護多個線程。在線程池中,總有幾個活躍線程。當需要使用線程來執行任務時,可以從池子中隨便拿一個空閑線程來用,當完成工作時,該線程並不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。
這種做法,一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。
線程池創建核心參數
線程池的工作流程
- 默認情況下,創建完線程池后並不會立即創建線程, 而是等到有任務提交時才會創建線程來進行處理。(除非調用prestartCoreThread或prestartAllCoreThreads方法)
- 當線程數小於核心線程數時,每提交一個任務就創建一個線程來執行,即使當前有線程處於空閑狀態,直到當前線程數達到核心線程數。
- 當前線程數達到核心線程數時,如果這個時候還提交任務,這些任務會被放到工作隊列里,等到線程處理完了手頭的任務后,會來工作隊列中取任務處理。
- 當前線程數達到核心線程數並且工作隊列也滿了,如果這個時候還提交任務,則會繼續創建線程來處理,直到線程數達到最大線程數。
- 當前線程數達到最大線程數並且隊列也滿了,如果這個時候還提交任務,則會觸發飽和策略。
- 如果某個線程的控線時間超過了keepAliveTime,那么將被標記為可回收的,並且當前線程池的當前大小超過了核心線程數時,這個線程將被終止。
飽和策略(拒絕策略)
當有界隊列被填滿后,飽和策略開始發揮作用。
- AbortPolicy:中止策略。默認的飽和策略,拋出未檢查的RejectedExecutionException。調用者可以捕獲這個異常,然后根據需求編寫自己的處理代碼。
- DiscardPolicy:拋棄策略。當新提交的任務無法保存到隊列中等待執行時,該策略會悄悄拋棄該任務。
- DiscardOldestPolicy:拋棄最舊的策略。當新提交的任務無法保存到隊列中等待執行時,則會拋棄下一個將被執行的任務,然后嘗試重新提交新的任務。
- CallerRunsPolicy:調用者運行策略。該策略實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者(調用線程池執行任務的主線程)。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。當線程池的所有線程都被占用,並且工作隊列被填滿后,下一個任務會在調用execute時在主線程中執行(調用線程池執行任務的主線程)。
總體設計
Java中的線程池核心實現類是ThreadPoolExecutor。
ThreadPoolExecutor的繼承關系:
ThreadPoolExecutor運行機制:
線程池在內部實際上構建了一個生產者消費者模型,將線程和任務兩者解耦,並不直接關聯,從而良好的緩沖任務,復用線程。
線程池的運行主要分成兩部分:任務管理、線程管理。
- 任務管理部分充當生產者的角色,當任務提交后,線程池會判斷該任務后續的流轉:(1)直接申請線程執行該任務;(2)緩沖到隊列中等待線程執行;(3)拒絕該任務。
- 線程管理部分是消費者,它們被統一維護在線程池內,根據任務請求進行線程的分配,當線程執行完任務后則會繼續獲取新的任務去執行,最終當線程獲取不到任務的時候,線程就會被回收。
線程池實現例子
ThreadPool接口
public interface ThreadPool {
//提交任務到線程池
void execute(Runnable runnable);
//關閉線程池
void shutdown();
//獲取線程池的初始化大小
int getInitSize();
//獲取線程池的核心線程數量
int getCoreSize();
//獲取線程池的最大線程數量
int getMaxSize();
//獲取線程池中用於緩存任務隊列的大小
int getQueueSize();
//獲取線程池中活躍的線程的數量
int getActiveCount();
//查看線程池是否已經被shutdown
boolean isShutdown();
}
ThreadFactory接口
/**
* 創建個性化線程
*
* ThreadFactory提供創建線程的接口,以便個性化定制Thread,比如Thread應該被加入到哪個
* Thread Group中,優先級、線程名稱,以及是否為守護線程等
**/
@FunctionalInterface
public interface ThreadFactory {
Thread createThread(Runnable runnable);
}
RunnableQueue接口
/**
* 線程隊列基本操作
*
* RunnableQueue主要用於存放提交的Runnable
* 該Runnable是一個BlockedQueue,並且有limit限制
**/
public interface RunnableQueue {
//當有新的任務進來時首先會offer到隊列中
void offer(Runnable runnable);
//工作線程通過take方法獲取Runnable
Runnable take() throws InterruptedException;
//獲取任務隊列中任務的數量
int size();
}
DenyPolicy接口
/**
* 線程池滿時拒絕策略
**/
@FunctionalInterface
public interface DenyPolicy {
void reject(Runnable runnable,ThreadPool threadPool);
//該拒絕策略會直接將任務丟棄
class DiscardDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
//do nothing
}
}
//該拒絕策略向任務提交者拋出異常
class AbortDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
throw new RuntimeException("The runnable "+runnable+" will be abort.");
}
}
//該拒絕策略會使任務在提交者所在的線程中執行任務
class RunnerDenyPolicy implements DenyPolicy
{
@Override
public void reject(Runnable runnable,ThreadPool threadPool)
{
if(!threadPool.isShutdown())
{
runnable.run();
}
}
}
}
InternalTask
/**
* 不斷從runnableQueue中取出Runnable並執行任務
**/
public class InternalTask implements Runnable{
private final RunnableQueue runnableQueue;
private volatile boolean running=true;
public InternalTask(RunnableQueue runnableQueue){
this.runnableQueue=runnableQueue;
}
@Override
public void run()
{
//如果當前任務為running且沒有被中斷,則將其不斷地從queue中獲取runnable,然后執行run
while(running && !Thread.currentThread().isInterrupted())
{
try
{
Runnable task=runnableQueue.take();
task.run();
}catch (InterruptedException e){
running=false;
break;
}
}
}
//停止當前任務,主要會在線程池的shutdown方法中使用
public void stop()
{
this.running=false;
}
}
LinkedRunnableQueue
/**
* 雙向循環鏈表實現線程任務隊列基本操作
**/
public class LinkedRunnableQueue implements RunnableQueue{
//任務隊列的最大容量,在構造時傳入
private final int limit;
//若任務隊列中的任務已經滿了,則需要執行拒絕策略
private final DenyPolicy denyPolicy;
//存放任務的隊列
private final LinkedList<Runnable> runnableList = new LinkedList<>();
private final ThreadPool threadPool;
public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
this.limit = limit;
this.denyPolicy = denyPolicy;
this.threadPool = threadPool;
}
@Override
public void offer(Runnable runnable) {
synchronized (runnableList){
if (runnableList.size()>=limit){
//無法容納新的任務時執行拒絕策略
denyPolicy.reject(runnable,threadPool);
}else {
//將任務加入到隊尾,並且喚醒阻塞中的線程
runnableList.addLast(runnable);
runnableList.notifyAll();
}
}
}
@Override
public Runnable take() throws InterruptedException {
synchronized (runnableList){
while (runnableList.isEmpty()){
try {
//如果任務隊列沒有可執行任務,則當前線程會掛起,
//進入runnableList關聯的monitor set中等待喚醒
runnableList.wait();
}catch (InterruptedException e){
//被中斷時將異常拋出
throw e;
}
}
return runnableList.removeFirst();
}
}
@Override
public int size() {
synchronized (runnableList){
//返回當前任務隊列的任務數
return runnableList.size();
}
}
}
RunnableDenyException
/**
* 錯誤拋出
*
* RunnableDenyException是RuntimeException的子類,主要通知人物提交者,任務隊列
* 無法再接收新的任務
**/
public class RunnableDenyException extends RuntimeException{
public RunnableDenyException(String message)
{
super(message);
}
}
BasicThreadPool
/**
* 實現ThreadPool
*
* 線程池的初始化:數量控制屬性、創建線程工廠、任務隊列策略等功能
**/
public class BasicThreadPool extends Thread implements ThreadPool{
//初始化線程數量
private final int initSize;
//線程池最大線程數量
private final int maxSize;
//線程池核心線程數量
private final int coreSize;
//當前活躍的線程數量
private int activeCount;
//創建線程所需的工廠
private final ThreadFactory threadFactory;
//任務隊列
private final RunnableQueue runnableQueue;
//線程池是否已經被shutdown
private volatile boolean isShutdown = false;
//工作線程隊列
private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
private final long keepAliveTime;
private final TimeUnit timeUnit;
//構造線程時傳參
public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){
this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
}
public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,
int queueSize,DenyPolicy denyPolicy,
long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.runnableQueue = new LinkedRunnableQueue(queueSize,denyPolicy,this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.init();
}
//初始化時,先創建initSize個線程
private void init(){
start();
for (int i = 0; i < initSize; i++){
newThread();
}
}
@Override
public void execute(Runnable runnable) {
if (this.isShutdown){
throw new IllegalStateException("The thread pool is destroy");
}
//提交任務只是簡單第往任務隊列中插入Runnable
this.runnableQueue.offer(runnable);
}
private void newThread(){
//創建任務線程並且啟動
InternalTask internalTask=new InternalTask(runnableQueue);
Thread thread=this.threadFactory.createThread(internalTask);
ThreadTask threadTask=new ThreadTask(thread,internalTask);
threadQueue.offer(threadTask);
this.activeCount++;
thread.start();
}
private void removeThread(){
//從線程池移除某個線程
ThreadTask threadTask=threadQueue.remove();
threadTask.internalTask.stop();
this.activeCount--;
}
@Override
public void run() {
//run方法繼承自Thread,主要用於維護線程數量,比如擴容,回收
while (!isShutdown && !isInterrupted()){
try {
timeUnit.sleep(keepAliveTime);
}catch (InterruptedException e){
isShutdown=true;
break;
}
synchronized (this){
if (isShutdown){
break;
}
//當前隊列中有任務尚未處理,並且activeCount<coreSize則繼續擴容
if (runnableQueue.size()>0&&activeCount<coreSize){
for (int i=initSize;i<coreSize;i++){
newThread();
}
//continue的目的在於不想讓線程的擴容直接達到maxSize
continue;
}
//當前隊列中有任務尚未處理,並且activeCount<maxSize則繼續擴容
if (runnableQueue.size()>0&&activeCount<maxSize){
for (int i=coreSize;i<maxSize;i++){
newThread();
}
}
//如果任務隊列中沒有任務,則需要回收,回收至coreSize即可
if (runnableQueue.size()==0&&activeCount>coreSize){
for (int i=coreSize;i<activeCount;i++){
removeThread();
}
}
}
}
}
@Override
public void shutdown() {
synchronized (this){
if (isShutdown)return;
isShutdown=true;
threadQueue.forEach(threadTask -> {
threadTask.internalTask.stop();
threadTask.thread.interrupt();
});
this.interrupt();
}
}
@Override
public int getInitSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.initSize;
}
@Override
public int getCoreSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.coreSize;
}
@Override
public int getQueueSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return runnableQueue.size();
}
@Override
public int getMaxSize() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.maxSize;
}
@Override
public int getActiveCount() {
if (isShutdown)
throw new IllegalStateException("The thread pool is destroy");
return this.activeCount;
}
@Override
public boolean isShutdown() {
return this.isShutdown;
}
private static class ThreadTask{
Thread thread;
InternalTask internalTask;
public ThreadTask(Thread thread, InternalTask internalTask) {
this.thread = thread;
this.internalTask = internalTask;
}
}
private static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger group_counter=new AtomicInteger(1);
private static final ThreadGroup group =
new ThreadGroup("myGroup-"+group_counter.getAndDecrement());
public static final AtomicInteger COUNTER =new AtomicInteger(0);
@Override
public Thread createThread(Runnable runnable) {
return new Thread(group,runnable,"thread-poll-"+COUNTER.getAndDecrement());
}
}
}
測試線程池
/**
* 一個簡單的程序分別測試線程池的任務提交、線程池線程數量的動態擴展,以及線程池的銷毀功能
*/
public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
//定義線程池,初始化程數為2,核心或程數為4,最大程數為6.任務隊列最多允許1000個任務
final ThreadPool threadPool=new BasicThreadPool(2,6,4,1000);
//定義20個任務並且提交蛤線程池
for (int i=0;i<20;i++){
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName()+" is" +
" running and done.");
}catch (InterruptedException e){
e.printStackTrace();
}
});
}
for (; ; ){
//不斷輸出線程池的信息
System.out.println("getActiveCount = "+threadPool.getActiveCount());
System.out.println("getQueueSize = "+threadPool.getQueueSize());
System.out.println("getCoreSize = "+threadPool.getCoreSize());
System.out.println("getMaxSize = "+threadPool.getMaxSize());
System.out.println("==========================================");
TimeUnit.SECONDS.sleep(5);
}
}
}