去年看過一篇《ThreadPoolExecutor詳解》大致講了ThreadPoolExecutor內部的代碼實現。
總結一下,主要有以下四點:
當有任務提交的時候,會創建核心線程去執行任務(即使有核心線程空閑仍會創建);
當核心線程數達到corePoolSize時,后續提交的都會進BlockingQueue中排隊;
當BlockingQueue滿了(offer失敗),就會創建臨時線程(臨時線程空閑超過一定時間后,會被銷毀);
當線程總數達到maximumPoolSize時,后續提交的任務都會被RejectedExecutionHandler拒絕。
prestartAllCoreThreads方法可以直接創建所有核心線程並啟動。
BlockingQueue使用無限容量的阻塞隊列(如LinkedBlockingQueue)時,不會創建臨時線程(因為隊列不會滿),所以線程數保持corePoolSize。
BlockingQueue使用沒有容量的同步隊列(如SynchronousQueue)時,任務不會入隊,而是直接創建臨時線程去執行任務。
雖然線程池的模型被剖析的非常清晰,但是如何最高性能地使用線程池一直是一個令人糾結的問題,其中最主要的問題就是如何決定線程池的大小。
這篇文章會以量化測試的方式分析:何種情況線程池應該使用多少線程數。
1. 計算密集型任務與IO密集型任務
大多數剛接觸線程池的人會認為有一個准確的值作為線程數能讓線程池適用在程序的各個地方。然而大多數情況下並沒有放之四海而皆准的值,很多時候我們要根據任務類型來決定線程池大小以達到最佳性能。
計算密集型任務以CPU計算為主,這個過程中也會涉及到一些內存數據的存取,執行任務時CPU處於忙碌狀態。
IO密集型任務以IO為主,比如讀寫磁盤文件、讀寫數據庫、網絡請求等阻塞操作,執行IO操作時,CPU處於等待狀態。
2. 計算密集型任務
下面寫一個計算密集型任務的例子:
public class ComputeThreadPoolTest {
final static ThreadPoolExecutor computeExecutor;
final static List<Callable<Long>> computeTasks;
final static int task_count = 5000;
static {
computeExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
// 創建5000個計算任務
computeTasks = new ArrayList<>(task_count);
for (int i = 0; i < task_count; i++) {
computeTasks.add(new ComputeTask());
}
}
static class ComputeTask implements Callable<Long> {
// 計算一至五十萬數的總和(純計算任務)
@Override
public Long call() {
long sum = 0;
for (long i = 0; i < 50_0000; i++) {
sum += i;
}
return sum;
}
}
public static void main(String[] args) throws InterruptedException {
// 我電腦是四核處理器
int processorsCount = Runtime.getRuntime().availableProcessors();
// 逐一增加線程池的線程數
for (int i = 1; i <= processorsCount * 5; i++) {
computeExecutor.setCorePoolSize(i);
computeExecutor.setMaximumPoolSize(i);
computeExecutor.prestartAllCoreThreads();
System.out.print(i);
computeExecutor.invokeAll(computeTasks); // warm up all thread
System.out.print("\t");
testExecutor(computeExecutor, computeTasks);
System.out.println();
// 一定要讓cpu休息會兒,Windows桌面操作系統不會讓應用長時間霸占CPU
// 否則Windows回收應用程序的CPU核心數將會導致測試結果不准確
TimeUnit.SECONDS.sleep(5);// cpu rest
}
computeExecutor.shutdown();
}
private static <T> void testExecutor(ExecutorService executor, List<Callable<T>> tasks)
throws InterruptedException {
for (int i = 0; i < 8; i++) {
long start = System.currentTimeMillis();
executor.invokeAll(tasks); // ignore result
long end = System.currentTimeMillis();
System.out.print(end - start); // 記錄時間間隔
System.out.print("\t");
TimeUnit.SECONDS.sleep(1); // cpu rest
}
}
}
將程序生成的數據粘貼到excel中,並對數據進行均值統計
注意如果相同的線程數兩次執行的時間相差比較大,說明測試的結果不准確。
測試程序生成的數據可以從這下載
對數據生成折線圖
由於我筆記本的CPU有四個處理器,所以會發現當線程數達到4之后,5000個任務的執行時間並沒有變得更少,基本上是在600毫秒左右徘徊。
因為計算機只有四個處理器可以使用,當創建更多線程的時候,這些線程是得不到CPU的執行的。
所以對於計算密集型任務,應該將線程數設置為CPU的處理個數,可以使用Runtime.availableProcessors方法獲取可用處理器的個數。
《並發編程實戰》一書中對於IO密集型任務建議線程池大小設為Ncpu+1Ncpu+1,原因是當計算密集型線程偶爾由於頁缺失故障或其他原因而暫停時,這個“額外的”線程也能確保這段時間內的CPU始終周期不會被浪費。
對於計算密集型任務,不要創建過多的線程,由於線程有執行棧等內存消耗,創建過多的線程不會加快計算速度,反而會消耗更多的內存空間;另一方面線程過多,頻繁切換線程上下文也會影響線程池的性能。
3. 每個程序員都應該知道的延遲數
IO操作包括讀寫磁盤文件、讀寫數據庫、網絡請求等阻塞操作,執行這些操作,線程將處於等待狀態。
為了能更准確的模擬IO操作的阻塞,我覺得有必要將https://people.eecs.berkeley.edu/~rcs/research/interactive_latency.html中列舉的延遲數整理出來。
事件 納秒 微秒 毫秒 對照
一級緩存 0.5 - - -
二級緩存 7 - - 一級緩存時間14倍
互斥鎖定/解鎖 25.0 - - -
主存參考 100.0 - - 二級緩存20倍,一級緩存200倍
使用Zippy壓縮1K字節 3,000.0 3 - -
通過1Gbps網絡發送1K字節 10,000.0 10 - -
從SSD中隨機讀取4K 150,000.0 150 - 1GB/秒的讀取速度的SSD硬盤
從內存中順序讀取1MB 250,000.0 250 - -
在同一數據中心局域網內往返 500,000.0 500 - -
從SSD順序讀取1MB 1,000,000.0 1000 1 1GB/秒SSD,4X 內存
磁盤搜尋 10,000,000.0 10000 10 20X 數據中心往返
從磁盤順序讀取1MB 20,000,000.0 20000 20 80X 內存,20X SSD
發送一個數據包
美國加州→荷蘭→加州 150,000,000.0 150000 150 -
4. IO密集型任務
這里用sleep方式模擬IO阻塞:
public class IOThreadPoolTest {
// 使用無限線程數的CacheThreadPool線程池
static ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
static List<Callable<Object>> tasks;
// 仍然是5000個任務
static int taskNum = 5000;
static {
tasks = new ArrayList<>(taskNum);
for (int i = 0; i < taskNum; i++) {
tasks.add(Executors.callable(new IOTask()));
}
}
static class IOTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
cachedThreadPool.invokeAll(tasks);// warm up all thread
testExecutor(cachedThreadPool, tasks);
// 看看執行過程中創建了多少個線程
int largestPoolSize = cachedThreadPool.getLargestPoolSize();
System.out.println("largestPoolSize:" + largestPoolSize);
cachedThreadPool.shutdown();
}
private static void testExecutor(ExecutorService executor, List<Callable<Object>> tasks)
throws InterruptedException {
long start = System.currentTimeMillis();
executor.invokeAll(tasks);
long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
這里使用無線程數限制的CachedThreadPool線程池,也就是說這里的5000個任務會被5000個線程同時處理,由於所有的線程都只是阻塞而不消耗CPU資源,所以5000個任務在不到2秒的時間內就執行完了。
很明顯使用CachedThreadPool能有效提高IO密集型任務的吞吐量,而且由於CachedThreadPool中的線程會在空閑60秒自動回收,所以不會消耗過多的資源。
但是打開任務管理器你會發現執行任務的同時內存會飆升到接近400M,因為每個線程都消耗了一部分內存,在5000個線程創建之后,內存消耗達到了峰值。
所以使用CacheThreadPool的時候應該避免提交大量長時間阻塞的任務,以防止內存溢出;另一種替代方案是,使用固定大小的線程池,並給一個較大的線程數(不會內存溢出),同時為了在空閑時節省內存資源,調用allowCoreThreadTimeOut允許核心線程超時。
線程執行棧的大小可以通過-Xss*size*或-XX:ThreadStackSize參數調整
5. 混合型任務
大多數情況下,並不是單一的計算型或IO型,而是IO伴隨計算兩者混合執行的任務——簡單的Http請求也會有請求的構造過程。
混合型任務要根據任務等待阻塞時間與CPU計算時間的比重來決定線程數量:
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
比如一個任務包含一次數據庫讀寫(0.1ms),並在內存中對讀取的數據進行分組過濾等操作(5μs),那么線程數應該為80左右。
線程數與阻塞比例的關系圖大致如下:
當阻塞比例為0,也就是純計算任務,線程數等於核心數(這里是4);阻塞比例越大,線程池的線程數應該更多。
《Java並發編程實戰》中最原始的公式是這樣的:
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
NcpuNcpu代表CPU的個數,UcpuUcpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),WCWC仍然是等待時間與計算時間的比例。
我上面提供的公式相當於目標CPU利用率為100%。
通常系統中不止一個線程池,所以實際配置線程數應該將目標CPU利用率計算進去。
6. 總結
線程池的大小取決於任務的類型以及系統的特性,避免“過大”和“過小”兩種極端。線程池過大,大量的線程將在相對更少的CPU和有限的內存資源上競爭,這不僅影響並發性能,還會因過高的內存消耗導致OOM;線程池過小,將導致處理器得不到充分利用,降低吞吐率。
要想正確的設置線程池大小,需要了解部署的系統中有多少個CPU,多大的內存,提交的任務是計算密集型、IO密集型還是兩者兼有。
雖然線程池和JDBC連接池的目的都是對稀缺資源的重復利用,但通常一個應用只需要一個JDBC連接池,而線程池通常不止一個。如果一個系統要執行不同類型的任務,並且它們的行為差異較大,那么應該考慮使用多個線程池,使每個線程池可以根據各自的任務類型以及工作負載來調整。
參考鏈接:
https://stackoverflow.com/questions/868568/what-do-the-terms-cpu-bound-and-i-o-bound-mean
https://people.eecs.berkeley.edu/~rcs/research/interactive_latency.html
https://en.wikipedia.org/wiki/Amdahl%27s_law
http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
《並發編程實戰》:java concurrent in practice