ThreadPoolExecutor源碼解析(二)


 

1.ThreadPoolExcuter運行實例

 

  首先我們先看如何新建一個ThreadPoolExecutor去運行線程。然后深入到源碼中去看ThreadPoolExecutor里面使如何運作的。

public class Test {
    public static void main(String[] args){
        /**
         * 新建一個線程池
         * corePoolSize:2
         * maximumPoolSize:10
         * keepAliveTime:20
         * unit:TimeUnit.SECONDS(秒)
         * workQueue:new ArrayBlockingQueue(10)
         * threadFactory:默認
         * RejectedExecutionHandler默認
         */
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,10,20, TimeUnit.SECONDS,new ArrayBlockingQueue(10));
        //用execute添加一個線程
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

2.ThreadPoolExecute.execute方法

  可以發現,其實使用線程池ThreadPoolExcuter 就是使用這個方法,然后我們看這個方法具體的代碼。

    /**
     * 在后面執行給定任務。任務在一個新的線程中或一個存在的worker的線程池中執行。
     * 如果一個線程不能提交到excution,可能是因為這個excutor已經shundown或者因為其容量已經是最大,
     * 此時任務將會被RejectedExecutionHandler處理
     *
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 有以下3個步驟
         *
         * 1.如果少於corePoolSize的線程在運行,那么試着啟動一個新線程,其中用給定指令作為first task。
         * 這會調用addWorker去原子性得檢查runState和workerCoune,因此可以防止錯誤報警,在錯誤報警不應該時通過返回false來添加線程
         * 2.如果任務被成功排隊,我們任然應該第二次檢查是否添加一個新線程(因為可能存在在最后一次檢查后掛掉的情況)
         * 或者在進入這個方法期間線程池shutdown。所以我們再次檢查狀態,如果已關閉和有必要則退出隊列,或者如果沒有的話就開始一個新的線程。
         * 3.如果我們無法將task入隊,那么我們試圖添加新線程。如果失敗,那么知道我們shutdown或者是飽和的並拒絕task。
         */
        int c = ctl.get();
        //判斷是否小於corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果pool在運行並且能提交到隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //這里進行再次檢查,如果線程池沒在運行並且成功刪除task后,使用拒絕策略拒絕該task
            if (! isRunning(recheck) && remove(command))
                reject(command);
       //如果已經將task添加到隊列中,而此時沒有worker的話,那么新建一個worker。稍后這個空閑的worker就會自動去隊列里面取任務來執行
else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果無法提交那么按照拒絕策略拒絕task else if (!addWorker(command, false)) reject(command); }

  線程池的ThreadPoolExcuter基礎和 ThreadPoolExcuter  Worker基本介紹在前一節已經有說過,可以點這里查看。可以看到這個方法的主要流程,其實都在注釋里面說明了。可以發現里面主要調用了一個方法,addWorker()。 那么這個addWorker()又是什么東西呢。其實看方法名就很清楚了,就是新建一個Worker來執行你添加進來的task。

3.ThreadPoolExecute.addWorker()方法

    /**
     * 檢查當前的線程池狀態和容量,是否可以讓一個新的worker加入。如果可以,worker計數將會被調整,並且
     * 如果可能,一個新的woker將會被創建和開始,將它當作第一個任務來運行。當線程池是stopped或shutdown狀態時,
     * 將返回false。當線程工廠創建失敗而返回null或者拋出exception(比如典型的OOM)時,它也會返回fails。
     * firstTask:新線程應該第一個運行的任務。當線程數少於corePoolSize時或是隊列滿時,workers使用一個初始化的
     * first task來創建,用來進行分流。初始化空閑線程通常使用prestartCoreThread。
     * core:為true,如果使用有界的corePoolSize,否則時maxPoolSize
     * @return true if successful
     * 添加Worker
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            //狀態值
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //關於狀態值的檢測
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //關於容量的檢測
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //用到了原子CAS方法比較,使用CAS增加worker計數器成功,才能進入下一步
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //重新獲取ctl
                c = ctl.get();  // Re-read ctl
                //這里表示執行到這里的時候線程池的運行狀態改變,需要重新跳到retry處執行
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //使用firstTask初始化Worker,first可能為null,那么則表示該worker為空閑
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //持有鎖之后再次檢查,確保一致性
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        //largestPoolSize為跟蹤的目前最大線程數,因為之前已經做過判斷,所以不會越界問題
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //workerAdded是在上面最后才設置的,確保這個變量能准確表示是否添加worker成功
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //再次檢查
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  addWorker本事只是為線程池添加一個ThreadPoolExcuter  Worker,其本身所做的事情其實很簡單,但難就難在要確保安全有效得添加一個ThreadPoolExcuter  Worker。為此addWorker()方法做了很多額外的工作。比如判斷線程池的運行狀態,當前Worker數量是否已經飽和等等。可以發現在這個方法,或者說整個ThreadPoolExecutor中,很多時候都是使用雙重檢查的方式來對線程池狀態進行檢查。其實這都是為了效率,最簡單不過直接使用Synchronized或ReentranLock進行同步,但這樣效率會低很多,所以在這里,只有在萬不得已的情況下,才會使用悲觀的ReentranLock。

  addWorker的最后直接調用了t.start,這里的t其實就是Worker它自己。接下來再看Worker是如何運行的。

4.ThreadPoolExecute.runWorker()方法

    /**
     * 主要的Worker運行的循環。重復得獲取從任務隊列中取出task並執行它。
    */

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //取出firstTask,再將worker中的值-設置為null
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //不斷循環取出線程運行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //鎖住線程
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果當前線程是stop,那么將確認其為interrupted
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //調用鈎子
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //運行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  從源碼中可以看出,一個ThreadPoolExcuter  Worker的工作其實就是不斷使用getTask()方法從隊列中獲取新的任務來執行。值得一提的是,初始化參數里面的時間戳參數就是在這個方法里面運用的。在循環體中每次都使用鎖以保證當前worker在運行task過程中不會被中斷。同時運行時還會去調用兩個內置的鈎子:beforeExecute()和afterExecute(),這兩個方法默認實現時空的。

  同時在運行的循環中每次都關注着ThreadPoolExecutor的運行狀態,當線程池處於中斷狀態時,循環Worker的當前線程也會中斷。

 

總結:說到這里就差不多把線程池運行task的流程說完了,當然其中忽略了很多的細節。但總而言之,ThreadPoolExecutor其實就是對worker進行管理,然后使用這些worker來執行用戶提交的task。對用戶提交的task的數量也進行一定的控制管理,比如超過一定數量時放入一個任務隊列中等等。然后對線程池規定一些狀態量,根據這些狀態量對線程池進行控制。

 

 


 

如果覺得對你有幫助,不如花0.5元請作者吃顆糖,讓他甜一下吧~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM