線程池ThreadPoolExecutor、Executors參數詳解與源代碼分析


歡迎探討,如有錯誤敬請指正

如需轉載,請注明出處 http://www.cnblogs.com/nullzx/

1. ThreadPoolExecutor數據成員

Private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

     ctl主要用於存儲線程池的工作狀態以及池中正在運行的線程數。顯然要在一個整型變量存儲兩個數據,只能將其一分為二。其中高3bit用於存儲線程池的狀態,低位的29bit用於存儲正在運行的線程數。

     線程池具有以下五種狀態,當創建一個線程池時初始化狀態為RUNNING

RUNNING

允許提交並處理任務

SHUTDOWN

不允許提交新的任務,但是會處理完已提交的任務

STOP

不允許提交新的任務,也不會處理阻塞隊列中未執行的任務,並設置正在執行的線程的中斷標志位

TIDYING

所有任務執行完畢,池中工作的線程數為0,等待執行terminated()勾子方法

TERMINATED

terminated()勾子方法執行完畢

      注意,這里說的是線程池的狀態而不是池中線程的狀態。

      調用線程池的shutdown方法,將線程池由RUNNING(運行狀態)轉換為SHUTDOWN狀態。

      調用線程池的shutdownNow方法,將線程池由RUNNING或SHUTDOWN狀態轉換為STOP狀態。

      SHUTDOWN狀態和STOP狀態先會轉變為TIDYING狀態,最終都會變為TERMINATED

Private static int runStateOf(int c)
Private static int workerCountOf(int c)
Private static int ctlOf(int rs,int wc)

      ThreadPoolExecutor同時提供上述三個方法用於池中的線程查看線程池的狀態和計算正在運行的線程數。

Private int largestPoolSize;
Private final BlockingQueue<Runnable>workQueue;
Private volatile long keepAliveTime;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;

      上述數據成員對線程池的性能也有很大的影響,我會將它們放到構造中講解。

Privatefinal HashSet<Worker> workers= new HashSet<Worker>();
Privatelong completedTaskCount;
Private volatile boolean allowCoreThreadTimeOut;
private int largestPoolSize;

       completedTaskCount表示線程池已完成的任務數。

      allowCoreThreadTimeeOut表示是否允許核心線程在空閑狀態下自行銷毀。

      largestPoolSize 表示線程池從創建到現在,池中線程的最大數量

private final HashSet<Worker> workers = new HashSet<Worker>();

     workers是個HashSet容器,它存儲的是Worker類的對象,Worker是線程池的內部類,它繼承了Runnable接口,不嚴格的情況下,可以將一個Worker對象看成Thread對象,也就是工作的線程。shutdown和shutdownNow方法中會使用workers完成對所有線程的遍歷。

Privatefinal ReentrantLock mainLock =new ReentrantLock();
Privatefinal Condition termination = mainLock.newCondition();

      mainLock主要用於同步訪問(或者說改變)線程池的狀態以及線程池的各項參數,比如completedTaskCount和workers等。

     在awaitTermination方法中,(mianLock的)termination是用於延時的條件隊列。

2. 構造函數

publicThreadPoolExecutor(intcorePoolSize,
		int maximumPoolSize,
		long keepAliveTime,
		TimeUnit unit,
		BlockingQueue<Runnable> workQueue,
		ThreadFactory threadFactory,
		RejectedExecutionHandler handler)

       線程池的構造函數參數多達7個,現在我們一一來分析它們對線程池的影響。

       corePoolSize:線程池中核心線程數的最大值

       maximumPoolSize:線程池中能擁有最多線程數

       workQueue:用於緩存任務的阻塞隊列

       我們現在通過向線程池添加新的任務來說明着三者之間的關系。

     (1)如果沒有空閑的線程執行該任務且當前運行的線程數少於corePoolSize,則添加新的線程執行該任務。

     (2)如果沒有空閑的線程執行該任務且當前的線程數等於corePoolSize同時阻塞隊列未滿,則將任務入隊列,而不添加新的線程

     (3)如果沒有空閑的線程執行該任務且阻塞隊列已滿同時池中的線程數小於maximumPoolSize,則創建新的線程執行任務。

     (4)如果沒有空閑的線程執行該任務且阻塞隊列已滿同時池中的線程數等於maximumPoolSize,則根據構造函數中的handler指定的策略來拒絕新的任務。

       注意,線程池並沒有標記哪個線程是核心線程,哪個是非核心線程,線程池只關心核心線程的數量。

       通俗解釋,如果把線程池比作一個單位的話,corePoolSize就表示正式工,線程就可以表示一個員工。當我們向單位委派一項工作時,如果單位發現正式工還沒招滿,單位就會招個正式工來完成這項工作。隨着我們向這個單位委派的工作增多,即使正式工全部滿了,工作還是干不完,那么單位只能按照我們新委派的工作按先后順序將它們找個地方擱置起來,這個地方就是workQueue,等正式工完成了手上的工作,就到這里來取新的任務。如果不巧,年末了,各個部門都向這個單位委派任務,導致workQueue已經沒有空位置放新的任務,於是單位決定招點臨時工吧(臨時工:又是我!)。臨時工也不是想招多少就找多少,上級部門通過這個單位的maximumPoolSize確定了你這個單位的人數的最大值,換句話說最多招maximumPoolSize–corePoolSize個臨時工。當然,在線程池中,誰是正式工,誰是臨時工是沒有區別,完全同工同酬。

        keepAliveTime:表示空閑線程的存活時間。

        TimeUnitunit:表示keepAliveTime的單位。

        為了解釋keepAliveTime的作用,我們在上述情況下做一種假設。假設線程池這個單位已經招了些臨時工,但新任務沒有繼續增加,所以隨着每個員工忙完手頭的工作,都來workQueue領取新的任務(看看這個單位的員工多自覺啊)。隨着各個員工齊心協力,任務越來越少,員工數沒變,那么就必定有閑着沒事干的員工。這樣的話領導不樂意啦,但是又不能輕易fire沒事干的員工,因為隨時可能有新任務來,於是領導想了個辦法,設定了keepAliveTime,當空閑的員工在keepAliveTime這段時間還沒有找到事情干,就被辭退啦,畢竟地主家也沒有余糧啊!當然辭退到corePoolSize個員工時就不再辭退了,領導也不想當光桿司令啊!

       handler:表示當workQueue已滿,且池中的線程數達到maximumPoolSize時,線程池拒絕添加新任務時采取的策略。

為了解釋handler的作用,我們在上述情況下做另一種假設。假設線程池這個單位招滿臨時工,但新任務依然繼續增加,線程池從上到下,從里到外真心忙的不可開交,阻塞隊列也滿了,只好拒絕上級委派下來的任務。怎么拒絕是門藝術,handler一般可以采取以下四種取值。

ThreadPoolExecutor.AbortPolicy()

拋出RejectedExecutionException異常

ThreadPoolExecutor.CallerRunsPolicy()

由向線程池提交任務的線程來執行該任務

ThreadPoolExecutor.DiscardOldestPolicy()

拋棄最舊的任務(最先提交而沒有得到執行的任務)

ThreadPoolExecutor.DiscardPolicy()

拋棄當前的任務

     workQueue:它決定了緩存任務的排隊策略。對於不同的應用場景我們可能會采取不同的排隊策略,這就需要不同類型的阻塞隊列,在線程池中常用的阻塞隊列有以下2種:

    (1)SynchronousQueue<Runnable>:此隊列中不緩存任何一個任務。向線程池提交任務時,如果沒有空閑線程來運行任務,則入列操作會阻塞。當有線程來獲取任務時,出列操作會喚醒執行入列操作的線程。從這個特性來看,SynchronousQueue是一個無界隊列,因此當使用SynchronousQueue作為線程池的阻塞隊列時,參數maximumPoolSizes沒有任何作用。

    (2)LinkedBlockingQueue<Runnable>:顧名思義是用鏈表實現的隊列,可以是有界的,也可以是無界的,但在Executors中默認使用無界的。

      threadFactory:指定創建線程的工廠

     實際上ThreadPoolExecutor類中還有很多重載的構造函數,下面這個構造函數在Executors中經常用到。

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
	this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
	Executors.defaultThreadFactory(), defaultHandler);
}

      注意到上述的構造方法使用Executors中的defaultThreadFactory()線程工廠和ThreadPoolExecutor中的defaultHandler拋棄策略。

      使用defaultThreadFactory創建的線程同屬於相同的線程組,具有同為Thread.NORM_PRIORITY的優先級,以及名為"pool-XXX-thread-"的線程名(XXX為創建線程時順序序號),且創建的線程都是非守護進程。

      defaultHandler缺省拋棄策是ThreadPoolExecutor.AbortPolicy()。

      除了在創建線程池時指定上述參數的值外,還可在線程池創建以后通過如下方法進行設置。

Public void allowCoreThreadTimeOut(boolean value)
Public void setKeepAliveTime(long time,TimeUnit unit)
Public void setMaximumPoolSize(int maximumPoolSize)
Public void setCorePoolSize(int corePoolSize)
Public void setThreadFactory(ThreadFactory threadFactory)
Public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

3. 其它有關涉及池中線程數量的相關方法

public void allowCoreThreadTimeOut(boolean value) 
public int prestartAllCoreThreads()

     默認情況下,當池中有空閑線程,且線程的數量大於corePoolSize時,空閑時間超過keepAliveTime的線程會自行銷毀,池中僅僅會保留corePoolSize個線程。如果線程池中調用了allowCoreThreadTimeOut這個方法,則空閑時間超過keepAliveTime的線程全部都會自行銷毀,而不必理會corePoolSize這個參數。

     如果池中的線程數量小於corePoolSize時,調用prestartAllCoreThreads方法,則無論是否有待執行的任務,線程池都會創建新的線程,直到池中線程數量達到corePoolSize。

4. Executors中的線程池的工廠方法

     為了防止使用者錯誤搭配ThreadPoolExecutor構造函數的各個參數以及更加方便簡潔的創建ThreadPoolExecutor對象,JavaSE中又定義了Executors類,Eexcutors類提供了創建常用配置線程池的方法。以下是Executors常用的三個創建線程池的源代碼。

      從源碼中可以看出,Executors間接的調用了重載的ThreadPoolExecutor構造函數,並幫助用戶根據不同的應用場景,配置不同的參數。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

      newCachedThreadPool:使用SynchronousQueue作為阻塞隊列,隊列無界,線程的空閑時限為60秒。這種類型的線程池非常適用IO密集的服務,因為IO請求具有密集、數量巨大、不持續、服務器端CPU等待IO響應時間長的特點。服務器端為了能提高CPU的使用率就應該為每個IO請求都創建一個線程,以免CPU因為等待IO響應而空閑。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

      newFixedThreadPool:需指定核心線程數,核心線程數和最大線程數相同,使用LinkedBlockingQueue 作為阻塞隊列,隊列無界,線程空閑時間0秒。這種類型的線程池可以適用CPU密集的工作,在這種工作中CPU忙於計算而很少空閑,由於CPU能真正並發的執行的線程數是一定的(比如四核八線程),所以對於那些需要CPU進行大量計算的線程,創建的線程數超過CPU能夠真正並發執行的線程數就沒有太大的意義。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

      newSingleThreadExecutor:池中只有一個線程工作,阻塞隊列無界,它能保證按照任務提交的順序來執行任務。

5. 任務的提交過程

submit方法源碼

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

       submit的實現方法位於抽象類AbstractExecutorService中,而此時execute方法還未實現(而是在AbstractExecutorService的繼承類ThreadPoolExecutor中實現)。submit有三種重載方法,這里我選取了兩個常用的進行分析,可以看出無論哪個submit方法都最終調用了execute方法。

execute方法源碼

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

         由於execute方法中多次調用addWorker,我們這里就簡要介紹一下它,這個方法的主要作用就是創建一個線程來執行Runnnable對象。

addWorker(Runnable firstTask, boolean core)

       第一個參數firstTask不為null,則創建的線程就會先執行firstTask對象,然后去阻塞隊列中取任務,否直接到阻塞隊列中獲取任務來執行。第二個參數,core參數為真,則用corePoolSize作為池中線程數量的最大值;為假,則以maximumPoolSize作為池中線程數量的最大值。

      簡要分析一下execute源碼,執行一個Runnable對象時,首先通過workerCountOf(c)獲取線程池中線程的數量,如果池中的數量小於corePoolSize就調用addWorker添加一個線程來執行這個任務。否則通過workQueue.offer(command)方法入列。如果入列成功還需要在一次判斷池中的線程數,因為我們創建線程池時可能要求核心線程數量為0,所以我們必須使用addWorker(null, false)來創建一個臨時線程去阻塞隊列中獲取任務來執行。

       isRunning(c) 的作用是判斷線程池是否處於運行狀態,如果入列后發現線程池已經關閉,則出列。不需要在入列前判斷線程池的狀態,因為判斷一個線程池工作處於RUNNING狀態到執行入列操作這段時間,線程池可能被其它線程關閉了,所以提前判斷毫無意義。

addWorker源碼

private boolean addWorker(Runnable firstTask, boolean core) {
    //這個兩個for循環主要是判斷能否增加一個線程,
	//外循環來判斷線程池的狀態
	//內循環主要是個增加線程數的CAS操作
	retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // 如果是因為線程數的改變導致CAS失敗,只需要重復內循環
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//創建線程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//啟動線程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

6. 線程的執行過程

runWorker源碼

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

       Thread的run方法實際上調用了Worker類的runWorker方法,而Worker類繼承了AQS類,並實現了lock、unlock、trylock方法。但是這些方法不是真正意義上的鎖,所以在代碼中加鎖操作和解鎖操作沒有成對出現。runWorker方法中獲取到任務就“加鎖”,完成任務后就“解鎖”。也就是說在“加鎖”到“解鎖”的這段時間內,線程處於忙碌狀態,而其它時間段,處於空閑狀態。線程池就可以通過trylock方法來確定這個線程是否空閑。

       getTask方法的主要作用是從阻塞隊列中獲取任務。

       beforeExecute(wt, task)和afterExecute(task, thrown)是個鈎子函數,如果我們需要在任務執行之前和任務執行以后進行一些操作,那么我們可以自定義一個繼承ThreadPoolExecutor類,並覆蓋這兩個方法。

getTask源代碼

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

     可以看出如果允許線程在keepAliveTime時間內未獲取到任務線程就銷毀就調用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),否則會調用workQueue.take()方法(該方法即使獲取不到任務就會一直阻塞下去)。而確定是否使用workQueue.poll方法只有兩個條件決定,一個是當前池中的線程是否大於核心線程數量,第二個是是否允許核心線程銷毀,兩者其一滿足就會調用該方法。

7. 線程池的關閉過程

shutdown源碼

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

       advanceRunState(SHUTDOWN)的作用是通過CAS操作將線程池的狀態更改為SHUTDOWN狀態。

       interruptIdleWorkers是對空閑的線程進行中斷,它實際上調用了重載帶參數的函數interruptIdleWorkers(false)

       onShutdown也是一個鈎子函數

interruptIdleWorkers源碼

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

      通過workers容器,遍歷池中的線程,對每個線程進行tryLock()操作,如果成功說明線程空閑,則設置其中斷標志位。而線程是否響應中斷則由任務的編寫者決定。

8. 參考文章

[1] http://www.infoq.com/cn/articles/java-threadPool/

[2] http://my.oschina.net/u/1398304/blog/376827?fromerr=limo9iEj

[3] http://www.cnblogs.com/dolphin0520/p/3932921.html

[4] http://cuisuqiang.iteye.com/blog/2019372

[5] http://blog.sina.com.cn/s/blog_5eeabe8b0100v9i5.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM