Java線程池使用


線程池的優點

當我們需要一個新的線程執行任務時,可能會直接創建一個

new Thread(()->{
	// do something
}).start();

在業務量較少的情況,這樣也沒什么太大問題。

但是如果任務頻繁的話

  1. 頻繁的創建和銷毀線程是十分消耗性能的,甚至可能創建和銷毀線程所用時間大於任務本身執行所用時間
  2. 如果業務量非常大,可能會占用過多的資源,導致整個服務由於資源不足而宕機

這里就可以引入線程池。

線程池,簡單來說,就是維護了若干個線程,當需要執行任務時,直接調用其中某一個線程來執行即可。

線程池的優點

  1. 降低性能消耗:重復利用已創建的線程,節省了頻繁創建和銷毀帶來的性能損耗
  2. 提示任務效率:任務來了分配一個線程就可以立即干活,而不用等待線程重新創建
  3. 更好的管控:線程池可以控制線程數,避免過多消耗服務器資源;亦更方便調優和監控

線程池執行流程

先看看線程池核心構造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

簡單介紹下這7個參數

  1. corePoolSize: 核心線程數
  2. maximumPoolSize:最大線程數
  3. keepAliveTime:非核心線程存活時間
  4. unit:非核心線程存活時間單位
  5. workQueue:緩存隊列
  6. threadFactory:線程工廠
  7. handler:拒絕策略

這里參數介紹非常簡單,但足以理解線程池執行流程圖,之后對這7個參數還會擴展說明,可結合理解。

線程池執行流程圖

實例類比

假設有一家xx銀行,默認開放5個櫃台營業,業務繁忙時最大可開放10個櫃台。另外還有一個候客區,可容納10個人。其運行流程大概如下

銀行例子1

銀行例子2

銀行例子3

銀行例子4

銀行例子5

銀行例子6

銀行例子7

銀行例子8

銀行例子9

銀行例子10

銀行例子11

銀行例子12

7大參數擴展說明

1 int corePoolSize

核心線程數

最初線程池里沒有線程,一開始新建的就是核心線程

2 int maximumPoolSize

最大線程數

=核心線程數+非核心線程數

當線程數到了核心線程數且隊列滿了才會新建非核心線程

3 long keepAliveTime

非核心線程存活時間

非核心線程一段時間不干活就會被銷毀

通常,核心線程閑置也會保留在線程池里。但如果設置ThreadPoolExecutor的allowCoreThreadTimeOut屬性為true,核心線程閑置一段時間也會被銷毀。

4 TimeUnit unit

非核心線程數存活時間單位

5 BlockingQueue workQueue

存放待執行任務的隊列

5.1 SynchronousQueue

  1. 這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它。
  2. 如果所有線程都在工作怎么辦?那就新建一個線程來處理這個任務
  3. 使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE。(保證不出現【線程數達到了maximumPoolSize而不能新建線程】的錯誤)

5.2 LinkedBlockingQueue

  1. 這個隊列接收到任務的時候,如果當前線程數小於核心線程數,則新建線程(核心線程)處理任務
  2. 如果當前線程數等於核心線程數,則進入隊列等待
  3. 這個隊列沒有最大值限制,所有超過核心線程數的任務都將被添加到隊列中。(這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSize)

5.3 ArrayBlockingQueue

  1. 這個隊列接收到任務的時候,如果當前線程數小於核心線程數,則新建線程(核心線程)執行任務
  2. 如果當前線程數等於核心線程數,則進入隊列等待
  3. 如果隊列已滿,則新建線程(非核心線程)執行任務
  4. 如果總線程數到了maximumPoolSize,則觸發拒絕策略
  5. 可以限定這個隊列的長度

5.4 DelayQueue

  1. 這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務
  2. 隊列內元素必須實現Delayed接口,即任務必須實現Delayed接口

6 ThreadFactory threadFactory

線程工廠

這是一個接口,需要實現它的Thread newThread(Runnable r)方法

可以對線程進行自定義的初始化,例如給線程設定名字,方便后期調試

7 RejectedExecutionHandler handler

拒絕策略

當提交任務數超過maximumPoolSize+workQueue之和時觸發

  1. AbortPolicy:默認策略。拋出RejectedExecutionException異常
  2. DiscardPolicy:丟棄當前任務,不拋出任何異常
  3. DIscardOldestPolicy:丟棄隊列里最早添加的元素,再安排當前任務。如果失敗則不斷重試
  4. CallerRunsPolicy:使用調用者自己的線程來執行任務。調用者線程會調用執行器的execute方法來執行該任務
  5. 自定義策略:實現RejectedExecutionHandler接口,實現rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法

Java自帶的四個線程池

了解了線程池7大參數,理解自帶的四種線程池會容易很多

1 FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定大小的線程池

  1. 每次來新任務就會創建一個新的線程,直到線程數達到最大線程數(這里也是核心線程數)
  2. 線程數達到最大值后就不會變,因為沒有非核心線程會被銷毀
  3. 如果某個線程因為執行異常而結束,那么線程池會補充一個新線程

2 SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

一個單線程的線程池

  1. 該線程池只有一個線程工作,相當於單線程串行執行所有任務
  2. 保證所有任務的執行順序按照任務的提交順序執行
  3. 如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它

3 CachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

一個可緩存的線程池

  1. 無核心線程,最大線程數Integer.MAX_VALUE
  2. 線程空閑60s后回收
  3. 緩存隊列是不存儲任務的隊列

4 ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
	
public ScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue());
}

一個有計划執行的線程池

  1. 有固定核心線程,最大線程數Integer.MAX_VALUE
  2. 定時以及周期性執行任務

自定義線程池

那么,我們實際中用哪一種捏。

答案是,都不用。

因為,上述線程中

  1. FixedThreadPool 和 SingleThreadExecutor 使用了 LinkedBlockingQueue ,這是個無界隊列。當任務突發過多時,這個隊列可能因為緩存太多任務而消耗非常多的內存資源,最終導致OOM
  2. CachedThreadPool 和 ScheduledThreadPool 最大線程數是Integer.MAX_VALUE。即相當於沒有對最大線程數做限制,任務突發過多時,可能因為創建大量線程導致資源耗盡,最終同樣導致OOM

所以,我們來自定義線程池。

線程池使用步驟

線程池使用步驟大概如下

  1. 根據7大參數,建立一個線程池
  2. 放入要執行的任務
  3. 最后,如有必要,關閉線程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(params...);
pool.execute(Runnable r);
pool.shutdown();

其中關閉線程池有兩個方法shutdown和shutdownNow,區別是

  • shutdown: 線程池拒收新任務,等待線程池里的任務執行完畢,之后關閉線程池。
  • shutdownNow: 線程池拒收新任務,線程池里的所有任務立刻停止,關閉線程池。

自定義線程池例子

1 自定義工具類。

包裝自定義線程池,對外提供靜態方法方便使用。

這里為了方便測試,核心線程1,最大線程10,緩存隊列10,該線程池最大接收20個任務

package com.test.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomThreadPoolUtil {
	
	private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolUtil.class);
	
	private static ThreadPoolExecutor pool = null;

	static {
		pool = new ThreadPoolExecutor(1, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler());
	}
	
	public static void destory() {
		pool.shutdown();
	}
	
	public static void execute(Runnable r) {
		pool.execute(r);
	}
	
	private static class CustomThreadFactory implements ThreadFactory {
		private AtomicInteger count = new AtomicInteger(0);	
		
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(r);
			String threadName = CustomThreadPoolUtil.class.getSimpleName() + count.incrementAndGet();
			t.setName(threadName);
			
			return t;
		}
	}
	
	private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			logger.error("任務執行失敗 {}, 線程池已滿 {}", r.toString(), executor.toString());
		}
		
	}
	
}

2 測試類。

創建21個任務,故意大於自定義線程池最大可處理量20

package com.test.threadpool;

public class TestThreadPool {

	public static void main(String[] args) {
		int num = 21;
		for(int i=1; i<=num; i++) {
			int j = i;
			CustomThreadPoolUtil.execute(new Runnable() {

				@Override
				public void run() {
					try {
						Thread.sleep(100); // 模擬業務運行
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName() + " 執行了任務" + j);
				}
			});
		}
		
		CustomThreadPoolUtil.destory();
		
	}
	
}

3 測試結果。

20個任務被正常執行。最后一個任務被拒絕,調用了我們的自定義拒絕方法

任務執行失敗 com.threadpool.TestThreadPool$1@3af49f1c, 線程池已滿 java.util.concurrent.ThreadPoolExecutor@19469ea2[Running, pool size = 10, active threads = 10, queued tasks = 10, completed tasks = 0]
CustomThreadPoolUtil2 執行了任務12
CustomThreadPoolUtil1 執行了任務1
CustomThreadPoolUtil7 執行了任務17
CustomThreadPoolUtil3 執行了任務13
CustomThreadPoolUtil4 執行了任務14
CustomThreadPoolUtil5 執行了任務15
CustomThreadPoolUtil9 執行了任務19
CustomThreadPoolUtil8 執行了任務18
CustomThreadPoolUtil6 執行了任務16
CustomThreadPoolUtil10 執行了任務20
CustomThreadPoolUtil1 執行了任務3
CustomThreadPoolUtil2 執行了任務2
CustomThreadPoolUtil9 執行了任務8
CustomThreadPoolUtil3 執行了任務5
CustomThreadPoolUtil7 執行了任務4
CustomThreadPoolUtil8 執行了任務9
CustomThreadPoolUtil5 執行了任務7
CustomThreadPoolUtil10 執行了任務11
CustomThreadPoolUtil6 執行了任務10
CustomThreadPoolUtil4 執行了任務6

參數設置推薦

剛才參數只是為了方便測試,實際中,如何設置各個參數才更合理呢

1 核心線程數(corePoolSize)

參考 任務耗時 和 每秒任務數

假設一個任務耗時0.1秒,系統每秒產生100個任務。

如果想在1秒內處理完這100個任務,那么有 0.1 * 100 / corePoolSize = 1,得 corePoolSize = 10

同理,如果只是偶爾某一秒產生了100個任務,后面有更多時間去處理,如2秒,那么0.1 * 100 / corePoolSize = 2,得 corePoolSize = 5

tip: 根據8020法則,實際應用中,不會每秒一直產生100的任務量,所以最終核心線程數可以設置為計算所得的80%,即最終corePoolSize = 10 * 0.8 = 8。而有時100的任務量,還有緩存隊列和最大線程數來保證可以執行。不過為了方便后續計算,這里還是先取 corePoolSize = 10。

2 任務隊列長度(workQueue)

參考 核心線程數 和 任務耗時

一般可設置為 核心線程數/單個任務執行時間*2

如本例中,緩存隊列長度可設置為 10 / 0.1 * 2 = 200

3 最大線程數(maximumPoolSize)

參考 核心線程數,緩存隊列長度,每秒最大任務數

一般可設置為 (最大任務數-任務隊列長度)*單個任務執行時間

假設本例中,每秒最大任務數1000,則最大線程數 = (1000 - 200) * 0.1 = 80

4 最大空閑時間(keepAliveTime)

參考系統運行環境和硬件壓力設定

無固定參考值,可根據系統產生任務的時間間隔合理設置

5 拒絕策略(handler)

參考 任務重要程度

任務不重要可直接丟棄,重要可自行采用緩沖機制

總結

本文先由單線程的弊端引出多線程,然后介紹了線程池的總體執行流程。

之后擴展說明7大參數,由此從理論上介紹了Java自帶的四大線程池。

然而實際中並不使用它們,轉到自定義線程池,並給出代碼示例。最后介紹了自定義線程池的參數設置推薦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM