線程池原理與實踐


JUC的線程池架構

1.Executor

Executor是Java異步任務的執行者接口,目標是執行目標任務。Executor作為執行者角色,目的是提供一種將“任務提交者”與“任務執行者”分離的機制。它只有一個函數式方法:

public interface Executor {
    void execute(Runnable command);
}

2.ExecutorService

ExecutorService繼承於Executor。它對外提供異步任務的接收服務。ExecutorService提供了“接受異步任務並轉交給執行者”的方法,比如submit、invoke方法等。具體如下:

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

3.AbstractExecutorService

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。AbstractExecutorService存在的目的是為ExecutorService中的接口提供默認實現。(模板模式)

4.ThreadPoolExecutor

大名鼎鼎的線程池實現類,繼承於AbstractExecutorService。它是核心實現類,它可以預先提供指定數量的可重用線程,可以對線程進行管理和監控。

5.ScheduledExecutorService

她繼承於ExecutorService。是一個完成延時和周期性任務的接口。

6.Executors

是一個靜態工廠類,內置的靜態工廠方法可以理解為快捷創建線程池的方法。

image

Executors的4種快捷創建線程池的方法

newSingleThreadExecutor 創建只有一個線程的線程池

newFixedThreadPool 創建固定大小的線程池

newCachedThreadPool 創建一個不限制線程數量的線程池,任何提交的任務都立即執行,空閑線程會及時回收

newScheduledThreadPool 創建一個可定期或延時執行任務的線程池

  • newSingleThreadExecutor
public static void main(String[] args) {
       final AtomicInteger integer = new AtomicInteger(0);

       ExecutorService pool = Executors.newSingleThreadExecutor();

       for (int i = 0; i < 5; i++) {
           pool.execute(() -> {
               System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());
               try {
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
       }

       pool.shutdown();
}

Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-1,5,main] :doing-2
Thread[pool-1-thread-1,5,main] :doing-3
Thread[pool-1-thread-1,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5

場景:任務按照提交順序,一個任務一個任務逐個執行。

以上代碼最后調用shutdown來關閉線程池。執行shutdown方法后,線程池狀態變為shutdown,線程池將拒絕新任務,不能再往線程池中添加新任務。此時,線程池不會立刻退出,直到線程池中的任務處理完成后才會退出。還有一個shutdownNow方法,執行這個后,線程狀態變為stop,試圖停止所有正在執行的線程,並且不再處理阻塞隊列中等待的任務,會返回那些未執行的任務。

  • newFixedThreadPool
ExecutorService pool = Executors.newFixedThreadPool(3);

Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-3,5,main] :doing-2
Thread[pool-1-thread-2,5,main] :doing-3
Thread[pool-1-thread-3,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5

適用場景:需要任務長期執行的場景。“固定數量的線程池”能穩定的保證一個數,避免頻繁 回收和創建線程,適用於CPU密集型的任務,在CPU被線程長期占用的情況下,能確保少分配線程。

弊端:內部使用無界隊列存放任務,當有大量任務,隊列無限增大,服務器資源迅速耗盡。

newFixedThreadPool工廠方法返回一個ThreadPoolExecutor實例,該線程池實例的corePoolSize數量為參數nThread,其maximumPoolSize數量也為參數nThread,其workQueue屬性的值為LinkedBlockingQueue ()無界阻塞隊列。使用Executors創建“固定數量的線程池”的潛在問題主要存在於其workQueue上,其值為LinkedBlockingQueue(無界阻塞隊列)。如果任務提交速度持續大於任務處理速度,就會造成隊列中大量的任務等待。如果隊列很大,很有可能導致JVM出現OOM(Out Of Memory)異常,即內存資源耗盡。

  • newCachedThreadPool

線程池內的某些線程無事可干成為空閑線程,可以靈活回收這些空閑線程。

ExecutorService pool = Executors.newCachedThreadPool();

Thread[pool-1-thread-5,5,main] :doing-5
Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-2,5,main] :doing-2
Thread[pool-1-thread-3,5,main] :doing-3
Thread[pool-1-thread-4,5,main] :doing-4

特點:在執行任務時,如果池內所有線程忙,則會添加新線程來處理。不會限制線程的大小,完全依賴於操作系統能夠創建的最大線程大小。如果存量線程超過了處理任務數量,就會回收線程。、

適用場景:快速處理突發性強、耗時短的任務場景,如Netty的NIO處理場景、REST API接口的瞬時削峰場景。

弊端:沒有最大線程數量限制,如果大量的異步任務提交,服務器資源可能耗盡。

  • newScheduledThreadPool
public static void main(String[] args) {
        final AtomicInteger integer = new AtomicInteger(0);

        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

        for (int i = 0; i < 5; i++) {
            pool.scheduleAtFixedRate(
                    () -> {
                        System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());

                    }, 0, 500, TimeUnit.MILLISECONDS);
            // 0表示首次執行任務的執行時間,500表示每次執行任務的間隔時間
        }
//        pool.shutdown();
}

因為可以周期性執行任務,所以不shutdown。

適用場景:周期性執行任務的場景。

線程池的標准創建方式

使用ThreadPoolExecutor構造方法創建,一個比較重要呃構造器如下:

public ThreadPoolExecutor(int corePoolSize,核心線程數
                              int maximumPoolSize, 最大線程數
                              long keepAliveTime, TimeUnit unit, 空閑時間
                              BlockingQueue<Runnable> workQueue, 阻塞隊列
                              ThreadFactory threadFactory, 線程工廠(線程產生方式)
                              RejectedExecutionHandler handler 拒絕策略) {
    ...
}

1.核心和最大線程數量

接收新任務時,並且當前工作線程池數少於核心線程數量,即使有工作線程是空閑的,它也會創建新線程處理任務,直到達到核心線程數。

2.BlockingQueue

阻塞隊列用於暫時接收任務。

3.KeepAliveTime

設置線程最大空閑時長,如果超過這個時間,非核心線程會被回收。當然,也可以調用allowCoreThreadTimeOut方法將超時策略應用到核心線程。

線程池的任務調度流程

  1. 工作線程數量小於核心線程數量,執行新任務時會優先創建線程,而不是獲取空閑線程。
  2. 任務數量大於核心線程數量,新任務將被加入阻塞隊列中。執行任務時,也是先從阻塞隊列中獲取任務。
  3. 在核心線程用完,阻塞隊列已滿的情況下,會創建非核心線程處理新任務。
  4. 在如果線程池總數超過maximumPoolSize,線程池會拒絕接收任務,為新任務執行拒絕策略。

image

ThreadFactory(線程工廠)

創建線程方式

阻塞隊列

阻塞隊列與普通度列相比:阻塞隊列為空時,會阻塞當前線程的元素獲取操作。當隊列中有元素,被阻塞的線程會被自動喚醒。

BlockingQueue是JUC包的一個超級接口,比較常用的實現類有:

(1)ArrayBlockingQueue:數組隊列

(2)LinkedBlockingQueue:鏈表隊列

(3)PriorityBlockingQueue:優先級隊列

(4)DelayQueue:延遲隊列

(5)SynchronousQueue:同步隊列

調度器的鈎子方法

ThreadPoolExecutor為每個任務執行前后都提供了鈎子方法。

// 任務執行之前的鈎子方法(前鈎子)
protected void beforeExecute(Thread t, Runnable r) { }
// 之后(后鈎子)
protected void afterExecute(Runnable r, Throwable t) { }
// 終止(停止鈎子)
protected void terminated() { }

beforeExecute:可用於重新初始化ThreadLocal線程本地變量實例、更新日志記錄、計時統計等。

afterExecute:更新日志記錄、計時統計等。

terminated:Executor終止時調用。

演示一下前鈎子。

public class TestMain {

    public static void main(String[] args) {
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,
                4,
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("前鈎子嗷 ~ ~ ~ ");
            }
        };

        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                System.out.println("你誰啊");
            });
        }
    }
}

線程池拒絕策略

任務被拒絕有兩種情況:

  1. 線程池已經關閉。
  2. 工作隊列已滿且最大線程數已滿。

拒絕策略有以下實現:

  • AbortPolicy:拒絕策略。拋異常。
  • DiscardPolicy:拋棄策略。丟棄新來的任務。
  • DiscardOldestPolicy:拋棄最老任務策略。因為隊列是隊尾進對頭出,所以每次都是移除隊頭元素后再入隊。
  • CallerRunsPolicy:調用者執行策略。提交任務線程自己執行任務,不使用線程池中的線程。
  • 自定義策略。實現RejectExecutionHandler接口的rejectedExecution方法。

線程池中執行任務的Worker為什么要繼承AQS?而不是用ReentrantLock

因為R是可重入鎖,Woker通過AQS實現的是不可重入鎖。另外,Worker在執行任務時,會lock住執行邏輯,原因是怕其他線程執行shutdown中斷他的執行。

Re

《Java高並發編程》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM