Java線程池ThreadPoolExecutor中execute()方法原理


ThreadPoolExecutor中execute()方法原理

序言

線程池的相關參數,創建,執行,以及運行原理。

涉及問題

需求:涉及大數據批量數據對比處理
方案 :定時任務,中根據數據來源創建線程池,加入隊列,批量處理大數據量
涉及思考問題: ThreadPoolExecutorexecute()方法原理

execute()執行原理

  1. 如果當前運行的線程,少於corePoolSize,則創建一個新的線程來執行任務。
  2. 如果運行的線程等於或多於 corePoolSize,將任務加入 BlockingQueue
  3. 如果 BlockingQueue 內的任務超過上限,則創建新的線程來處理任務。
  4. 如果創建的線程數是單錢運行的線程超出 maximumPoolSize,任務將被拒絕策略拒絕。

那么問題來了,如果當前沒有線程數即為0,阻塞隊列也是無界的,按照我們上面所說的四條,那么沒有線程被執行,但是會無限向隊列中添加,
測試代碼

public class threadTest {
    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger();
        while (true) {
            executor.execute(() -> {
                System.out.println(atomicInteger.getAndAdd(1));
            });
        }
    }
}

結果里面的System.out.println(atomicInteger.getAndAdd(1));語句執行了,與上面的描述矛盾了。到底發生了什么?線程池創建線程的邏輯是什么?我們還是從源碼來看看到底線程池的邏輯是什么?

線程池

ctl

要了解線程池,首先要了解線程池里面的狀態控制變量ctl

  • 線程池的ctl是一個原子的AtomicInteger
  • 這個ctl包含兩個參數:
    • runState 線程的狀態
    • workerCount 激活的線程數
  • 它的低29位用於存放當前的線程數, 因此一個線程池在理論上最大的線程數是 536870911; 高 3 位是用於表示當前線程池的狀態, 其中高三位的值和狀態對應如下:
    • 111: RUNNING
    • 000: SHUTDOWN
    • 001: STOP
    • 010: TIDYING
    • 011: TERMINATED
      為了能夠使用ctl,線程池提供了三個方法:
    // Packing and unpacking ctl
    //獲取線程池的狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取線程池中工作線程數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //根據線程池的狀態和工作線程數得到ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

execute

外界通過 execute 這個方法來向線程池提交任務。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        int c = ctl.get();
        //如果工作線程數小於核心線程數,
        if (workerCountOf(c) < corePoolSize) {
        	//執行addWorker,會創建一個核心線程,如果創建失敗,重新獲取ctl
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果工作線程數大於等於核心線程數,線程池的狀態是RUNNING,並且可以添加進隊列
        //(RUNNING狀態下)如果添加失敗,說明是隊列已經滿了,接着就去創建新的線程,如果大於最大線程數,則執行拒絕策略
        //如果線程池不是RUNNING狀態,則執行拒絕策略(當然還會調addWorker進行判斷一次)
        if (isRunning(c) && workQueue.offer(command)) {
        	//再次獲取ctl,進行雙重檢索(也就是對線程池的狀態再次檢查一遍)
            int recheck = ctl.get();
            //如果線程池是不是處於RUNNING的狀態,那么就會將任務從隊列中移除,
            //如果移除失敗,則會判斷工作線程是否為0 ,如果過為0 就創建一個非核心線程
            //如果移除成功,就執行拒絕策略,因為線程池已經不可用了;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
	        //線程池掛了或者大於最大線程數
            reject(command);
    }

通過上面的代碼我們可以得出幾個結論:
①** 當工作線程數小於核心線程數時,會創建核心線程數;**
②如果工作線程數大於等於核心線程數時,會嘗試將任務添加進隊列

  • 如果成功,會對線程池的狀態進行二次驗證(因為可能存在剛好線程池的狀態發生了改變的情況),只要是RUNNING的狀態,就一定要保證有工作線程還在。
    ③ 二次驗證時,如果線程池不是處於RUNNING的狀態,那么就會將任務從隊列中移除

  • 如果移除失敗,則會判斷工作線程是否為0 ,如果過為0 就創建一個非核心線程;

  • 如果移除成功,就執行拒絕策略,因為線程池已經不可用了;
    ④ 二次驗證時,如果線程池處於RUNNING的狀態,會判斷工作線程是否為0 ,如果過為0 就創建一個非核心線程;

  • 如果失敗,說明是隊列已經滿了,接着就去創建新的線程,如果大於最大線程數,則執行拒絕策略

疑問:
代碼中addWorker方法,總共出現3次,第一次是創建核心線程,其他2次都是非核心線程,為什么呢?

第一次毋庸置疑;關鍵是第二次,我一開始很迷惑,第二次是在工作線程為0時調用的,為什么不傳true是判斷核心線程;我們現在假設此時是創建核心線程,即false改為true;那么addWorker方法中wc >= (core ? corePoolSize : maximumPoolSize)這個地方會去判斷當前工作線程是否大於核心線程,在高並發的情況下,會存在其他線程將工作線程的數量創建的大於核心線程數,導致返回false,並且不會創建新線程,雖然有工作線程的存在,但是會導致原本可以及時處理的任務,要去排隊執行。
第三次顯而易見必須是false,因為經過前面幾次后,剩下的結果,要么是線程池掛了,要是是隊列滿了,所以一定是創建非核心線程;

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //判斷線程池的是否可以接收新任務
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
            	//獲取工作線程的數量
                int wc = workerCountOf(c);
                //判斷工作線程的數量是否大於等於線程池的上限或者核心或者最大線程數
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用cas增加工作線程數
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果添加失敗,並且線程池狀態發生了改變,重來一遍
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		//上面的邏輯是考慮是否能夠添加線程,如果可以使用cas來增加工作線程數量
		//下面正式啟動線程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        	//新建worker
            w = new Worker(firstTask);
            // 獲取當前線程
            final Thread t = w.thread;
            if (t != null) {
            	//獲取重入鎖
                final ReentrantLock mainLock = this.mainLock;
                //鎖住
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
					// rs < SHUTDOWN  -- 狀態即為:RUNNING
					//rs == SHUTDOWN && firstTask == null 
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果線程已經啟動,拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //workers是一個HashSet,必須在鎖住的情況下,操作
                        workers.add(w);
                        int s = workers.size();
                        //設置largestPoolSize ,標記workerAdded 
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果添加成功,啟動線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
        	//啟動線程失敗,回滾
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

該方法的思路:

  • 考慮是否能夠添加線程,如果可以使用CAS來增加工作線程數量
  • 正式啟動線程

Worker 本身也是一個任務因為其實現了Runnable,並且其繼承了AbstractQueuedSynchronizer,所以也具備鎖的效果。而我們提交的任務,是在Worker方法中的run方法中調用的。

為什么啟動Worker屬性中的Thread,就能運行Worker
其構造函數是這樣的:

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

可以看出這句this.thread = getThreadFactory().newThread(this);把當前的Worker對象作為任務傳給了新建的進程。這樣啟動進程時,它也就啟動了。

源碼對比

jdk8

在jdk8中addWorker()方法是:

...省略...
for (;;) {
        //獲取工作線程的數量
        int wc = workerCountOf(c);
       //判斷工作線程的數量是否大於等於線程池的上限或者核心或者最大線程數
       if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
...省略...

jdk11

但是到了jdk11中,其優化了下,變成了如下:

for (;;) {
          if (workerCountOf(c)
              >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))

本質上含義是一樣的,都是判斷工作線程的數量是否大於等於線程池的上限或者核心或者最大線程數

為什么可以改寫成這樣(core ? corePoolSize : maximumPoolSize) & COUNT_MASK)呢?

COUNT_MASK就是jdk8中的CAPACITY。

這是因為COUNT_MASK表示的是000 11111 11111111 11111111 1111111表示是線程池最大能表示的數字,即線程池的上限;可以看到其高位三個數都是0,因為高位表示的是runstate,即線程池運行狀態;都為0,也就是不需要這三個數;那么這個COUNT_MASK和任何數字做&運算;都會等於該數字本身,並且做&運算,保證了這個數字不會超過COUNT_MASK本身,即線程池的上限;

總結

  • 當工作線程小於核心線程時,會創建核心線程。
  • 如果線程池中的線程數量大於等於corePoolSize,但隊列workQueue未滿,則將新添加的任務放到workQueue中,按照FIFO的原則依次等待執行(線程池中有線程空閑出來后依次將隊列中的任務交付給空閑的線程執行);
  • 當線程池處於非RUNNING狀態時,會嘗試將剛剛加入到隊列的任務移除掉,如果移除失敗,並且工作線程為0情況下,就會嘗試創建一個非核心線程,來消費隊列。
  • 如果線程池中的線程數量大於等於corePoolSize,且隊列workQueue已滿,但線程池中的線程數量小於maximumPoolSize,則會創建新的線程來處理被添加的任務;
  • 如果線程池中的線程數量等於了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理

當有新的任務要處理時,先看線程池中的線程數量是否大於corePoolSize,再看緩沖隊列workQueue是否滿,最后看線程池中的線程數量是否大於maximumPoolSize
另外,當線程池中的線程數量大於corePoolSize時,如果里面有線程的空閑時間超過了keepAliveTime,就將其移除線程池

原文地址:[https://blog.csdn.net/u013066244/article/details/87898187]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM