深入理解Java多線程——線程池


為什么需要線程池

new Thread()不是創建一個對象那么簡單,需要調用操作系統內核的API,然后操作系統要為線程分配一系列的資源,這個成本就很高。所以線程是一個重量級的對象,應該避免頻繁創建和銷毀。而應對方案就是線程池。

定義

線程池,除了池的功能外,還提供了更全面的線程管理、任務提交等方法。帶來的好處是:

  • 降低資源消耗
  • 提高任務響應速度
  • 提高線程可管理性

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit unit,
                      BlockingQueue<Runnable> workQueue,
                      ThreadFactory threadFactory,
                      RejectedExecutionHandler handler)
  • workQueue,工作隊列負責存儲用戶提交的各個任務,下面會詳細介紹。
  • corePoolSize,核心線程數,可以理解為長期駐留的線程數目(除非設置了allowCoreThreadTimeOut)。對於不同的線程池,這個值可能會有很大區別,比如newFixedThreadPool會將其設置為nThreads,而對於newCachedThreadPool則是為0。
  • maximumPoolSize,線程不夠時能夠創建的最大線程數。對於newFixedThreadPool,就是nThreads,因為其要求是固定大小,而newCachedThreadPool則是Integer.MAX_VALUE 。
  • keepAliveTime和TimeUnit,這兩個參數指定了額外的線程能夠閑置多久,顯然有些線程池不需要它。
  • threadFactory,自定義如何創建線程,例如你可以給線程指定一個有意義的名字。
  • handler,自定義任務的拒絕策略。
  • 內部的“線程池”,是保存工作線程的集合,線程池需要在運行過程中管理線程創建、銷毀。線程池的工作線程被抽象為靜態內部類Worker,基於AQS實現。
    private final HashSet<Worker> workers = new HashSet<>();
  • ctl變量是一個非常有意思的設計,它被賦予了雙重角色,通過高低位的不同,既表示線程池狀態,又表示工作線程數目,這是一個典型的高效優化。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 真正決定了工作線程數的理論上限
private static fnal int COUNT_BITS = Integer.SIZE - 3;
private static fnal int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 線程池狀態,存儲在數字的高位
private static fnal int RUNNING = -1 << COUNT_BITS;
…
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

工作隊列workQueue

即各種BlockingQueue

  • 直接提交隊列。synchronousQueue,是特殊的BlockingQueue, 容量為0,沒執行一個插入就會阻塞,需要執行刪除才能喚醒。
  • 有界隊列。ArrayBlockingQueue,來了新任務時,pool創建工作線程,直到線程數達到corePoolSize,此時將任務加入有界隊列。如果隊列滿了,則繼續創建線程,直到maximimPoolSize,此時執行拒絕策略。
  • 無界隊列。LinkBlockingQueue,無界,注意OOM問題,提倡使用有界隊列。

不同的線程池

JUC的Executors目前提供了5種不同的線程池

  1. newFixedThreadPooll(int nThreads)創建一個指定工作線程數量的線程池
    coolPoolSize = maximumPoolSize = nThreads
    使用的是無界的工作隊列LinkBlockingQueue,每提交一個任務就創建一個工作線程,如果工作線程數量達到coolPoolSize,則將提交的任務存入到隊列中等待。
    maximumPoolSize和keepAliveTime無效。

  2. newCachedThreadPooll()創建一個可緩存的線程池,用來處理大量短時間工作任務的線程池。特點是:

    • 它會試圖緩存線程並重用,當無緩存線程可用時,就會創建新的工作線程。工作線程的創建數量有限制為Interger. MAX_VALUE。
    • 如果工作線程空閑超過1分鍾,將自動終止並移出緩存。長時間閑置時,這種線程池,不會消耗什么資源。
    • 其內部使用SynchronousQueue作為工作隊列
  3. newSingleThreadExecutor()創建一個單線程化的Executor
    只創建唯一的工作者線程,如果這個線程異常結束會有另一個取代它。單工作線程最大的特點是可保證順序地執行各個任務,並且在任意給定的時間只有一個線程在工作。

  4. newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize)。
    創建的是個ScheduledExecutorService,可以進行定時或周期性的工作調度,區別在於單一工作線程還是多個工作線程。

  5. newWorkStealingPool(int parallelism),JDK1.8引入。
    內部會構建ForkJoinPool,利用Work-Stealing算法,並行地處理任務,不保證處理順序。
    work-stealing pool的實現

Executor

Executor是一個基礎的接口,其初衷是將任務提交和任務執行細節解耦,使開發者不被太多線程創建、調度等不相關細節所打擾。

void execute(Runnable command);

ExecutorService則更加完善,不僅提供service的管理功能,比如shutdown等方法,也提供了更加全面的提交任務機制,如返回Future而不是void的submit方法。

<T> Future<T> submit(Callable<T> task);

線程池的工作原理

線程池是一種生產者-消費者模式,而不是經典池化資源的獲取/釋放模式。

為什么線程池沒有采用一般意義上池化資源的設計方法呢?因為找不到似execute(Runnable target)這種方法執行業務邏輯。

//采⽤⼀般意義上池化資源的設計⽅法
class ThreadPool{
		//	獲取空閑線程
		Thread acquire()	{
		}
		//	釋放線程
		void release(Thread	t){
		}
}	
//期望的使⽤
ThreadPool pool;
Thread	T1=pool.acquire();
//傳⼊Runnable對象
T1.execute(()->{
		//具體業務邏輯
		......
});		

所以,采用了生產者消費者模式,線程池的使用方是生產者,線程池本身是消費者,產品則是業務任務work,work被放入工作隊列workQueue。

可以把線程池類比為一個項目組,而線程就是項目組的成員。

線程池生命周期

線程池增長策略

任務通過execute(Runable)添加到pool

public void execute(Runnable command) {
…
int c = ctl.get();
// 檢查工作線程數目,低於corePoolSize則添加Worker
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
 return;
    c = ctl.get();
}
// isRunning就是檢查線程池是否被shutdown
// 工作隊列可能是有界的,ofer是比較友好的入隊方式
if (isRunning(c) && workQueue.ofer(command)) {
    int recheck = ctl.get();
// 再次進行防御性檢查
    if (! isRunning(recheck) && remove(command))
 reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 嘗試添加一個worker,如果失敗以為着已經飽和或者被shutdown了
else if (!addWorker(command, false))
 reject(command);
}

線程池大小的設置

如果我們的任務主要是進行計算,通常建議按照CPU核的數目N或者N+1。

如果是需要較多等待的任務,例如I/O操作比較多,可以參考Brain Goetz推薦的計算方法:線程數 = CPU核數 × (1 + 平均等待時間/平均工作時間)

線程池使用的注意事項

  1. 避免任務堆積。工作隊列是無界的,如果工作線程數目太少,導致處理跟不上入隊的速度,這就很有可能占用大量系統內存,甚至是出現OOM。
  2. 避免過度擴展線程。通常在處理大量短時任務時,使用可緩存的線程池,但很難明確設置線程數目。
  3. 避免線程泄漏。往往是因為任務邏輯有問題,導致工作線程遲遲不能被釋放,當線程數目不斷增長時造成溢出。
  4. 避免死鎖
  5. 避免在使用線程池時操作ThreadLocal。因為ThreadLocalMap中廢棄項目的回收依賴於顯式地觸發,否則就要等待線程結束,內存自動回收弱引用,進而回收相應ThreadLocalMap,但worker線程往往是不會退出的,這就容易出現OOM。
  6. Executors提供的很多方法默認使用的都是無界的LinkedBlockingQueue,高負載情境下,無界隊列很容易導致OOM,而OOM會導致所有請求都無法處理,所以建議使用有界隊列

參考

《Java並發實戰》
《Java核心技術36講》楊曉峰


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM