Java並發編程:線程池的使用


     在前面的文章中,我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:

   如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。那么有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?

     在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實現原理,接着給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。

一、Java中的ThreadPoolExecutor類

     java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

     從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

下面解釋下一下構造器中各個參數的含義:

下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;

  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;

  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;

  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鍾
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創建線程;

  • handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

     具體參數的配置與線程池的關系將在下一節講述。從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。我們接着看ExecutorService接口的實現:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

public interface Executor {
    void execute(Runnable command);
}

     到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了。Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的。然后ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等。抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法。然后ThreadPoolExecutor繼承了類AbstractExecutorService。

    在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

      execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

   submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

   shutdown()和shutdownNow()是用來關閉線程池的。還有很多其他的方法,比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。

參數 說明
corePoolSize 線程池中核心線程數量
maximumPoolSize 線程池中最大線程數量
keepAliveTime 非核心線程存活時間
unit keepAliveTime的時間單位
workQueue 存放任務的隊列
threadFactory 用來生產線程的工廠
handler 當線程池中不能再放入任務時執行的handler

 

 

 

 

 

 

 

如果有一個corePoolSize為5,maximumPoolSize為10的線程池,可用下圖形象展示:

    這里要說明一下:所謂核心線程非核心線程只是一個數量的說明,並不是說核心線程非核心線程有本質上的不同,它們都是普通的線程而已,並且線程特性都一樣,不是說核心線程有特殊標記,線程池能“認”出來這是核心線程,對其有特殊操作。

二、深入剖析線程池實現原理

在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:

1.線程池狀態

2.任務的執行

3.線程池中的線程初始化

4.任務緩存隊列及排隊策略

5.任務拒絕策略

6.線程池的關閉

7.線程池容量的動態調整


1.線程池狀態

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性。下面的幾個static final變量表示runState可能的幾個取值。

當創建線程池后,初始時,線程池處於RUNNING狀態;

如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;

當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。

2.任務的執行

在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(比如線程池大小
                                                              //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工作集
 
private volatile long  keepAliveTime;    //線程存活時間   
private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
 
private volatile int   poolSize;       //線程池中當前的線程數
 
private volatile RejectedExecutionHandler handler; //任務拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來創建線程
 
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

     每個變量的作用都已經標明出來了,這里要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

     corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:

     假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做。當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待。如果說新任務數目增長的速度遠遠大於工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來。然后就將任務也分配給這4個臨時工人做。如果說着14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

     這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。不過為了方便理解,在本文后面還是將corePoolSize翻譯成核心池大小。largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。

     下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

   在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法里面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:

首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;

接着是這句,這句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

     由於是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小於核心池大小,那么就會直接進入下面的if語句塊了。如果線程池中當前線程數小於核心池大小,則接着執行后半部分,也就是執行

addIfUnderCorePoolSize(command)

  如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。如果執行完addIfUnderCorePoolSize這個方法返回false,然后接着判斷:

if (runState == RUNNING && workQueue.offer(command))

如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:

addIfUnderMaximumPoolSize(command)

如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

回到前面:

if (runState == RUNNING && workQueue.offer(command))

這句的執行,如果說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:

if (runState != RUNNING || poolSize == 0)

這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:

ensureQueuedTaskHandled(command)

進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。

我們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //創建線程去執行firstTask任務   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

     這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心池大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然后接着判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然后就是執行

t = addThread(firstTask);

     這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接着在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則調用t.start()方法啟動線程。

我們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //創建一個線程,執行任務   
    if (t != null) {
        w.thread = t;            //將創建的線程的引用賦值為w的成員變量       
        workers.add(w);
        int nt = ++poolSize;     //當前線程數加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

     在addThread方法中,首先用提交的任務創建了一個Worker對象,然后調用線程工廠threadFactory創建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接着通過workers.add(w)將Worker對象添加到工作集當中。

下面我們看一下Worker類的實現:

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據
            //自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務隊列中沒有任務時,進行清理工作       
        }
    }
}

它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:

Thread t = new Thread(w);

相當於傳進去了一個Runnable任務,在線程t中執行這個Runnable。既然Worker實現了Runnable接口,那么自然最核心的方法便是run()方法了:

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

      從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之后,在while循環里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
                //則通過poll取任務,若等待一定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閑狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

      在getTask中,先判斷當前線程池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。如果當前線程池的線程數大於核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。然后判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大於等於STOP,或者任務緩存隊列為空了
    //或者  允許為核心池線程設置空閑存活時間並且線程池中的線程數目大於1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

     也就是說如果線程池處於STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間並且線程數大於1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處於空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //實際上調用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
                                //如果成功獲取了鎖,說明當前worker處於空閑狀態
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

     這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,並沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。

  我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下:

1)首先,要清楚corePoolSize和maximumPoolSize的含義;

2)其次,要知道Worker是用來起到什么作用的;

3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有4點:

  • 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
  • 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
  • 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
  • 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

接下來用一個流程圖來講一講,他究竟干了什么事。

 3.線程池中的線程初始化

默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;

  • prestartAllCoreThreads():初始化所有核心線程

下面是這2個方法的實現:

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n;
    return n;
}

注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最后執行線程會阻塞在getTask方法中的

r = workQueue.take();

即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:

1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;

2)LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;

3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務。

  • shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務。

7.線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小

  • setMaximumPoolSize:設置線程池最大能創建的線程數目大小

當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。

三、使用示例

前面我們討論了關於線程池的實現原理,這一節我們來看一下它的具體使用:

package com.demo.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {

    public static void main(String[] args) {   
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,  //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
            10, //線程池最大能容忍的線程數
            200, //線程存活時間   
            TimeUnit.MILLISECONDS, //參數keepAliveTime的時間單位
            new ArrayBlockingQueue<Runnable>(5) //任務緩存隊列,用來存放等待執行的任務
        );
         
        for(int i=0;i<15;i++){
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
            executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
    }
}
package com.demo.test;

public class MyTask implements Runnable{
    
    private int taskNum;
    
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在執行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"執行完畢");
    }
}

執行結果:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 13
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 14
task 1執行完畢
task 0執行完畢
正在執行task 5
正在執行task 6
task 4執行完畢
正在執行task 7
task 3執行完畢
正在執行task 8
task 2執行完畢
正在執行task 9
task 11執行完畢
task 10執行完畢
task 13執行完畢
task 14執行完畢
task 12執行完畢
task 6執行完畢
task 5執行完畢
task 7執行完畢
task 8執行完畢
task 9執行完畢

     從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。

不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:

Executors.newCachedThreadPool();        //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //創建容量為1的緩沖池
Executors.newFixedThreadPool(int);    //創建固定容量大小的緩沖池

下面是這三個靜態方法的具體實現:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  • newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  • newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;

  • newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

 實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

 另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

幾種常見的線程池

    Executors 是提供了一組工廠方法用於創建常用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調用 ThreadPoolExecutor 構造函數進行創建,區別在於參數不同。

1、FixedThreadPool - 線程池大小固定,任務隊列無界

下面是 Executors 類 newFixedThreadPool 方法的源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

    可以看到 corePoolSize 和 maximumPoolSize 設置成了相同的值,此時不存在線程數量大於核心線程數量的情況,所以KeepAlive時間設置不會生效。任務隊列使用的是不限制大小的 LinkedBlockingQueue ,由於是無界隊列所以容納的任務數量沒有上限,因此,FixedThreadPool的行為如下:

1)從線程池中獲取可用線程執行任務,如果沒有可用線程則使用ThreadFactory創建新的線程,直到線程數達到nThreads。

2)線程池線程數達到nThreads以后,新的任務將被放入隊列。

    FixedThreadPool的優點是能夠保證所有的任務都被執行,永遠不會拒絕新的任務;同時缺點是隊列數量沒有限制,在任務執行時間無限延長的這種極端情況下會造成內存問題。

例子:

package com.demo.threadPool;

public class MyThread extends Thread {
    
    private Integer num; // 正在執行的任務數
    public MyThread(Integer num) {
        this.num = num;
    }
    
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" 正在執行第 "+ num + "個任務");
        try {
            Thread.sleep(500);// 模擬執行任務需要耗時
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" 執行完畢第 " + num + "個任務");
    } 

}
package com.demo.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class FixedThreadExecutorTest {

    public static void main(String[] args) {
        
        //創建固定大小的線程池
        ExecutorService executor=Executors.newFixedThreadPool(2);
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
       
        for(int i = 1; i <= 5; i++) {
            Thread t = new MyThread(i);
            //將線程放到池中執行
            threadPoolExecutor.execute(t);
            System.out.println("線程池中現在的線程數目是:"+threadPoolExecutor.getPoolSize()+",  隊列中正在等待執行的任務數量為:"+  
                    threadPoolExecutor.getQueue().size());
        }
        
        //關閉線程池
        threadPoolExecutor.shutdown();
    }
}

運行結果:

線程池中現在的線程數目是:1,  隊列中正在等待執行的任務數量為:0
pool-1-thread-1 正在執行第 1個任務
線程池中現在的線程數目是:2,  隊列中正在等待執行的任務數量為:0
pool-1-thread-2 正在執行第 2個任務
線程池中現在的線程數目是:2,  隊列中正在等待執行的任務數量為:1
線程池中現在的線程數目是:2,  隊列中正在等待執行的任務數量為:2
線程池中現在的線程數目是:2,  隊列中正在等待執行的任務數量為:3
pool-1-thread-2 執行完畢第 2個任務
pool-1-thread-1 執行完畢第 1個任務
pool-1-thread-2 正在執行第 3個任務
pool-1-thread-1 正在執行第 4個任務
pool-1-thread-2 執行完畢第 3個任務
pool-1-thread-1 執行完畢第 4個任務
pool-1-thread-2 正在執行第 5個任務
pool-1-thread-2 執行完畢第 5個任務

2、SingleThreadExecutor - 線程池大小固定為1,任務隊列無界

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

    這個工廠方法中使用無界LinkedBlockingQueue,並且將線程數設置成1,除此以外還使用FinalizableDelegatedExecutorService類進行了包裝。這個包裝類的主要目的是為了屏蔽ThreadPoolExecutor中動態修改線程數量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程因為處理異常等原因終止的時候,ThreadPoolExecutor會自動創建一個新的線程繼續進行工作。

    SingleThreadExecutor 適用於在邏輯上需要單線程處理任務的場景,同時無界的LinkedBlockingQueue保證新任務都能夠放入隊列,不會被拒絕;缺點和FixedThreadPool相同,當處理任務無限等待的時候會造成內存問題。

例子:

package com.demo.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {

    public static void main(String[] args) { 
        //創建一個單線程的線程池
        ExecutorService executor= Executors.newSingleThreadExecutor();
        
        for(int i = 1; i <= 5; i++) {
            Thread t = new MyThread(i);
            //將線程放到池中執行
            executor.execute(t);
        }
        
        //關閉線程池
        executor.shutdown();
    }

}

運行結果:

pool-1-thread-1 正在執行第 1個任務
pool-1-thread-1 執行完畢第 1個任務
pool-1-thread-1 正在執行第 2個任務
pool-1-thread-1 執行完畢第 2個任務
pool-1-thread-1 正在執行第 3個任務
pool-1-thread-1 執行完畢第 3個任務
pool-1-thread-1 正在執行第 4個任務
pool-1-thread-1 執行完畢第 4個任務
pool-1-thread-1 正在執行第 5個任務
pool-1-thread-1 執行完畢第 5個任務

3、CachedThreadPool - 線程池無限大(MAX INT),等待隊列長度為1

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

     SynchronousQueue是一個只有1個元素的隊列,入隊的任務需要一直等待直到隊列中的元素被移出。核心線程數是0,意味着所有任務會先入隊列;最大線程數是Integer.MAX_VALUE,可以認為線程數量是沒有限制的。KeepAlive時間被設置成60秒,意味着在沒有任務的時候線程等待60秒以后退出。CachedThreadPool對任務的處理策略是提交的任務會立即分配一個線程進行執行,線程池中線程數量會隨着任務數的變化自動擴張和縮減,在任務執行時間無限延長的極端情況下會創建過多的線程。

例子:

package com.demo.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedThreadExecutorTest {

public static void main(String[] args) {
        
        //創建一個可緩存的線程池
        ExecutorService executor=Executors.newCachedThreadPool();
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
       
        for(int i = 1; i <= 5; i++) {
            Thread t = new MyThread(i);
            //將線程放到池中執行
            threadPoolExecutor.execute(t);
            System.out.println("線程池中現在的線程數目是:"+threadPoolExecutor.getPoolSize()+",  隊列中正在等待執行的任務數量為:"+  
                    threadPoolExecutor.getQueue().size());
        }
        
        //關閉線程池
        threadPoolExecutor.shutdown();
    }
}

運行結果:

線程池中現在的線程數目是:1,  隊列中正在等待執行的任務數量為:0
pool-1-thread-1 正在執行第 1個任務
線程池中現在的線程數目是:2,  隊列中正在等待執行的任務數量為:0
pool-1-thread-2 正在執行第 2個任務
線程池中現在的線程數目是:3,  隊列中正在等待執行的任務數量為:0
pool-1-thread-3 正在執行第 3個任務
線程池中現在的線程數目是:4,  隊列中正在等待執行的任務數量為:0
pool-1-thread-4 正在執行第 4個任務
線程池中現在的線程數目是:5,  隊列中正在等待執行的任務數量為:0
pool-1-thread-5 正在執行第 5個任務
pool-1-thread-4 執行完畢第 4個任務
pool-1-thread-5 執行完畢第 5個任務
pool-1-thread-2 執行完畢第 2個任務
pool-1-thread-3 執行完畢第 3個任務
pool-1-thread-1 執行完畢第 1個任務

三種ExecutorService特性總結

類型 核心線程數 最大線程數 Keep Alive 時間 任務隊列 任務處理策略
FixedThreadPool 固定大小 固定大小(與核心線程數相同) 0 LinkedBlockingQueue 線程池大小固定,沒有可用線程的時候任務會放入隊列等待,隊列長度無限制
SingleThreadExecutor 1 1 0 LinkedBlockingQueue 與 FixedThreadPool 相同,區別在於線程池的大小為1,適用於業務邏輯上只允許1個線程進行處理的場景
CachedThreadPool 0 Integer.MAX_VALUE 1分鍾 SynchronousQueue 線程池的數量無限大,新任務會直接分配或者創建一個線程進行執行

 

 

 

 

 

 

 

總結:

1. newSingleThreadExecutor

創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

2.newFixedThreadPool

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

3. newCachedThreadPool

創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,

那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

4.newScheduledThreadPool

創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

四、如何合理配置線程池的大小

     本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

     一般需要根據任務的類型來配置線程池大小:如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1。如果是IO密集型任務,參考值可以設置為2*NCPU。當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。

補充:Java阻塞隊列

    隊列以一種先進先出的方式管理數據,阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列,這兩個附加的操作是:當從隊列中獲取或者移除元素時,如果隊列為空,需要等待,直到隊列不為空;同時如果向隊列中添加元素時,此時如果隊列無可用空間,也需要等待。在多線程進行合作時,阻塞隊列是很有用的工具。

    生產者-消費者模式:阻塞隊列常用於生產者和消費者的場景,生產者線程可以定期的把中間結果存到阻塞隊列中,而消費者線程把中間結果取出並在將來修改它們。隊列會自動平衡負載,如果生產者線程集運行的比消費者線程集慢,則消費者線程集在等待結果時就會阻塞;如果生產者線程集運行的快,那么它將等待消費者線程集趕上來。

Java中的阻塞隊列

    java.util.concurrent包提供了幾種不同形式的阻塞隊列,如數組阻塞隊列ArrayBlockingQueue、鏈表阻塞隊列LinkedBlockingQueue、優先級阻塞隊列PriorityBlockingQueue和延時隊列DelayQueue等,下面簡單介紹一下這幾個阻塞隊列:

 數組阻塞隊列:ArrayBlockingQueue是一個由數組支持的有界阻塞隊列,內部維持着一個定長的數據緩沖隊列(該隊列由數組構成),此隊列按照先進先出(FIFO)的原則對元素進行排序,在構造時需要給定容量。ArrayBlockingQueue內部還保存着兩個整形變量,分別標識着隊列的頭部和尾部在數組中的位置。

 對於數組阻塞隊列,可以選擇是否需要公平性,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常,公平性會使你在性能上付出代價,只有在的確非常需要的時候再使用它。

我們可以使用以下代碼創建一個公平的阻塞隊列:

ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

數組阻塞隊列的公平性是使用可重入鎖實現的,其構造函數代碼如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
      throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

    鏈表阻塞隊列:LinkedBlockingQueue基於鏈表的有界阻塞隊列,內部維持着一個數據緩沖隊列(該隊列由鏈表構成),此隊列按照先進先出的原則對元素進行排序。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,並緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(可以通過LinkedBlockingQueue的構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程將會被喚醒,反之對於消費者這端的處理也基於同樣的原理。需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小(Integer.Max_VALUE)的容量,這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已經被消耗殆盡了。

  LinkedBlockingQueue之所以能夠高效的處理並發數據,是因為其對於生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味着在高並發的情況下生產者和消費者可以並行地操作隊列中的數據,以此來提高整個隊列的並發性能。

    優先級阻塞隊列:PriorityBlockingQueue是一個支持優先級排序的無界阻塞隊列,默認情況下元素采取自然順序排列,也可以通過構造函數傳入的Compator對象來決定。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。需要注意的是PriorityBlockingQueue並不會阻塞數據生產者,而只是在沒有可消費的數據時阻塞數據的消費者,因此使用的時候要特別注意,生產者生產數據的速度絕對不能快於消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。

    延時隊列:DelayQueue是一個支持延時獲取元素的使用優先級隊列實現的無界阻塞隊列。隊列中的元素必須實現Delayed接口和Comparable接口(用以指定元素的順序),也就是說DelayQueue里面的元素必須有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在;在創建元素時可以指定多久才能從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素。

    SynchronousQueue:SynchronousQueue是一種無界、無緩沖的阻塞隊列,可以認為SynchronousQueue是一個緩存值為1的阻塞隊列,但是SynchronousQueue內部並沒有數據緩存空間,數據是在配對的生產者和消費者線程之間直接傳遞的。可以這樣來理解:SynchronousQueue是一個傳球手,SynchronousQueue不存儲數據元素,隊列頭元素是第一個排隊要插入數據的線程,而不是要交換的數據,SynchronousQueue負責把生產者線程處理的數據直接傳遞給消費者線程,生產者和消費者互相等待對方,握手,然后一起離開。SynchronousQueue的吞吐量高於LinkedBlockingQueue 和 ArrayBlockingQueue。


任務阻塞隊列

    當線程池池創建的線程數量大於 corePoolSize 后,新來的任務將會加入到堵塞隊列(workQueue)中等待有空閑線程來執行。workQueue的類型為BlockingQueue,通常可以取下面三種類型:

1、ArrayBlockingQueue:基於數組的FIFO隊列,是有界的,創建時必須指定大小

2、LinkedBlockingQueue: 基於鏈表的FIFO隊列,是無界的,默認大小是 Integer.MAX_VALUE

3、synchronousQueue:一個比較特殊的隊列,雖然它是無界的,但它不會保存任務,每一個新增任務的線程必須等待另一個線程取出任務,也可以把它看成容量為0的隊列。

所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此隊列與池大小進行交互:

     如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。(如果當前運行的線程小於corePoolSize,則任務根本不會存放,添加到queue中,而是直接抄家伙(thread)開始運行)如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

排隊有三種通用策略:

    直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線 程具有增長的可能性。

    無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因 此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如, 在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。

    有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列 (如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊 界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降 低吞吐量。

BlockingQueue的選擇。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是無界的,也就是說他存儲任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續添加。在這里不是核心線程便是新創建的線程,但是我們試想一樣下,下面的場景。

我們使用一下參數構造ThreadPoolExecutor:

new ThreadPoolExecutor(
   2, 3, 30, TimeUnit.SECONDS,
   new SynchronousQueue<Runnable>(),
   new RecorderThreadFactory("CookieRecorderPool"),
   new ThreadPoolExecutor.CallerRunsPolicy());

當核心線程已經有2個正在運行.

1、此時繼續來了一個任務(A),根據前面介紹的“如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。”,所以A被添加到queue中。

2、又來了一個任務(B),且核心2個線程還沒有忙完,OK,接下來首先嘗試1中描述,但是由於使用的SynchronousQueue,所以一定無法加入進去。

3、此時便滿足了上面提到的“如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出maximumPoolSize,在這種情況下,任務將被拒絕。”,所以必然會新建一個線程來運行這個任務。

4、暫時還可以,但是如果這三個任務都還沒完成,連續來了兩個任務,第一個添加入queue中,后一個呢?queue中無法插入,而線程數達到了maximumPoolSize,所以只好執行異常策略了。

所以在使用SynchronousQueue通常要求maximumPoolSize是無界的,這樣就可以避免上述情況發生(如果希望限制就直接使 用有界隊列)。對於使用SynchronousQueue的作用jdk中寫的很清楚:此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。

什么意思?如果你的任務A1,A2有內部關聯,A1需要先運行,那么先提交A1,再提交A2,當使用SynchronousQueue我們可以保證,A1必定先被執行,在A1么有被執行前,A2不可能添加入queue中。

例子二:使用無界隊列策略,即LinkedBlockingQueue

這個就拿newFixedThreadPool來說,根據前文提到的規則:

如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。那么當任務繼續增加,會發生什么呢?

如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。OK,此時任務變加入隊列之中了,那什么時候才會添加新線程呢?

如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。這里就很有意思了, 可能會出現無法加入隊列嗎?不像SynchronousQueue那樣有其自身的特點,對於無界隊列來說,總是可以加入的(資源耗盡,當然另當別論)。換句說,永遠也不會觸發產生新的線程!corePoolSize大小的線程數會一直運行,忙完當前的,就從隊列中拿任務開始運行。所以要防止任務瘋長,比如任務運行的時間比較長,而添加任務的速度遠遠超過處理任務的時間,而且還不斷增加,不一會兒就爆了。

例子三:有界隊列,使用ArrayBlockingQueue。

這個是最為復雜的使用,所以JDK不推薦使用也有些道理。與上面的相比,最大的特點便是可以防止資源耗盡的情況發生。

舉例來說,請看如下構造方法:

new ThreadPoolExecutor(
     2, 4, 30, TimeUnit.SECONDS,
     new ArrayBlockingQueue<Runnable>(2),
     new RecorderThreadFactory("CookieRecorderPool"),
     new ThreadPoolExecutor.CallerRunsPolicy());

假設,所有的任務都永遠無法執行完。

對於首先來的A,B來說直接運行,接下來,如果來了C,D,他們會被放到queue中,如果接下來再來E,F,則增加線程運行E,F。但是如果再來任務,隊列無法再接受了,線程數也到達最大的限制了,所以就會使用拒絕策略來處理。

總結:

BlockingQueue

ArrayBlockingQueue: 基於數組的阻塞隊列,在內部維護了一個定長數組,以便緩存隊列中的數據對象。並沒有實現讀寫分離,也就意味着生產和消費不能完全並行。是一個有界隊列

LinkedBlockingQueue:基於列表的阻塞隊列,在內部維護了一個數據緩沖隊列(由一個鏈表構成),實現采用分離鎖(讀寫分離兩個鎖),從而實現生產者和消費者操作的完全並行運行。是一個無界隊列,

SynchronousQueue: 沒有緩存的隊列,生存者生產的數據直接會被消費者獲取並消費。若沒有數據就直接調用出棧方法則會報錯。

三種隊列使用場景
newFixedThreadPool 線程池采用的隊列是LinkedBlockingQueue。其優點是無界可緩存,內部實現讀寫分離,並發的處理能力高於ArrayBlockingQueue。
newCachedThreadPool 線程池采用的隊列是SynchronousQueue。其優點就是無緩存,接收到的任務均可直接處理,再次強調,慎用!
並發量不大,服務器性能較好,可以考慮使用SynchronousQueue。
並發量較大,服務器性能較好,可以考慮使用LinkedBlockingQueue。
並發量很大,服務器性能無法滿足,可以考慮使用ArrayBlockingQueue。系統的穩定最重要。

案例:

若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現

package com.demo.threadPool;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 若有Thread1、Thread2、Thread3、Thread4四條線程分別統計C、D、E、F四個盤的大小,所有線程都統計完畢交給Thread5線程去做匯總,應當如何實現?
 * 思考:匯總,說明要把四個線程的結果返回給第五個線程,若要線程有返回值,推薦使用callable。Thread和Runnable都沒返回值
 */
public class ITDragonThreads {
    
    public static void main(String[] args) throws Exception {  
        // 無緩沖無界線程池
        ExecutorService executor = Executors.newFixedThreadPool(8); 
        // 相對ExecutorService,CompletionService可以更精確和簡便地完成異步任務的執行
        CompletionService<Long> completion = new ExecutorCompletionService<Long>(executor);  
        //Future<Long> f = null;
        CountWorker countWorker = null;  
        //long total = 0;  
        for (int i = 0; i < 4; i++) { // 四個線程負責統計
            countWorker = new CountWorker(i);  
            completion.submit(countWorker); 
            //f = executor.submit(countWorker);
            //total += f.get();
        }  
        // 關閉線程池
        executor.shutdown();  
        // 主線程相當於第五個線程,用於匯總數據
        long total = 0;  
        for (int i = 0; i < 4; i++) { 
            total += completion.take().get(); 
        }  
        System.out.println(total / 1024 / 1024 / 1024 +"G");  
    }
}

class CountWorker implements Callable<Long>{  
    private Integer type;  
    public CountWorker() {
    }
    public CountWorker(Integer type) {
        this.type = type;
    }

    @Override  
    public Long call() throws Exception {  
        ArrayList<String> paths = new ArrayList<>(Arrays.asList("c:", "d:", "e:", "f:"));
        return countDiskSpace(paths.get(type));  
    }  
    
    // 統計磁盤大小
    private Long countDiskSpace (String path) {  
        File file = new File(path);  
        long totalSpace = file.getTotalSpace();  
        System.out.println(path + " 總空間大小 : " + totalSpace / 1024 / 1024 / 1024 + "G");  
        return totalSpace;
    }  
}  

運行結果:

e: 總空間大小 : 400G
f: 總空間大小 : 230G
d: 總空間大小 : 299G
c: 總空間大小 : 118G
1050G

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM