java8 線程池


java8 線程池

java 線程的創建、銷毀和線程減切換是一件比較耗費計算機資源的事。如果我們需要用多線程處理任務,並頻繁的創建、銷毀線程會造成計算機資源的無端浪費,因此出現了線程池技術。在《java 並發編程的藝術》一書中,作者總結了三條使用線程池的好處:

降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

線程池的使用

線程池的創建

線程池的創建可以通過創建 ThreadPoolExecutor 對象或者調用 Executors 的工廠方法來創建線程池。但是在阿里巴巴的 java 開發手冊中提到:

【強制】線程池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE, 可能會創建大量的線程,從而導致 OOM。

ThreadPoolExecutor

因此先看一下怎么通過創建 ThreadPoolExecutor 對象來創建一個線程池。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

這是 ThreadPoolExecutor 的構造方法,其中的參數含義如下:

  • corePoolSize:核心線程池大小, 當新的任務到線程池后,線程池會創建新的線程(即使有空閑線程),直到核心線程池已滿。
  • maximumPoolSize:最大線程池大小,顧名思義,線程池能創建的線程的最大數目
  • keepAliveTime:程池的工作線程空閑后,保持存活的時間
  • TimeUnit: 時間單位
  • BlockingQueue:用來儲存等待執行任務的隊列
  • threadFactory:線程工廠
  • RejectedExecutionHandler: 當隊列和線程池都滿了時拒絕任務的策略

重要參數的說明:

  • corePoolSize 和 maximumPoolSize
    默認情況下線程中的線程初始時為 0, 當有新的任務到來時才會創建新線程,當線程數目到達 corePoolSize 的數量時,新的任務會被緩存到 workQueue 隊列中。如果不斷有新的任務到來,隊列也滿了的話,線程池會再新建線程直到總的線程數目達到 maximumPoolSize。如果還有新的任務到來,則要根據 handler 對新的任務進行相應拒絕處理。
  • BlockingQueue
    一個阻塞隊列,用來存儲等待執行的任務,常用的有如下幾種:
    1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按 FIFO (先進先出) 排序元素,吞吐量通常要高於 ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool() 使用了這個隊列。
    3. SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool 使用了這個隊列。
    4. PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
  • RejectedExecutionHandler
    當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。有下面四種 JDK 提供的策略:
    1. AbortPolicy,表示無法處理新任務時拋出異常, 默認策略
    2. CallerRunsPolicy:用調用者所在線程來運行任務。
    3. DiscardOldestPolicy: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
    4. DiscardPolicy:不處理,丟棄掉
      除了這些 JDK 提供的策略外,還可以自己實現 RejectedExecutionHandler 接口定義策略。

一個創建線程池的小例子:

public class CreateThreadPool {
    public static void main(String args[]) {
        //不建議的做法
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        //使用 guava 開源框架的 ThreadFactoryBuilder 給線程池的線程設置名字
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-thread-%d").build();

        ExecutorService pool = new ThreadPoolExecutor(4, 10, 0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<Runnable>(256),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}

//輸出:
demo-thread-0
demo-thread-1
demo-thread-2

使用 Executors 的工廠方法創建線程

1. SingleThreadExecutor

創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。
此線程池保證所有任務的執行順序按照任務的提交順序執行。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
2. FixedThreadPool

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。
線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
3. CachedThreadPool

創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,
那么就會回收部分空閑(60 秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。
此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說 JVM)能夠創建的最大線程大小, 極端情況下會因為創建過多線程而耗盡系統資源

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

這里雖然指定 maximumPool 為 Integer.MAX_VALUE,但沒什么意義,如果不能滿足任務執行需求,CachedThreadPool 還會繼續創建新的線程。

4. ScheduledThreadPool

主要用來在給定的延遲之后運行任務,或者定期執行任務。

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
5. newWorkStealingPool

newWorkStealingPool 是 jdk1.8 才有的,會根據所需的並行層次來動態創建和關閉線程,通過使用多個隊列減少競爭,底層用的 ForkJoinPool來實現的。ForkJoinPool 的優勢在於,可以充分利用多 cpu,多核 cpu 的優勢,把一個任務拆分成多個 “小任務”,把多個“小任務” 放到多個處理器核心上並行執行;當多個 “小任務” 執行完成之后,再將這些執行結果合並起來即可。

五種線程池的使用場景

  • newSingleThreadExecutor:一個單線程的線程池,可以用於需要保證順序執行的場景,並且只有一個線程在執行。
  • newFixedThreadPool:一個固定大小的線程池,可以用於已知並發壓力的情況下,對線程數做限制。
  • newCachedThreadPool:一個可以無限擴大的線程池,比較適合處理執行時間比較小的任務。
  • newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用於需要多個后台線程執行周期任務的場景。
  • newWorkStealingPool:一個擁有多個任務隊列的線程池,可以減少連接數,創建當前可用 cpu 數量的線程來並行執行。

線程池的關閉

shutdown

shutdown 的原理是只是將線程池的狀態設置成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的線程。

shutdownNow

shutdownNow 的原理是遍歷線程池中的工作線程,然后逐個調用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow 會首先將線程池的狀態設置成 STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。

通常調用 shutdown 來關閉線程池,如果任務不一定要執行完,則可以調用 shutdownNow

線程池的原理

一個線程池處理的基本流程如下 (jdk7 之前):

img

  1. 首先線程池判斷基本線程池是否已滿?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
  2. 其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
  3. 最后線程池判斷整個線程池是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

jdk7 之后的線程池處理流程更復雜、判斷條件也更多:

img

execute

結合 ThreadPoolExecutor 類的源碼和圖看一下具體執行流程。

ThreadPoolExecutor 類中,最核心的任務提交方法是 execute() 方法:

public void execute(Runnable command) {
    //判斷提交的任務是否為 null, 是則拋出異常
    if (command == null)
        throw new NullPointerException();

    /*
     * 獲取線程池控制狀態
     * ctl 是一個 AtomicInteger 變量 (騷操作)
     * jdk 8 中通過一個 int 值的前 28 位表示工作線程數量 workerCount, 剩余高位來表示 線程池狀態
     * 計算 workerCount 和 runState 時通過掩碼計算。  CAPACITY = (1 << 29) - 1
     * private static int runStateOf(int c)     { return c & ~CAPACITY; }
     * private static int workerCountOf(int c)  { return c & CAPACITY; }
     * */
    int c = ctl.get();

    //1. 當線程數小於 核心線程池容量時 將添加工作線程去執行任務

    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return; 
        c = ctl.get(); // 不成功則再次獲取線程池控制狀態
    }

    //2. (worker線程數量大於核心線程池容量時)如果線程池處於 RUNNING 狀態,將命令加入 workQueue 隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get(); //再次檢查防止狀態突變
        if (! isRunning(recheck) && remove(command)) 
            //2.1 如果狀態改變,線程池沒有 RUNNING 則將命令移出隊列,並拒絕執行
            reject(command); 
        else if (workerCountOf(recheck) == 0) 
            //2.2 狀態沒有改變,線程池 RUNNING,但 worker線程數量為 0, 則添加非core的worker線程
             addWorker(null, false);
    }

    //3. 如果線程池沒有 RUNNING 並嘗試添加非core的 worker 線程失敗,那就拒絕執行
    else if (!addWorker(command, false)) 
        reject(command);
}
  1. 如果運行的線程小於 corePoolSize,則嘗試使用用戶定義的 Runnalbe 對象創建一個新的線程。調用 addWorker() 函數會原子性的檢查runStateworkCount,通過返回 false 來防止在不應該添加線程時添加了線程
  2. 如果一個任務能夠成功入隊列,在添加一個線程時仍需要進行雙重檢查(可能因為在前一次檢查后該線程死亡了),或者當進入到此方法時,線程池已經 shutdown 了,所以需要再次檢查狀態。若線程此時的狀態不是 RUNNING,則需要回滾入隊列操作;或者當線程池沒有工作線程時,需要創建一個新的工作線程。
  3. 如果無法入隊列,那么需要增加一個新工作線程,如果此操作失敗,那么就意味着線程池已經 SHUTDOWN 或者已經飽和了,所以拒絕任務

上面提到了線程池的狀態,那就來看一下:

  • RUNNING = -1 << COUNT_BITS; // 運行狀態, 也是線程池的初始狀態
  • SHUTDOWN = 0 << COUNT_BITS; // 不再接收新的任務,但是會處理隊列中的任務
  • STOP = 1 << COUNT_BITS; // 不再接收新的任務,也不會處理隊列中的任務並且會中斷正在執行的任務
  • TIDYING = 2 << COUNT_BITS; // 所有線程已經終止,並且工作線程數 workerCount 等於 0。在進入此狀態會會調用 terminated() 方法
  • TERMINATED = 3 << COUNT_BITS; // 終止狀態, 在 terminated() 方法返回后,由 TIDYING 狀態進入此狀態。

狀態轉換圖:

img

addWorker()

此方法用來創建新的線程添加到線程池

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /*
         * 檢查線程池狀況, 確保此時可以添加新的線程,
         * 如果是runing,那么跳過if。
         * 如果rs>=SHUTDOWN,同時不等於SHUTDOWN,即為SHUTDOWN以上的狀態,那么不接受新線程。
         * 如果rs>=SHUTDOWN,同時等於SHUTDOWN,同時first != null,那么拒絕新線程,
         * 如果為Empty,那么隊列已空,不需要增加消耗線程,返回 false。
         * */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            /* 
             * 判斷線程池是否已滿,如果線程數大於等於最大容量 CAPACITY 直接返回false
             * core 是一個boolean 參數,表明調用者想把此線程添加到哪個線程池
             * 根據 core 的值判斷要添加的線程池是否已滿
             **/
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //CAS 操作增加工作線程數
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            //CAS 操作失敗, 再次檢查狀態重來一次
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //創建新的線程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //獲取到鎖
            mainLock.lock();
            try {
                // 再次檢查狀態,因為狀態可能在獲取鎖之前改變
                int rs = runStateOf(ctl.get());
                //確保當前線程池還接收新的線程
                //結合上面的線程狀態知道:當狀態值大於等於 SHUTDOWN 時 線程池就不再接收新的線程了
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //線程添加成功后就可以啟動線程准備執行了
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

從上面兩段源碼可以看到, 在添加新的線程進入線程池時,各種操作非常的嚴謹細致,往往需要多次檢查狀態,確保線程池的正確運行。

worker 工作線程

線程池創建線程時,會將線程封裝成工作線程 Worker,Worker 在執行完任務后,還會無限循環獲取工作隊列里的任務來執行。我們可以從 Worker 的 runWorker 方法里看到:

final void runWorker(Worker w) {
    // 獲取當前線程
    Thread wt = Thread.currentThread();
    // 獲取w的firstTask
    Runnable task = w.firstTask;
    // 設置w的firstTask為null
    w.firstTask = null;
    // 釋放鎖(設置state為0,允許中斷)
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 任務不為null或者阻塞隊列還存在任務
            // 獲取鎖
            w.lock();
            /*
             * 這里的檢查主要是確保線程池此時還能接收新的任務去執行, 如果不在接收新的任務
             * 則應該中斷當前線程
             **/
            if ((runStateAtLeast(ctl.get(), STOP) ||  
                 (Thread.interrupted() &&  
                  runStateAtLeast(ctl.get(), STOP))) &&   
                  !wt.isInterrupted())    
                wt.interrupt(); 
            try {
                // 在執行之前調用鈎子函數
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 運行給定的任務
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執行完后調用鈎子函數
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 增加給worker完成的任務數量
                w.completedTasks++;
                // 釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 處理完成后,調用鈎子函數
        processWorkerExit(w, completedAbruptly);
    }
}

參考

https://www.cnblogs.com/javanoob/p/threadpool.html
http://ifeve.com/java-threadpool/
https://mp.weixin.qq.com/s/5dexEENTqJWXN_17c6Lz6A


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM