線程池是什么


線程池簡介

線程過多會帶來額外的開銷,其中包括創建銷毀線程的開銷、調度線程的開銷等等,同時也降低了計算機的整體性能。

線程池(Thread Pool)是一種基於池化思想管理線程的工具,它維護多個線程。在線程池中,總有幾個活躍線程。當需要使用線程來執行任務時,可以從池子中隨便拿一個空閑線程來用,當完成工作時,該線程並不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。

這種做法,一方面避免了處理任務時創建銷毀線程開銷的代價,另一方面避免了線程數量膨脹導致的過分調度問題,保證了對內核的充分利用。

線程池創建核心參數

線程池的工作流程

  1. 默認情況下,創建完線程池后並不會立即創建線程, 而是等到有任務提交時才會創建線程來進行處理。(除非調用prestartCoreThread或prestartAllCoreThreads方法)
  2. 當線程數小於核心線程數時,每提交一個任務就創建一個線程來執行,即使當前有線程處於空閑狀態,直到當前線程數達到核心線程數。
  3. 當前線程數達到核心線程數時,如果這個時候還提交任務,這些任務會被放到工作隊列里,等到線程處理完了手頭的任務后,會來工作隊列中取任務處理。
  4. 當前線程數達到核心線程數並且工作隊列也滿了,如果這個時候還提交任務,則會繼續創建線程來處理,直到線程數達到最大線程數。
  5. 當前線程數達到最大線程數並且隊列也滿了,如果這個時候還提交任務,則會觸發飽和策略。
  6. 如果某個線程的控線時間超過了keepAliveTime,那么將被標記為可回收的,並且當前線程池的當前大小超過了核心線程數時,這個線程將被終止。

飽和策略(拒絕策略)

當有界隊列被填滿后,飽和策略開始發揮作用。

  1. AbortPolicy:中止策略。默認的飽和策略,拋出未檢查的RejectedExecutionException。調用者可以捕獲這個異常,然后根據需求編寫自己的處理代碼。
  2. DiscardPolicy:拋棄策略。當新提交的任務無法保存到隊列中等待執行時,該策略會悄悄拋棄該任務。
  3. DiscardOldestPolicy:拋棄最舊的策略。當新提交的任務無法保存到隊列中等待執行時,則會拋棄下一個將被執行的任務,然后嘗試重新提交新的任務。
  4. CallerRunsPolicy:調用者運行策略。該策略實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者(調用線程池執行任務的主線程)。它不會在線程池的某個線程中執行新提交的任務,而是在一個調用了execute的線程中執行該任務。當線程池的所有線程都被占用,並且工作隊列被填滿后,下一個任務會在調用execute時在主線程中執行(調用線程池執行任務的主線程)。

總體設計

Java中的線程池核心實現類是ThreadPoolExecutor。

ThreadPoolExecutor的繼承關系:

ThreadPoolExecutor運行機制:

線程池在內部實際上構建了一個生產者消費者模型,將線程和任務兩者解耦,並不直接關聯,從而良好的緩沖任務,復用線程。

線程池的運行主要分成兩部分:任務管理、線程管理。

  • 任務管理部分充當生產者的角色,當任務提交后,線程池會判斷該任務后續的流轉:(1)直接申請線程執行該任務;(2)緩沖到隊列中等待線程執行;(3)拒絕該任務。
  • 線程管理部分是消費者,它們被統一維護在線程池內,根據任務請求進行線程的分配,當線程執行完任務后則會繼續獲取新的任務去執行,最終當線程獲取不到任務的時候,線程就會被回收。

線程池實現例子

ThreadPool接口

public interface ThreadPool {
    //提交任務到線程池
    void execute(Runnable runnable);

    //關閉線程池
    void shutdown();

    //獲取線程池的初始化大小
    int getInitSize();

    //獲取線程池的核心線程數量
    int getCoreSize();

    //獲取線程池的最大線程數量
    int getMaxSize();

    //獲取線程池中用於緩存任務隊列的大小
    int getQueueSize();

    //獲取線程池中活躍的線程的數量
    int getActiveCount();

    //查看線程池是否已經被shutdown
    boolean isShutdown();
}

ThreadFactory接口

/**
 * 創建個性化線程
 *
 * ThreadFactory提供創建線程的接口,以便個性化定制Thread,比如Thread應該被加入到哪個
 * Thread Group中,優先級、線程名稱,以及是否為守護線程等
 **/

@FunctionalInterface
public interface ThreadFactory {
    Thread createThread(Runnable runnable);
}

RunnableQueue接口

/**
 * 線程隊列基本操作
 *
 * RunnableQueue主要用於存放提交的Runnable
 * 該Runnable是一個BlockedQueue,並且有limit限制
 **/
public interface RunnableQueue {
    //當有新的任務進來時首先會offer到隊列中
    void offer(Runnable runnable);

    //工作線程通過take方法獲取Runnable
    Runnable take() throws InterruptedException;

    //獲取任務隊列中任務的數量
    int size();
}

DenyPolicy接口

/**
  * 線程池滿時拒絕策略
  **/
@FunctionalInterface
public interface DenyPolicy {
    void reject(Runnable runnable,ThreadPool threadPool);

    //該拒絕策略會直接將任務丟棄
    class DiscardDenyPolicy implements DenyPolicy
    {
        @Override
        public void reject(Runnable runnable,ThreadPool threadPool)
        {
            //do nothing
        }

    }
    //該拒絕策略向任務提交者拋出異常
    class AbortDenyPolicy implements DenyPolicy
    {
        @Override
        public void reject(Runnable runnable,ThreadPool threadPool)
        {
            throw new RuntimeException("The runnable "+runnable+" will be abort.");
        }
    }
    //該拒絕策略會使任務在提交者所在的線程中執行任務
    class RunnerDenyPolicy implements DenyPolicy
    {
        @Override
        public void reject(Runnable runnable,ThreadPool threadPool)
        {
            if(!threadPool.isShutdown())
            {
                runnable.run();
            }
        }
    }
}

InternalTask

/**
 * 不斷從runnableQueue中取出Runnable並執行任務
 **/
public class InternalTask implements Runnable{
    private final RunnableQueue runnableQueue;

    private volatile boolean running=true;

    public InternalTask(RunnableQueue runnableQueue){
        this.runnableQueue=runnableQueue;
    }

    @Override
    public void run()
    {
        //如果當前任務為running且沒有被中斷,則將其不斷地從queue中獲取runnable,然后執行run
        while(running && !Thread.currentThread().isInterrupted())
        {
            try
            {
                Runnable task=runnableQueue.take();
                task.run();
            }catch (InterruptedException e){
                running=false;
                break;
            }
        }
    }

    //停止當前任務,主要會在線程池的shutdown方法中使用
    public void stop()
    {
        this.running=false;
    }
}

LinkedRunnableQueue

/**
 * 雙向循環鏈表實現線程任務隊列基本操作
 **/
public class LinkedRunnableQueue implements RunnableQueue{
    //任務隊列的最大容量,在構造時傳入
    private final int limit;
    //若任務隊列中的任務已經滿了,則需要執行拒絕策略
    private final DenyPolicy denyPolicy;
    //存放任務的隊列
    private final LinkedList<Runnable> runnableList = new LinkedList<>();
    private final ThreadPool threadPool;

    public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }

    @Override
    public void offer(Runnable runnable) {
        synchronized (runnableList){
            if (runnableList.size()>=limit){
                //無法容納新的任務時執行拒絕策略
                denyPolicy.reject(runnable,threadPool);
            }else {
                //將任務加入到隊尾,並且喚醒阻塞中的線程
                runnableList.addLast(runnable);
                runnableList.notifyAll();
            }
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        synchronized (runnableList){
            while (runnableList.isEmpty()){
                try {
                    //如果任務隊列沒有可執行任務,則當前線程會掛起,
                    //進入runnableList關聯的monitor set中等待喚醒
                    runnableList.wait();
                }catch (InterruptedException e){
                    //被中斷時將異常拋出
                    throw e;
                }
            }
            return runnableList.removeFirst();
        }
    }

    @Override
    public int size() {
        synchronized (runnableList){
            //返回當前任務隊列的任務數
            return runnableList.size();
        }
    }
}

RunnableDenyException

/**
 * 錯誤拋出
 * 
 * RunnableDenyException是RuntimeException的子類,主要通知人物提交者,任務隊列
 * 無法再接收新的任務
 **/
public class RunnableDenyException extends RuntimeException{
    public RunnableDenyException(String message)
    {
        super(message);
    }
}

BasicThreadPool

/**
 * 實現ThreadPool
 *
 * 線程池的初始化:數量控制屬性、創建線程工廠、任務隊列策略等功能
 **/
public class BasicThreadPool extends Thread implements ThreadPool{
    //初始化線程數量
    private final int initSize;

    //線程池最大線程數量
    private final int maxSize;

    //線程池核心線程數量
    private final int coreSize;

    //當前活躍的線程數量
    private int activeCount;

    //創建線程所需的工廠
    private final ThreadFactory threadFactory;

    //任務隊列
    private final RunnableQueue runnableQueue;

    //線程池是否已經被shutdown
    private volatile boolean isShutdown = false;

    //工作線程隊列
    private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
    private static final DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
    private final long keepAliveTime;
    private final TimeUnit timeUnit;


    //構造線程時傳參
    public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){
        this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize, DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
    }
    public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,
                           int queueSize,DenyPolicy denyPolicy,
                           long keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.runnableQueue = new LinkedRunnableQueue(queueSize,denyPolicy,this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init();
    }

    //初始化時,先創建initSize個線程
    private void init(){
        start();
        for (int i = 0; i < initSize; i++){
            newThread();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        if (this.isShutdown){
            throw new IllegalStateException("The thread pool is destroy");
        }
        //提交任務只是簡單第往任務隊列中插入Runnable
        this.runnableQueue.offer(runnable);
    }


    private void newThread(){
        //創建任務線程並且啟動
        InternalTask internalTask=new InternalTask(runnableQueue);
        Thread thread=this.threadFactory.createThread(internalTask);
        ThreadTask threadTask=new ThreadTask(thread,internalTask);
        threadQueue.offer(threadTask);
        this.activeCount++;
        thread.start();
    }

    private void removeThread(){
        //從線程池移除某個線程
        ThreadTask threadTask=threadQueue.remove();
        threadTask.internalTask.stop();
        this.activeCount--;
    }

    @Override
    public void run() {
        //run方法繼承自Thread,主要用於維護線程數量,比如擴容,回收
        while (!isShutdown && !isInterrupted()){
            try {
                timeUnit.sleep(keepAliveTime);
            }catch (InterruptedException e){
                isShutdown=true;
                break;
            }
            synchronized (this){
                if (isShutdown){
                    break;
                }
                //當前隊列中有任務尚未處理,並且activeCount<coreSize則繼續擴容
                if (runnableQueue.size()>0&&activeCount<coreSize){
                    for (int i=initSize;i<coreSize;i++){
                        newThread();
                    }
                    //continue的目的在於不想讓線程的擴容直接達到maxSize
                    continue;
                }
                //當前隊列中有任務尚未處理,並且activeCount<maxSize則繼續擴容
                if (runnableQueue.size()>0&&activeCount<maxSize){
                    for (int i=coreSize;i<maxSize;i++){
                        newThread();
                    }
                }
                //如果任務隊列中沒有任務,則需要回收,回收至coreSize即可
                if (runnableQueue.size()==0&&activeCount>coreSize){
                    for (int i=coreSize;i<activeCount;i++){
                        removeThread();
                    }
                }
            }
        }
    }

    @Override
    public void shutdown() {
        synchronized (this){
            if (isShutdown)return;
            isShutdown=true;
            threadQueue.forEach(threadTask -> {
                threadTask.internalTask.stop();
                threadTask.thread.interrupt();
            });
            this.interrupt();
        }
    }

    @Override
    public int getInitSize() {
        if (isShutdown)
            throw new IllegalStateException("The thread pool is destroy");
        return this.initSize;
    }

    @Override
    public int getCoreSize() {
        if (isShutdown)
            throw new IllegalStateException("The thread pool is destroy");
        return this.coreSize;
    }

    @Override
    public int getQueueSize() {
        if (isShutdown)
            throw new IllegalStateException("The thread pool is destroy");
        return runnableQueue.size();
    }

    @Override
    public int getMaxSize() {
        if (isShutdown)
            throw new IllegalStateException("The thread pool is destroy");
        return this.maxSize;
    }

    @Override
    public int getActiveCount() {
        if (isShutdown)
            throw new IllegalStateException("The thread pool is destroy");
        return this.activeCount;
    }

    @Override
    public boolean isShutdown() {
        return this.isShutdown;
    }


    private static class ThreadTask{
        Thread thread;
        InternalTask internalTask;

        public ThreadTask(Thread thread, InternalTask internalTask) {
            this.thread = thread;
            this.internalTask = internalTask;
        }
    }


    private static class DefaultThreadFactory implements ThreadFactory{
        private static final AtomicInteger group_counter=new AtomicInteger(1);
        private static final ThreadGroup group =
                new ThreadGroup("myGroup-"+group_counter.getAndDecrement());
        public static final AtomicInteger COUNTER =new AtomicInteger(0);
        @Override
        public Thread createThread(Runnable runnable) {
            return new Thread(group,runnable,"thread-poll-"+COUNTER.getAndDecrement());
        }
    }
}

測試線程池

/**
 * 一個簡單的程序分別測試線程池的任務提交、線程池線程數量的動態擴展,以及線程池的銷毀功能
 */
public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        //定義線程池,初始化程數為2,核心或程數為4,最大程數為6.任務隊列最多允許1000個任務
        final ThreadPool threadPool=new BasicThreadPool(2,6,4,1000);
        //定義20個任務並且提交蛤線程池
        for (int i=0;i<20;i++){
            threadPool.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName()+" is" +
                            " running and done.");
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            });
        }
        for (; ; ){
            //不斷輸出線程池的信息
            System.out.println("getActiveCount = "+threadPool.getActiveCount());
            System.out.println("getQueueSize = "+threadPool.getQueueSize());
            System.out.println("getCoreSize = "+threadPool.getCoreSize());
            System.out.println("getMaxSize = "+threadPool.getMaxSize());
            System.out.println("==========================================");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM