java線程池ThreadPoolExecutor類使用詳解


在《阿里巴巴java開發手冊》中指出了線程資源必須通過線程池提供,不允許在應用中自行顯示的創建線程,這樣一方面是線程的創建更加規范,可以合理控制開辟線程的數量;另一方面線程的細節管理交給線程池處理,優化了資源的開銷。而線程池不允許使用Executors去創建,而要通過ThreadPoolExecutor方式,這一方面是由於jdk中Executor框架雖然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等創建線程池的方法,但都有其局限性,不夠靈活;另外由於前面幾種方法內部也是通過ThreadPoolExecutor方式實現,使用ThreadPoolExecutor有助於大家明確線程池的運行規則,創建符合自己的業務場景需要的線程池,避免資源耗盡的風險。

下面我們就對ThreadPoolExecutor的使用方法進行一個詳細的概述。

首先看下ThreadPoolExecutor的構造函數

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

構造函數的參數含義如下:

corePoolSize:指定了線程池中的線程數量,它的數量決定了添加的任務是開辟新的線程去執行,還是放到workQueue任務隊列中去;

maximumPoolSize:指定了線程池中的最大線程數量,這個參數會根據你使用的workQueue任務隊列的類型,決定線程池會開辟的最大線程數量;

keepAliveTime:當線程池中空閑線程數量超過corePoolSize時,多余的線程會在多長時間內被銷毀;

unit:keepAliveTime的單位

workQueue:任務隊列,被添加到線程池中,但尚未被執行的任務;它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列幾種;

threadFactory:線程工廠,用於創建線程,一般用默認即可;

handler:拒絕策略;當任務太多來不及處理時,如何拒絕任務;

接下來我們對其中比較重要參數做進一步的了解:

一、workQueue任務隊列

上面我們已經介紹過了,它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列;

1、直接提交隊列:設置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize設置為2 ,拒絕策略為AbortPolic策略,直接拋出異常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

輸出結果為

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

可以看到,當任務隊列為SynchronousQueue,創建的線程數大於maximumPoolSize時,直接執行了拒絕策略拋出異常。

使用SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。如果用於執行任務的線程數量小於maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的並發量有個准確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;

2、有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大於maximumPoolSize,則執行拒絕策略。在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。

3、無界的任務隊列:有界任務隊列可以使用LinkedBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由於無法及時處理導致一直增長,直到最后資源耗盡的問題。

4、優先任務隊列:優先任務隊列通過PriorityBlockingQueue實現,下面我們通過一個例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //優先任務隊列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //當前對象和其他對象做比較,當前優先級大就返回-1,優先級小就返回1,值越小優先級越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    }
    
    public void run() {
        try {
            //讓線程阻塞,使后續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

我們來看下執行的結果情況

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一個任務直接創建線程執行外,其他的任務都被放入了優先任務隊列,按優先級進行了重新排列執行,且線程池的線程數一直為corePoolSize,也就是只有一個。

通過運行的代碼我們可以看出PriorityBlockingQueue它其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數也不會超過corePoolSize的數量,只不過其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。

二、拒絕策略

一般我們創建線程池時,為防止資源被耗盡,任務隊列都會選擇創建有界任務隊列,但種模式下如果出現任務隊列已滿且線程池創建的線程數達到你設置的最大線程數時,這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler參數即合理的拒絕策略,來處理線程池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:

1、AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作;

2、CallerRunsPolicy策略:如果線程池的線程數量達到上限,該策略會把任務隊列中的任務放在調用者線程當中運行;

3、DiscardOledestPolicy策略:該策略會丟棄任務隊列中最老的一個任務,也就是當前任務隊列中最先被添加進去的,馬上要被執行的那個任務,並嘗試再次提交;

4、DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。當然使用此策略,業務場景中需允許任務的丟失;

以上內置的策略均實現了RejectedExecutionHandler接口,當然你也可以自己擴展RejectedExecutionHandler接口,定義自己的拒絕策略,我們看下示例代碼:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義拒絕策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"執行了拒絕策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //讓線程阻塞,使后續任務進入緩存隊列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

輸出結果:

com.hhxx.test.ThreadTask@33909752執行了拒絕策略
com.hhxx.test.ThreadTask@55f96302執行了拒絕策略
com.hhxx.test.ThreadTask@3d4eac69執行了拒絕策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由於任務加了休眠阻塞,執行需要花費一定時間,導致會有一定的任務被丟棄,從而執行自定義的拒絕策略;

三、ThreadFactory自定義線程創建

 線程池中線程就是通過ThreadPoolExecutor中的ThreadFactory,線程工廠創建的。那么通過自定義ThreadFactory,可以按需要對線程池中創建的線程進行一些特殊的設置,如命名、優先級等,下面代碼我們通過ThreadFactory對線程池中創建的線程進行記錄與命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義線程工廠
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"創建");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //輸出執行線程的名稱
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}

我們看下輸出結果

線程118352462創建
線程1550089733創建
線程865113938創建
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
線程1442407170創建
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到線程池中,每個線程的創建我們都進行了記錄輸出與命名。

四、ThreadPoolExecutor擴展

ThreadPoolExecutor擴展主要是圍繞beforeExecute()、afterExecute()和terminated()三個接口實現的,

1、beforeExecute:線程池中任務運行前執行

2、afterExecute:線程池中任務運行完畢后執行

3、terminated:線程池退出后執行

通過這三個接口我們可以監控每個任務的開始和結束時間,或者其他一些功能。下面我們可以通過代碼實現一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //實現自定義接口
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("線程"+r.hashCode()+"創建");
                //線程命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("准備執行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("線程池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //輸出執行線程的名稱
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下輸出結果

線程118352462創建
線程1550089733創建
准備執行:Task0
准備執行:Task1
TaskNameTask0---ThreadName:threadPool118352462
線程865113938創建
執行完畢:Task0
TaskNameTask1---ThreadName:threadPool1550089733
執行完畢:Task1
准備執行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
執行完畢:Task3
准備執行:Task2
准備執行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
執行完畢:Task4
准備執行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
執行完畢:Task5
准備執行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
執行完畢:Task6
准備執行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
執行完畢:Task8
准備執行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
准備執行:Task7
執行完畢:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
執行完畢:Task7
執行完畢:Task2
線程池退出

可以看到通過對beforeExecute()、afterExecute()和terminated()的實現,我們對線程池中線程的運行狀態進行了監控,在其執行前后輸出了相關打印信息。另外使用shutdown方法可以比較安全的關閉線程池, 當線程池調用該方法后,線程池中不再接受后續添加的任務。但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。

五、線程池線程數量

線程吃線程數量的設置沒有一個明確的指標,根據實際情況,只要不是設置的偏大和偏小都問題不大,結合下面這個公式即可

            /**
             * Nthreads=CPU數量
             * Ucpu=目標CPU的使用率,0<=Ucpu<=1
             * W/C=任務等待時間與任務計算時間的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

以上就是對ThreadPoolExecutor類從構造函數、拒絕策略、自定義線程創建等方面介紹了其詳細的使用方法,從而我們可以根據自己的需要,靈活配置和使用線程池創建線程,其中如有不足與不正確的地方還望指出與海涵。

 

關注微信公眾號,查看更多技術文章。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM