源碼剖析ThreadPoolExecutor線程池及阻塞隊列


  本文章對ThreadPoolExecutor線程池的底層源碼進行分析,線程池如何起到了線程復用、又是如何進行維護我們的線程任務的呢?我們直接進入正題:

  首先我們看一下ThreadPoolExecutor類的源碼

 1 public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue, 
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) { //拒絕策略
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.acc = System.getSecurityManager() == null ?
16                 null :
17                 AccessController.getContext();
18         //核心線程
19         this.corePoolSize = corePoolSize;
20         //最大線程數
21         this.maximumPoolSize = maximumPoolSize;
22         //阻塞隊列,即今天主題
23         this.workQueue = workQueue;
24         //超時時間
25         this.keepAliveTime = unit.toNanos(keepAliveTime);
26         this.threadFactory = threadFactory;
27         //拒絕策略
28         this.handler = handler;
29     }

  這是我們線程池實例化的時候的參數,其實最大的實用性來說,就是核心線程與最大線程數的設定,這個完全靠個人經驗,並沒有一個真正意義上的公式可以適用所有的業務場景,這里博主為大家找了一篇關於設定線程數的文章:

  https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

  我們的線程池初始化好后,我們自己會調用excute方法來讓線程池運行我們的線程任務,那我們就先來看看這個方法的實現:

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * 第一步:工作線程是否小於核心線程數量,如果是添加work中,worker其實也是一個線程,只不過它內部操作的是我們的上傳的任務
 6          * 第二步:如果大於核心線程數量,添加到worker隊列中,每一個不同的隊列offer的實現方法也是不一樣的,今天我們主要探討這個
 7          * 第三步:阻塞隊列被塞滿了,需要創建新的非核心線程數量worker線程去處理我們的任務,創建worker線程失敗了會觸發拒絕策略,默認拋異常
 8          */
 9         int c = ctl.get();
10         if (workerCountOf(c) < corePoolSize) {
11             if (addWorker(command, true))
12                 return;
13             c = ctl.get();
14         }
15         if (isRunning(c) && workQueue.offer(command)) {
16             int recheck = ctl.get();
17             if (! isRunning(recheck) && remove(command))
18                 reject(command);
19             else if (workerCountOf(recheck) == 0)
20                 addWorker(null, false);
21         }
22         else if (!addWorker(command, false))
23             reject(command);
24     }
25     

  我們看到當任務調用的時候,會執行addworker,那么worker是個什么東西呢?我們來看看它的構造實例:我們看一下worker類,就發現其實worker也是一個線程

 1 private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable
 4     {
 5     
 6     ......
 7     
 8     Worker(Runnable firstTask) {
 9             setState(-1); // inhibit interrupts until runWorker
10             this.firstTask = firstTask;
11             this.thread = getThreadFactory().newThread(this);
12         }
13 
14         /** 覆蓋執行run方法
15           */
16         public void run() {
17             runWorker(this);
18         }
19     ......
20     
21     }

  這次我們來看一下addworker是怎么操作的:

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();
 5             int rs = runStateOf(c);
 6 
 7             // Check if queue empty only if necessary.
 8             if (rs >= SHUTDOWN &&
 9                 ! (rs == SHUTDOWN &&
10                    firstTask == null &&
11                    ! workQueue.isEmpty()))
12                 return false;
13 
14             for (;;) {
15                 int wc = workerCountOf(c);
16                 if (wc >= CAPACITY ||
17                     //不允許創建大於最大核心線程數的任務
18                     wc >= (core ? corePoolSize : maximumPoolSize))
19                     return false;
20                 if (compareAndIncrementWorkerCount(c))
21                     break retry;
22                 c = ctl.get();  // Re-read ctl
23                 if (runStateOf(c) != rs)
24                     continue retry;
25                 // else CAS failed due to workerCount change; retry inner loop
26             }
27         }
28 
29         boolean workerStarted = false;
30         boolean workerAdded = false;
31         Worker w = null;
32         try {
33             //主要的創建worker過程是在這里
34             w = new Worker(firstTask);
35             final Thread t = w.thread;
36             if (t != null) {
37                 final ReentrantLock mainLock = this.mainLock;
38                 mainLock.lock();
39                 try {
40                     // Recheck while holding lock.
41                     // Back out on ThreadFactory failure or if
42                     // shut down before lock acquired.
43                     int rs = runStateOf(ctl.get());
44 
45                     if (rs < SHUTDOWN ||
46                         (rs == SHUTDOWN && firstTask == null)) {
47                         if (t.isAlive()) // precheck that t is startable
48                             throw new IllegalThreadStateException();
49                         workers.add(w);
50                         int s = workers.size();
51                         if (s > largestPoolSize)
52                             largestPoolSize = s;
53                         workerAdded = true;
54                     }
55                 } finally {
56                     mainLock.unlock();
57                 }
58                 if (workerAdded) {
59                     //此處調用的是worker線程的start方法,並沒有直接調用我們的 任務
60                     //上面我們看worker的run方法了,里面調用的 是runWorker,那我們看看runWorker方法就可以了
61                     t.start();
62                     workerStarted = true;
63                 }
64             }
65         } finally {
66             if (! workerStarted)
67                 addWorkerFailed(w);
68         }
69         return workerStarted;
70     }
71     

  到這里添加完畢后,我們在看看它是是如何執行我們的線程的,來看看runworker方法實現:

 1 final void runWorker(Worker w) {
 2         Thread wt = Thread.currentThread();
 3         Runnable task = w.firstTask;
 4         w.firstTask = null;
 5         w.unlock(); // allow interrupts
 6         boolean completedAbruptly = true;
 7         try {
 8             //這里體現的是線程的復用,復用的是worker線程,每處理一個線程都會getTask()從隊列中取一個任務進行處理
 9             while (task != null || (task = getTask()) != null) {
10                 w.lock();
11                 // If pool is stopping, ensure thread is interrupted;
12                 // if not, ensure thread is not interrupted.  This
13                 // requires a recheck in second case to deal with
14                 // shutdownNow race while clearing interrupt
15                 if ((runStateAtLeast(ctl.get(), STOP) ||
16                      (Thread.interrupted() &&
17                       runStateAtLeast(ctl.get(), STOP))) &&
18                     !wt.isInterrupted())
19                     wt.interrupt();
20                 try {
21                     beforeExecute(wt, task);
22                     Throwable thrown = null;
23                     try {
24                         //直接調用我們任務的run方法,我們任務雖然是繼承了runable,但是並沒有調用start方法
25                         //其實我們的線程放入線程池中,並不是讓我們的線程運行,僅僅是定義了一個方法體,
26                         //真正運行的是被線程池管理的worker線程
27                         task.run();
28                     } catch (RuntimeException x) {
29                         thrown = x; throw x;
30                     } catch (Error x) {
31                         thrown = x; throw x;
32                     } catch (Throwable x) {
33                         thrown = x; throw new Error(x);
34                     } finally {
35                         afterExecute(task, thrown);
36                     }
37                 } finally {
38                     task = null;
39                     w.completedTasks++;
40                     w.unlock();
41                 }
42             }
43             completedAbruptly = false;
44         } finally {
45             //回收線程,釋放資源
46             processWorkerExit(w, completedAbruptly);
47         }
48     }

  這個時候,大家應該就解決了一個問題就是,線程池如何體現的線程復用,就在gettask那里體現的,復用的就是worker線程,好了,這個時候不僅worker創建完成了,並且直接調用start方法,讓自己開始運行起來,執行本次添加的任務,但是細心的小伙伴會看到參數傳入的核心線程為true,那么此時也僅僅啟動了核心線程,那么超過核心線程數的就應該加入到隊列中,那么有什么隊列供我們選擇呢?

  所以我們接下來看BlockingQueue的offer、poll(如果設置超時時間)、take方法,BlockingQueue有很多實現類,我們主要看以下幾個:
  ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue五種BlockingQueue;

  我們首先來說一說ArrayBlockingQueue,我們看看其構造函數

 1 public ArrayBlockingQueue(int capacity) {
 2 //必須指定隊列大小,默認非公平鎖
 3         this(capacity, false);
 4     }
 5 
 6 public ArrayBlockingQueue(int capacity, boolean fair) {
 7         if (capacity <= 0)
 8             throw new IllegalArgumentException();
 9         this.items = new Object[capacity];
10         lock = new ReentrantLock(fair);
11         notEmpty = lock.newCondition();
12         notFull =  lock.newCondition();
13     }
public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//超過隊列大小,將不繼續存放,返回false創建worker線程
            if (count == items.length)
                return false;
            else {
                //數組后添加任務元素,使用到了循環數組的算法
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                //等待時間,如果超過默認1000微妙,則將阻塞當前線程,等待添加任務時將其喚醒或者等待超時
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //等待被喚醒,無超時時間
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  ArrayBlockingQueue講解完了,再來看看LinkedBlockingQueue:三個方法對於任務的存放與取出與ArrayBlockingQueue並無太大差別,我們就不做太多的講解,簡單說一下,主要的就是節點阻塞隊列與數組阻塞隊列所用到的鎖機制不一樣,主要是因為數組是一個對象,而節點操作的則是對頭與隊尾節點,所以用到了兩個taskLock與putLock鎖,兩者在對於隊列的取與添加並不會產生沖突

public LinkedBlockingQueue() {
       //鏈表類型雖然不用指定大小隊列,但是默認時int的最大值,實際場景下會引發內存溢出問題
        this(Integer.MAX_VALUE);
    }

 public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

  PriorityBlockingQueue,總體來說,是具有優先級考慮的任務隊列,因為任務需要實現comparator接口,先看看其構造器吧;

public PriorityBlockingQueue() {
//默認11長度大小,無比較器
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

public PriorityBlockingQueue(int initialCapacity) {
//可指定長度大小
        this(initialCapacity, null);
    }

public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
//可指定比較器
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
//在這里隊列的長度不再固定,而是實現了自動擴展
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
//默認與不默認都會使用comparator接口讓數組進行比較,使優先級高的在數組最前面
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
//每次取出任務的時候都會進行優先級比較,放到數組的第一個
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
//同理,也會進行優先級比較,雖然每次都比較但是在添加任務元素的時候已經是排好序的了
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

  SynchronousQueue是一個比較特殊的隊列,固定長度為1,並且可以說是實時進行任務運行,並且必須已經有worker任務結束在獲取其他任務的時候才會在隊列中添加任務元素,否則一直為返回null,非常適合在一個請求需要同時拉去多個服務的場景;

    public SynchronousQueue() {
        this(false);
    }

 public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

  並且是上面提到的offer、take、poll三個方法的操作都是一個transfer方法進行操作的,我們主要看一下這兩個類在這個方法上有哪些區別;首先看一下結構簡單的TransferQueue;

//存數據的時候,參數為transfer(e, true, 0)
//取數據的時候,參數為transfer(null, false, 0)
E transfer(E e, boolean timed, long nanos) {
//作者認為這個方法主要做了一下幾件事情:
//1、根據傳進來的任務參數,判斷當前是取數據還是放數據
//2、如果是存數據,判斷當前隊列是否是空隊列,如果是則返回false,線程池則會創建新的worker線程,不是空隊列則會喚醒進行獲取任務的worker線程並返回數據
//3、如果是拿數據,判斷是否是空隊列,則新創建節點並阻塞當前線程等待放數據時喚醒,不是空隊列(換一種說法就是已經有一個線程正在等待獲取任務),拋出頭結點,繼續往下走,這里有個疑問,拋出去后該節點的線程一直在等待,無法被喚醒了
QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);

            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;
                    if (t != tail)                  // inconsistent read
                        continue;
                    if (tn != null) {               // lagging tail
                        advanceTail(t, tn);
                        continue;
                    }
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);
                    if (!t.casNext(null, s))        // failed to link in
                        continue;

                    advanceTail(t, s);              // swing tail and wait
                    Object x = awaitFulfill(s, e, timed, nanos);
                    if (x == s) {                   // wait was cancelled
                        clean(t, s);
                        return null;
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;

                } else {                            // complementary-mode
                    QNode m = h.next;               // node to fulfill
                    if (t != tail || m == null || h != head)
                        continue;                   // inconsistent read

                    Object x = m.item;
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||                   // m cancelled
                        !m.casItem(x, e)) {         // lost CAS
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);
                    return (x != null) ? (E)x : e;
                }
            }
        }

   不知道大家看到這里是否有疑問,為什么當多個worker線程開始獲取任務時,已經等待的節點會被拋出去,只會給隊列留下最新的一個等待節點,其他節點根本不會再被喚醒了,其實這也是我的疑問,不知道大家有沒有注意到,希望高手們可以給一個解釋;下面再看一個TransferStack,這個是默認線程池隊列初始化中使用的節點類型,這個比較好理解,也通俗易懂一點;我們看看其源碼:

E transfer(E e, boolean timed, long nanos) {
//該實現類跟上一個有一些區別,但是總體邏輯差不多,也是分以下幾步:
//第一步:根據傳進來的任務參數判斷是請求數據,還是存入數據
//第二步:如果是存入數據,空節點時直接返回null,讓線程池創建worker線程運行任務,如果有等待節點,那么存入當前任務數據,並且再移除存入的數據節點和等待的節點,等待的節點此時會被賦值存入的任務並被喚醒
//第三步:如果是取出數據,空節點時存入等待數據節點並阻塞當前線程,如果不是空節點,已經有了等待節點,那么將會除去等待節點並喚醒,還會除去當前鋼加入的等待節點,使當前節點隊列還是保持null
 SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // empty or same-mode
                    if (timed && nanos <= 0) {      // can't wait
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } else if (casHead(h, s = snode(s, e, h, mode))) {
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // wait was cancelled
                            clean(s);
                            return null;
                        }
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } else if (!isFulfilling(h.mode)) { // try to fulfill
                    if (h.isCancelled())            // already cancelled
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                        for (;;) { // loop until matched or waiters disappear
                            SNode m = s.next;       // m is s's match
                            if (m == null) {        // all waiters are gone
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            }
                            SNode mn = m.next;
                            if (m.tryMatch(s)) {
                                casHead(s, mn);     // pop both s and m
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  // lost match
                                s.casNext(m, mn);   // help unlink
                        }
                    }
                } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

  接下來我們再來討論一下DelayQueue,這個隊列就和PriorityQueue有些關聯,具體關聯在哪里呢?我們看看它 的源碼;

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    //內部使用的是PriorityQueue作為存儲對象,但是該DelayQueue為任務元素指定了比較器,就是時間比較器
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}
//比較器是這個,定義了Comparable<Delayed>,查看時間是否到達或超過了指定時間
public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

  剩下的offer、poll、take我就不講解了,去元素的時候就是多判斷了一步,是否超過或到達指定時間,否則將會使當前線程進行等待剩余的時間,而不是自旋

       最后總結一下線程池以及使用到的隊列原理:

       線程池為何會比自己創建線程更加高效、方便,第一點就是線程池已經幫我們封裝好了並且對線程進行了管理,比如生產者消費者模式,使我們的線程池高效的利用CPU進行處理任務,也可以對我們的業務場景來看使用哪個隊列;第二點是線程池幫我們把線程進行了復用,而不是處理完一個任務就丟棄一個線程;

       隊列中ArrayBlockingQueue與LinkedBlockingQueue,處理節點存儲類型不一樣,一個是數組,一個是節點類型,還有LinkedBlockingQueue使用到了存儲鎖與取鎖,兩者操作並不沖突,而ArrayBlockingQueue則使用了一個排它鎖,取數據與存數據用的是一把鎖。

        而PriorityBlockingQueue和DelayQueue,雖然內部都使用了PriorityQueue作為存儲介質,但是PriorityBlockingQueue不會強制要求你使用哪一種比較器,而DelayQueue必須使用時間比較器更加局限也明確了任務的類型。

  最后說一下SynchronousQueue,該隊列比其他隊列特殊一點,該隊列是同步類型的隊列,就是說隊列不存儲任務數據,而是必須有正在獲取的等待節點才會讓數據暫時放入隊列中然后立馬取出,或者不會放入隊列,而是替換到等待隊列中的任務並喚醒等待隊列,從而到達任務不會被存儲在隊列中。也就是說不會緩沖任務放入隊列中,更像是實時任務取出並處理。

  好了,我們的學習也到此結束了,並且最后提示大家使用線程池的時候,最好自己定義線程池參數,而不是使用Executors使用默認參數來創建線程。就是因為寫的隊列長度過大,會導致程序崩潰


 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM