Java並發之線程池詳解


帶着問題閱讀

1、什么是池化,池化能帶來什么好處

2、如何設計一個資源池

3、Java的線程池如何使用,Java提供了哪些內置線程池

4、線程池使用有哪些注意事項

池化技術

池化思想介紹

池化思想是將重量級資源預先准備好,在使用時可重復使用這些預先准備好的資源。

池化思想的核心概念有:

  • 資源創建/銷毀開銷大
  • 提前創建,集中管理
  • 重復利用,資源可回收

例如大街上的共享單車,用戶掃碼開鎖,使用完后歸還到停放點,下一個用戶可以繼續使用,共享單車由廠商統一管理,為用戶節省了購買單車的開銷。

池化技術的應用

常見的池化技術應用有:資源池、連接池、線程池等。

  • 資源池

    在各種電商平台大促活動時,平台需要支撐平時幾十倍的流量,因此各大平台在需要提前准備大量服務器進行擴容,在活動完畢以后,擴容的服務器資源又白白浪費。將計算資源池化,在業務高峰前進行分配,高峰結束后提供給其他業務或用戶使用,即可節省大量消耗,資源池化也是雲計算的核心技術之一。

  • 連接池

    網絡連接的建立和釋放也是一個開銷較大的過程,提前在服務器之間建立好連接,在需要使用的時候從連接池中獲取,使用完畢后歸還連接池,以供其他請求使用,以此可節省掉大量的網絡連接時間,如數據庫連接池、HttpClient連接池。

  • 線程池

    線程的建立銷毀都涉及到內核態切換,提前創建若干數量的線程提供給客戶端復用,可節約大量的CPU消耗以便處理業務邏輯。線程池也是接下來重點要講的內容。

如何設計一個線程池

設計一個線程池,至少需要提供的核心能力有:

  • 線程池容器:用於容納初始化時預先創建的線程。
  • 線程狀態管理:管理池內線程的生命周期,記錄每個線程當前的可服務狀態。
  • 線程請求管理:對調用端提供獲取和歸還線程的接口。
  • 線程耗盡策略:提供策略以處理線程耗盡問題,如拒絕服務、擴容線程池、排隊等待等。

基於以上角度,我們來分析Java是如何設計線程池功能的。

Java線程池解析

ThreadPoolExecutor使用介紹

大象裝冰箱總共分幾步

// 1.創建線程池
ThreadPoolExecutor threadPool = 
    new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
// 2.提交任務
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("task running");
    }
}});
// 3.關閉線程池
threadPool.shutDown();

Java通過ThreadPoolExecutor提供線程池的實現,如示例代碼,初始化一個容量為1的線程池、然后提交任務、最后關閉線程池。

ThreadPoolExecutor的核心方法主要有

  • 構造函數:ThreadPoolExecutor提供了多個構造函數,以下對基礎構造函數進行說明。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    
    • corePoolSize:線程池的核心線程數。池內線程數小於corePoolSize時,線程池會創建新線程執行任務。

    • maximumPoolSize:線程池的最大線程數。池內線程數大於corePoolSizeworkQueue任務等待隊列已滿時,線程池會創建新線程執行隊列中的任務,直到線程數達到maximumPoolSize為止。

    • keepAliveTime:非核心線程的存活時長。池內超過corePoolSize數量的線程可存活的時長。

    • unit:非核心線程存活時長單位。與keepAliveTime取值配合,如示例代碼表示1分鍾。

    • workQueue:任務提交隊列。當無空閑核心線程時,存儲待執行任務。

      類型 作用
      ArrayBlockingQueue 數組結構的有界阻塞隊列
      LinkedBlockingQueue 鏈表結構的阻塞隊列,可設定是否有界
      SynchronousQueue 不存儲元素的阻塞隊列,直接將任務提交給線程池執行
      PriorityBlockingQueue 支持優先級的無界阻塞隊列
      DelayQueue 支持延時執行的無界阻塞隊列
    • threadFactory:線程工廠。用於創建線程對象。

    • handler:拒絕策略。線程池線程數量達到maximumPoolSizeworkQueue已滿時的處理策略。

      類型 作用
      AbortPolicy 拒絕並拋出異常。默認
      CallerRunsPolicy 由提交任務的線程執行任務
      DiscardOldestPolicy 拋棄隊列頭部任務
      DiscardPolicy 拋棄該任務
  • 執行函數:executesubmit,主要分別用於執行RunnableCallable

    // 提交Runnable
    void execute(Runnable command);
    
    // 提交Callable並返回Future
    <T> Future<T> submit(Callable<T> task);
    
    // 提交Runnable,執行結束后Future.get會返回result
    <T> Future<T> submit(Runnable task, T result);
    
    // 提交Runnable,執行結束后Future.get會返回null
    Future<?> submit(Runnable task);
    
  • 停止函數:shutDownshutDownNow

    // 不再接收新任務,等待剩余任務執行完畢后停止線程池
    void shutdown();
    
    // 不再接收新任務,並嘗試中斷執行中的任務,返回還在等待隊列中的任務列表
    List<Runnable> shutdownNow();
    

內置線程池使用

To be useful across a wide range of contexts, this class provieds many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread poll, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor}(single background thread), that preconfigure settings for the most common usage scenarios.

由於ThreadPoolExecutor參數復雜,Java提供了三種內置線程池newCachedThreadPoolnewFixedThreadPoolnewSingleThreadExecutor應對大多數場景。

  • Executors.newCachedThreadPool()無界線程池,核心線程池大小為0,最大為Integer.MAX_VALUE,因此嚴格來講並不算無界。采用SynchronousQueueworkQueue,意味着任務不會被阻塞保存在隊列,而是直接遞交到線程池,如線程池無可用線程,則創建新線程執行。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, 
                                      new SynchronousQueue<Runnable>());
    }
    
  • Executors.newFixedThreadPool(int nThreads)固定大小線程池,其中coreSizemaxSize相等,且過期時間為0,表示經過一定數量任務提交后,線程池將始終維持在nThreads數量大小,不會新增也不會回收線程。

    public static ExecutorService new FixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads nThreads, 0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
  • Executors.newSingleThreadExecutor()單線程池,參數與fixedThreadPool類似,只是將數量限制在1,單線程池主要避免重復創建銷毀線程對象,也可用於串行化執行任務。不同與其他線程池,單線程池采用FinallizableDelegatedExecutorServiceThreadPoolExecutor對象進行包裝,感興趣的同學可以看下源碼,其方法實現僅僅是對被包裝對象方法的直接調用。包裝對象主要用於避免用戶將線程池強制轉換為ThreadPoolExecutor來修改線程池大小

    public static ExecutorService newSingleThreadExecutor() {
        return new FinallizableDelegatedExecutorService(
          (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
                                    new LinkedBlockQueue<Runnable>()))
        );
    }
    

ThreadPoolExecutor解析

整體設計

線程池繼承關系

ThreadPoolExecutor基於ExecutorService接口實現提交任務,未采取常規資源池獲取/歸還資源的形式,整個線程池和線程的生命周期都由ThreadPoolExecutor進行管理,線程對象不對外暴露;ThreadPoolExecutor的任務管理機制類似於生產者消費者模型,其內部維護一個任務隊列和消費者,一般情況下,任務被提交到隊列中,消費線程從隊列中拉取任務並將其執行。

線程池生命周期

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c)     { return c & ~CAPACITY; } //計算當前運行狀態
private static int workerCountOf(int c)  { return c & CAPACITY; }  //計算當前線程數量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通過狀態和線程數生成ctl

TreadPoolExecutor通過ctl維護線程池的狀態和線程數量,其中高3位存儲運行狀態,低29位存儲線程數量。

位運算操作推薦參考第三篇文章。

線程池設定了RUNNINGSHUTDOWNSTOPTIDYINGTERMINATED五種狀態,其轉移圖如下:

在這5種狀態中,只有RUNNING時線程池可接收新任務,其余4種狀態在調用shutDownshutDownNow后觸發轉換,且在這4種狀態時,線程池均不再接收新任務。

任務管理解析

// 用於存放提交任務的隊列
private final BlockingQueue<Runnable> workQueue;

// 用於保存池內的工作線程,Java將Thread包裝成Worker存儲
private final HashSet<Worder> workers = new HashSet<Worker>();

ThreadPoolExecutor主要通過workQueueworkers兩個字段用於管理和執行任務。

線程池任務執行流程如圖,結合ThreadPoolExecutor.execute源碼,對任務執行流程進行說明:

  • 當任務提交到線程池時,如果當前線程數量小於核心線程數,則會將為該任務直接創建一個worker並將任務交由worker執行。

    if (workerCountOf(c) < corePoolSize) {
        // 創建新worker執行任務,true表示核心線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
  • 當已經達到核心線程數后,任務會提交到隊列保存;

    // 放入workQueue隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 這里采用double check再次檢測線程池狀態
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 避免加入隊列后,所有worker都已被回收無可用線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    
  • 如果隊列已滿,則依據最大線程數量創建新worker執行。如果新增worker失敗,則依據設定策略拒絕任務。

    // 接上,放入隊列失敗
    // 添加新worker執行任務,false表示非核心線程
    else if (!addWorker(command, false))
        // 如添加失敗,執行拒絕策略
        reject(command);
    

woker對象

ThreadPoolExecutor沒有直接使用Thread記錄線程,而是定義了worker用於包裝線程對象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    ...
    final Thread thread;
    
    Runnable firstTask;
    
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    
    // worker對象被創建后就會執行
    public void run() {
        runWorker(this);
    }
}

worker對象通過addWorker方法創建,一般會為其指定一個初始任務firstTask,當worker執行完畢以后,worker會從阻塞隊列中讀取任務,如果沒有任務,則該worker會陷入阻塞狀態給出worker的核心邏輯代碼:

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    // 指定firstTask,可能為null
    w = new Worker(firstTask);
    ...
    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
        if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
        workers.add(w);
        workerAdded = true;
    }
    ...
    // 執行新添加的worker
    if (workerAdded) {
        t.start();
        workerStarted = true;
    }
}


final void runWorker(Worker w) {
    // 等待workQueue的任務
    while (task != null || (task = getTask()) != null) {
    	...
    }
}

private Runnable getTask() {
    ...
    for (;;) {
        ...
        // 如果是普通工作線程,則根據線程存活時間讀取阻塞隊列
        // 如果是核心工作線程,則直接陷入阻塞狀態,等待workQueue獲取任務
        Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
        ...
    }
}

如下圖,任務提交后觸發addWorker創建worker對象,該對象執行任務完畢后,則循環獲取隊列中任務等待執行。

Java線程池實踐建議

不建議使用Exectuors

線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。《阿里巴巴開發手冊》

雖然Java推薦開發者直接使用Executors提供的線程池,但實際開發中通常不使用。主要考慮問題有:

  • 潛在的OOM問題

    CachedThreadPool將最大數量設置為Integer.MAX_VALUE,如果一直提交任務,可能造成Thread對象過多引起OOMFixedThreadPoolSingleThreadPoo的隊列LinkedBlockingQueue無容量限制,阻塞任務過多也可能造成OOM

  • 線程問題定位不便

    由於未指定ThreadFactory,線程名稱默認為pool-poolNumber-thread-thredNumber,線程出現問題后不便定位具體線程池。

  • 線程池分散

    通常在完善的項目中,由於線程是重量資源,因此線程池由統一模塊管理,重復創建線程池容易造成資源分散,難以管理。

線程池大小設置

通常按照IO繁忙型和CPU繁忙型任務分別采用以下兩個普遍公式。

\[N_{thread} = N_{cpu} * 2 \\ N_{thread} = N_{cpu} + 1 \]

在理論場景中,如一個任務IO耗時40ms,CPU耗時10ms,那么在IO處理期間,CPU是空閑的,此時還可以處理4個任務(40/10),因此理論上可以按照IO和CPU的時間消耗比設定線程池大小。

\[Ratio = (Time_{io} + Time_{cpu}) / Time_{cpu} \\ N_{thread} = (Ratio + 1) * N_{cpu} \]

《JAVA並發編程實踐》中還考慮數量乘以目標CPU的利用率

在實際場景中,我們通常無法准確測算IO和CPU的耗時占比,並且隨着流量變化,任務的耗時占比也不能固定。因此可根據業務需求,開設線程池運維接口,根據線上指標動態調整線程池參數。

推薦參考第二篇美團線程池應用

線程池監控

ThreadPoolExecutor提供以下方法監控線程池:

  • getTaskCount() 返回被調度過的任務數量

  • getCompletedTaskCount() 返回完成的任務數量

  • getPoolSize() 返回當前線程池線程數量

  • getActiveCount() 返回活躍線程數量

  • getQueue()獲取隊列,一般用於監控阻塞任務數量和隊列空間大小

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM