幾種線程池的實現分析(轉)


1. 前言

在閱讀研究線程池的源碼之前,一直感覺線程池是一個框架中最高深的技術。研究后才發現,線程池的實現是如此精巧。本文從技術角度分析了線程池的本質原理和組成,同時分析了JDK、Jetty6、Jetty8、Tomcat的源碼實現,對於想了解線程池本質、更好的使用線程池或者定制實現自己的線程池的業務場景具有一定指導意義。

2. 使用線程池的意義

  • 復用:類似WEB服務器等系統,長期來看內部需要使用大量的線程處理請求,而單次請求響應時間通常比較短,此時Java基於操作系統的本地調用方式大量的創建和銷毀線程本身會成為系統的一個性能瓶頸和資源浪費。若使用線程池技術可以實現工作線程的復用,即一個工作線程創建和銷毀的生命周期期間內可以執行處理多個任務,從而總體上降低線程創建和銷毀的頻率和時間,提升了系統性能。
  • 流控:服務器資源有限,超過服務器性能的過高並發設置反而成為系統的負擔,造成CPU大量耗費於上下文切換、內存溢出等后果。通過線程池技術可以控制系統最大並發數和最大處理任務量,從而很好的實現流控,保證系統不至於崩潰。
  • 功能:JDK的線程池實現的非常靈活,並提供了很多功能,一些場景基於功能的角度會選擇使用線程池。

3. 線程池技術要點:

從內部實現上看,線程池技術可主要划分為如下6個要點實現:

 

圖1線程池技術要點

  • 工作者線程worker:即線程池中可以重復利用起來執行任務的線程,一個worker的生命周期內會不停的處理多個業務job。線程池“復用”的本質就是復用一個worker去處理多個job,“流控“的本質就是通過對worker數量的控制實現並發數的控制。通過設置不同的參數來控制worker的數量可以實現線程池的容量伸縮從而實現復雜的業務需求
  • 待處理工作job的存儲隊列:工作者線程workers的數量是有限的,同一時間最多只能處理最多workers數量個job。對於來不及處理的job需要保存到等待隊列里,空閑的工作者work會不停的讀取空閑隊列里的job進行處理。基於不同的隊列實現,可以擴展出多種功能的線程池,如定制隊列出隊順序實現帶處理優先級的線程池、定制隊列為阻塞有界隊列實現可阻塞能力的線程池等。流控一方面通過控制worker數控制並發數和處理能力,一方面可基於隊列控制線程池處理能力的上限。
  • 線程池初始化:即線程池參數的設定和多個工作者workers的初始化。通常有一開始就初始化指定數量的workers或者有請求時逐步初始化工作者兩種方式。前者線程池啟動初期響應會比較快但造成了空載時的少量性能浪費,后者是基於請求量靈活擴容但犧牲了線程池啟動初期性能達不到最優。
  • 處理業務job算法:業務給線程池添加任務job時線程池的處理算法。有的線程池基於算法識別直接處理job還是增加工作者數處理job或者放入待處理隊列,也有的線程池會直接將job放入待處理隊列,等待工作者worker去取出執行。
  • workers的增減算法:業務線程數不是持久不變的,有高低峰期。線程池要有自己的算法根據業務請求頻率高低調節自身工作者workers的數量來調節線程池大小,從而實現業務高峰期增加工作者數量提高響應速度,而業務低峰期減少工作者數來節省服務器資源。增加算法通常基於幾個維度進行:待處理工作job數、線程池定義的最大最小工作者數、工作者閑置時間。

線程池終止邏輯:應用停止時線程池要有自身的停止邏輯,保證所有job都得到執行或者拋棄。

4. 幾種線程池的實現細節

結合上面的技術點,列舉幾種線程池實現方式。

 
  • 工作者workers與待處理工作隊列實現方式舉例:

    實現

    工作者workers結構與並發保護

    待處理工作隊列結構

    JDK

    使用了HashSet來存儲工作者workers,通過可重入鎖ReentrantLock對其進行並發保護。每個worker都是一個Runnable接口。

    使用了實現接口BlockingQueue的阻塞隊列來存儲待處理工作job,並把隊列作為構造函數參數,從而實現業務可以靈活的擴展定制線程池的隊列。業務也可使用JDK自身的同步阻塞隊列SynchronousQueue、有界隊列ArrayBlockingQueue、無界隊列LinkedBlockingQueue、優先級隊列PriorityBlockingQueue。

    Jetty6

    同樣使用了HashSet存儲工作者workers,通過synchronized一個對象進行HashSet的並發保護。每個工作者實際上是一個Thread的擴展。

    使用了數組存儲待處理的job對象Runnable。數組初始化容量為_maxThreads個,使用變量_queued計算保存當前內部待處理job的個數即數組length。超過數組最大值時,擴大_maxThreads個容量,因此數組永遠夠用夠大,容量無界。同樣是用synchronized一個對象的方式實現同步。

    Jetty8

    使用了ConcurrentLinkedQueue存儲工作者workers,利用JDK基於CSA算法的實現提高了並發效率,同時也降低了線程池並發保護的復雜程度。針對隊列ConcurrentLinkedQueue無法保證size()實時性問題引入原子變量AtomicInteger統計工作者數量。

    與JDK相同實現,使用了基於接口BlockingQueue的阻塞隊列來存儲待處理工作job,也支持在線程池構造函數的參數中傳入隊列類型。同時,Jetty8內部默認未設置隊列類型場景可自動設置使用2種隊列:有界無法擴容的ArrayBlockingQueue及Jetty自身定制擴展實現的可擴容隊列BlockingArrayQueue。

    Tomcat

    基於JDK的ThreadPoolExecutors實現,復用JDK業務

    復用JDK業務

  • 線程池初始化與處理業務job算法舉例:

    實現

    線程池構造與工作者初始化

    處理業務job的算法

    JDK

    1. 基於多個構造參數實現靈活初始化,幾個核心參數如下:

    corePoolSize:核心工作者數

    maximumPoolSize:最大工作者數

    keepAliveTime:超過核心工作者數時閑置工作者的存活時間。

    workQueue:待處理job隊列,即前面提到的BlockingQueue接口。

    2. 默認初始化后不啟動工作者,等待有請求時才啟動。可以通過調用線程池接口提前啟動核心工作數個工作者線程,也可以啟動業務期望的多個工作者線程。

    1. 工作者workers數量低於核心工作者數corePoolSize時會優先創建一個工作者worker處理job,處理成功則返回。

    2. 工作者workers數量高於核心工作者數時會優先把job放入到待處理隊列,放入隊列成功時處理結束。

    3. 步驟2中入隊失敗會識別工作者數是否還小於最大工作者數maximumPoolsize,小於的話也會新創建一個工作者worker處理job。

    4. 拒絕處理

    Jetty6

    1. 同樣支持設置多個參數:

    _spawnOrShrinkAt:擴容/縮容閥值

    _minThreads:最小工作者數

    _maxThreads:最大工作者數

    _maxIdleTimeMs:閑置工作者最大閑置超時時間

    2. 初始化后直接啟動_minThreads個工作者線程

    1. 查找閑置的工作者worker,找到則派發job。

    2. 沒有閑置的工作者,將job存入待處理數組。

    3. 當識別到數組中待處理job超過擴容閥值參數時,擴容增加工作者處理job

    4. 否則不處理

    Jetty8

    1. 配置參數類似Jetty6,去除了_spawnOrShrinkAt閥值參數。

    2. 初始化后直接啟動_minThreads個工作者線程

    非常簡單,直接將待處理job入隊。

    Tomcat

    1. 基於JDK線程池的構造方法

    2. 來請求時啟動工作者

    處理方法復用JDK的,但是在開始提交前擴展了JDK的功能,實現了可以統計提交數submittedCount的能力

  • 線程池工作者worker的增減機制舉例:

    實現

    工作者增加算法

    工作者減少算法

    JDK

    1. 待處理job來時,工作者workers數量低於核心工作者數corePoolSize時。

    2. 待處理job來時,workers數超過核心數小於最大工作者數且入待處理隊列失敗場景。

    3. 業務調用線程池的更新核心工作者數接口時,若發現擴容,會增加工作者數。

    1. 待處理任務隊列里沒有job並且工作者workers數量超過了核心工作者數corePoolSize。

    2. 待處理任務隊列里沒有job並且允許工作者數量小於核心工作者參數為true,此場景會至少保留一個工作者線程。

    Jetty6

    1. 啟動線程池時會啟動_minThreads個工作者線程

    2. 待處理的job數量高於了閥值參數且工作者數沒有達到最大值時會增加工作者。

    3. 調用線程池接口setMinThreads更新最小工作者數時會根據需要增加工作者。

    如下三個條件同時滿足時會減少工作者:

    1. 待處理任務數組中沒有待處理job

    2. 工作者workers數量超過了最小工作者數_minThreads

    3. 閑置工作者線程數高於了閥值參數

    Jetty8

    1. 啟動線程池時啟動最小工作者參數個工作者線程

    2. 已經沒有閑置工作者或者閑置工作者的數量已經小於待處理的job的總數

    3. 調用線程池接口setMinThreads更新最小工作者數時

    如下三個條件同時滿足時會減少工作者:

    1. 待處理任務隊列里沒有待處理的job

    2. 工作者workers總數超過了最小工作者參數配置_minThreads

    3. 工作者線程的閑置時間超時

    Tomcat

    同JDK增加工作者算法

    復用JDK減少算法,同時定制擴展延遲參數,超過參數時,直接拋出異常到外面來終止線程池工作者。

5. 小結

對比幾種線程池實現,JDK的實現是最為靈活、功能最強且擴展性最好的,Tomcat即基於JDK線程池功能擴展實現,復用原有業務的同時擴充了自己的業務。Jetty6是完全自己定制的線程池業務,耦合線程池眾多復雜的業務邏輯到線程池類里面,邏輯相對最為復雜,擴展性也非常差。Jetty8相對Jetty6的實現簡化了很多,其中利用了JDK中的同步容器和原子變量,同時實現方式也越來越接近JDK。

6. 參考源碼

  • JDK源碼類:java.util.concurrent.ThreadPoolExecutor
  • Jetty6源碼類:org.mortbay.thread.QueuedThreadPool
  • Jetty8源碼類:org.eclipse.jetty.util.thread.QueuedThreadPool
  • Tomcat源碼類:org.apache.tomcat.util.threads.ThreadPoolExecutor

感謝郭蕾對本文的審校。

原文地址:http://www.infoq.com/cn/articles/thread-pool-algorithm-realization


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM