電腦的CPU資源是有限的,任務的處理速度與線程數量之間並不是正相關。當線程數量過多,CPU要頻繁的在不同線程切換,反而會引起處理性能的下降。線程池中最大的線程數,是考慮多種因素來事先設定的,比如硬件的條件,業務的類型等等。
當我們向一個固定大小的的線程池中請求一個線程時,當線程池中沒有空閑資源了,這個時候線程池如何處理這個請求?是拒絕請求還是排隊請求?各種策略又是如何實現的呢?
實際上,這些問題的處理並不復雜,底層的數據結構,就是隊列(queue)。
一、Java線程池介紹
1,線程池的作用
限制系統中執行線程的數量。
減少了創建和銷毀線程的次數,重復利用線程。
2,主要的類
Executor:執行線程的接口
ExecutorSerivce: 線程池接口
ThreadPoolExecutor :線程池類
Executors:常用線程池工廠
3,常用的線程池
配置線程池是比較復雜的過程,所有可以使用現有的線程池工廠生成常用的線程池:
- newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。為了合理利用資源,我們通常把定長池的長度設置為當前PC機獲取cpu核心數:Runtime.getRuntime().availableProcessors():獲取當前CPU核心數;
- newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程;
- newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行;
- newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MyThreadPool {
public static void main(String [] args){
int num = Runtime.getRuntime().availableProcessors();
Executor executor = Executors.newFixedThreadPool(num);
for (int i = 0 ; i<num ; i++){
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("我是一個子線程!!");
}
});
}
}
}

我們再來看Executors.newFixedThreadPool(num),點進去,會發現就是new了一個LinkedBlockingQueue:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

二、線程池和隊列結合實現一個日志處理
JDK自己的線程池底層不光是用隊列實現的,我們也可以使用線程池和隊列相結合,來實現一些功能。
通常我們會把要執行的任務放入一個隊列中,由線程池來執行,比如爬蟲、日志。我們先來看一個線程池和隊列結合實現日志記錄的例子。
import com.swagger.demo.Entity.LogContentEntity;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@Configuration
@Aspect
@Component
public class AopLogConfig implements Runnable {
@Autowired
private HttpServletRequest request;
private LinkedBlockingQueue<LogContentEntity> logQueue;
public AopLogConfig() {
//Spring啟動后,該對象創建時。初始化隊列以及線程池。
logQueue = new LinkedBlockingQueue<LogContentEntity>(3000);
int num = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(num);
for (int i = 0 ;i<num ;i++){
executor.execute(this);
}
}
@Before("execution(public * com.swagger.demo.controller..*.*(..))")
public void doBefore(JoinPoint joinPoint) throws Exception{
//日志記錄的信息可自行修改
LogContentEntity Log = new LogContentEntity();
String method = request.getMethod();
Log.setHttpMethod(method);
String url = request.getRequestURL().toString();
Log.setUrl(url);
String ip = request.getRemoteAddr();
Log.setIp(ip);
Log.setContent("test Log Content");
//將需要記錄的日志對象放到隊列中等待線程異步執行。
logQueue.put(Log);
}
@Override
public void run() {
try{
while(true){
//如果隊列里沒有,則會阻塞;
LogContentEntity take = logQueue.take();
//日志處理邏輯可自行修改;
System.out.println(take.toString());
}
}catch(Exception e){
e.printStackTrace();
}
}
}

三、線程池+隊列以優先級方式執行隊列任務
import java.util.concurrent.TimeUnit;
public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {
private int priority;
private String name;
public MyPriorityTask(String name, int priority) {
this.name = name;
this.priority = priority;
}
public void run() {
System.out.printf("MyPriorityTask: %s Priority :%d\n", name, priority);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int compareTo(MyPriorityTask o) {
if (this.getPriority() < o.getPriority()) {
return 1;
}
if (this.getPriority() > o.getPriority()) {
return -1;
}
return 0;
}
public int getPriority() {
return priority;
}
}

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
for (int i = 0; i < 100; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, 0);
executor.execute(task);
System.out.println(executor.getTaskCount());
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 101; i < 8; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, 1);
executor.execute(task);
System.out.println(executor.getTaskCount());
}
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Main: End of the program.\n");
}
}

四、使用線程池的一些陷阱
盡管線程池對於構建多線程應用是個很強大的機制,但它也不是沒有缺點的。使用線程池構建的應用會面臨其他多線程應用所面對的一樣的並發風險,比如同步錯誤和死鎖,此外線程池還有其他的一些特有缺陷,比如 線程池-關聯 死鎖,資源不足,還有線程泄漏。
1.死鎖
任何多線程應用都會面臨死鎖的風險。彼此雙方都在等待一個事件,而這個事件只能有對方提供,這樣一對進程或者線程我們稱之為死鎖。死鎖最簡單的情況是線程 A 持有了對象 X 的獨占鎖,線程 A 在等待對象 Y 的鎖,而線程 B 恰恰持有了對象 Y 的獨占鎖,線程 B 在等待對象 X 的鎖。除非有某種辦法能夠打破這種鎖等待(Java 鎖機制不能支持這個),否則的話這一對死鎖線程將會永久地等待下去。
既然死鎖是所有多線程編程都將面臨的風險,線程池為我們引入了另一種死鎖:線程池中所有線程都在阻塞等待隊列中另一個任務的執行結果,但是另一個任務無法得到執行,因為池中壓根兒就沒用空閑的可用線程。這種情況在線程池用於一些相互影響對象的模擬實現中可能會出現,這些模擬對象彼此發送查詢然后作為任務隊列進行執行,發起查詢的對象同步等待響應。
2.資源不足
線程池的優點之一是他們在大多數情況下比其他的調度機制具備更好的性能,比如我們上面所討論的那幾種。但這個取決於你有沒有恰當地配置了線程池大小。線程占用大量的資源,包括內存和其他系統資源。除了線程對象所必須的內存之外,每個線程還需要兩個執行調用棧,這個棧可能會很大。此外,JVM 可能還會為每個 Java 線程創建一個本地線程,這樣將會占用額外的系統資源。最后,雖然線程之間切換的調度開銷很小,大量的線程上下文切換也會影響到你的應用性能。
如果線程池過大的話,這些眾多線程所消耗的資源將會明顯影響到系統性能。時間會浪費在線程之間的切換上,配置有比你實際需要更多的線程會引起資源不足的問題,因為池中線程所占用的資源如果用在其他任務上可能會更高效。除了這些線程本身所使用的資源之外,服務請求時所做的工作可能會需要額外資源,比如 JDBC 連接,套接字,或者文件。這些也是有限的資源,而且對它們進行過高並發請求的話可能會導致失效,比如無法分配一個 JDBC 連接。
3.並發錯誤
線程池以及其他隊列機制依賴於 wait() 和 notify() 方法的使用,這可能會變得很棘手。如果編碼不當的話,很可能會導致通知丟失,結果就是池中的線程都處於一個空閑的狀態,而實際上隊列中有任務需要處理。在使用這些工具的時候要打起十二萬分的精神;即便是專家在用它們的時候也經常會失誤。幸運的是,可以使用一些現成的實現,這些實現久經考驗,比如下文將會討論到的 你無須自行編碼 實現的 java.util.concurrent 包。
4.線程泄漏
各種各樣的線程池中存在的一個重大的危險就是線程泄漏,當一個線程被從線程池中移除去執行一個任務,任務執行結束之后卻沒有返還給線程池的時候,就會出現這種危險。出現這種情況的一種方式是當任務拋出一個 RuntimeException 或一個 Error 時。如果線程池類沒有捕捉到這些,該線程將會傻傻地存在於線程池之中,而線程池的線程數量則會被永久地減一。當這種情況發生的次數足夠多的時候,線程池最終將為空(無可用線程),而系統則會癱瘓,因為已經沒有線程來處理任務了。
癱瘓的任務,比如那些永久等待不保證可用資源或者等待已經回家了的用戶輸入的任務,也可以造成相等於線程泄漏一樣的后果。如果一個線程永久地被這樣一個任務所占用了的話,它造成的影響和從池中移除是一樣的。像這樣的任務應該要么給它們一個線程池之外的線程,要么控制一下它們的等待時間。
5.請求過載
服務器很可能會被鋪天蓋地而來的請求所淹沒。這種情況下,我們可能並不想讓每個進來的請求都放進我們的工作隊列,因為等待執行的任務隊列也可能會占用過多系統資源並導致資源不足。這時候要做什么就取決於你的決定了,比如你可以通過一個表示服務器暫時太忙的響應來拒絕這些請求。
五、高效線程池使用指南
你只需要遵循一些簡單的指導方針,線程池就可以成為你構建服務應用的一個非常有效的方法:
- 不要把同步等待其他任務執行結果的任務放進任務隊列。這將導致上文所描述那種死鎖,池中所有線程都在等待一個任務的執行結果,而隊列中的這個任務無法得到執行因為所有線程都在使用中。
- 可能長時間操作的任務放入線程池的時候要慎重。如果程序必須要等待一個資源,比如一個 I/O 的完成,定義一個最長等待時間,然后失敗或稍后重新執行。這就保證了通過將一個線程從一個可能會完成的任務中釋放出來而最終一些其他任務得到成功執行。
- 理解你的任務。想要有效地調整線程池大小,你需要理解隊列中那些任務要做的事情。它們是 CPU 密集型操作嗎?它們會長時間占用 I/O 嗎?你的答案會影響到你對你的應用的配置。如果這些任務來自不同的類、有着截然不同的特征,為不同類型的任務定制不同的工作隊列也許更行得通,這樣每個池都能夠得到有據配置。
線程池大小配置
調整線程池的大小在很大程度上是一件避免兩個錯誤的事情:擁有過多或過少的線程。幸運的是,對於大多數應用而言太多或太少之間的中間地帶還是很寬廣的。
回顧應用中使用線程的兩個主要優點:在等待一個諸如 I/O 之類的慢操作的時候進程能夠繼續進行,利用多個處理器的可用性。在一個 N 處理器主機上運行一個計算密集型的應用,通過設置線程數量為 N 增加額外的線程可能會提高吞吐量,但添加的額外線程超過 N 的話就沒有什么好處了。確實,過多的線程甚至會降低性能因為會帶來額外的上下文切換開銷。
線程池最佳大小取決於可用處理器的數量和工作隊列中任務的性質。對於在一個 N-處理器 系統中一個的將持有完全計算密集型任務的工作隊列,通常獲得 CPU 最大利用率的話是配置線程池大小為 N 或 N + 1 個線程。
對於可能要等待 I/O 完成的任務,比如,一個從 socket 中讀取一個 HTTP 請求的任務 - 你需要增加線程池的線程的數量超出可用處理器的數量,因為所有的線程都在同一時間工作。通過分析,你可以為一個典型的請求估算出等待時間(WT)和服務時間(ST)之間的比率。比如我們稱這個比率為 WT/ST,對於一個 N-處理器系統,你需要大約 N * (1 + WT/ST) 個線程來保持處理器得到充分利用。
處理器利用率並非配置線程池大小的唯一依據。因為在線程池增長的時候,你可能會遇到調度器的局限性,內存可用性,或者其他系統資源,比如 socket 的數量,打開文件的處理,或者數據庫連接等問題。
六、總結
- 使用JDK的方法創建會產生OOM情況,主要原因是用LinkedBlockingQueue隊列,該隊列可以導致OOM。
- 線程可以使用用阿里巴巴推薦的方法,但是因為定線程數量,並且隊列用的是ArrayBlockingQueue,所以效率較低,不過可以保證內存不會OOM。
- 無需自行編碼。Doug Lea 寫了一個傑出的開源並發工具包,java.util.concurrent,包含了互斥,信合,能夠在並發訪問下性能表現良好的集合類諸如隊列和哈希表,以及一些工作隊列的實現。這個包里的 PooledExecutor 類是一個高效的、被廣泛使用的、基於工作隊列的一個線程池的正確實現。不用再嘗試着自己去寫代碼實現了,那樣很容易出錯,你可以考慮使用 java.util.concurrent 包里的一些工具。
- 線程池是構建服務器應用的很有用的一個工具。它的概念很簡單,但在實現或者使用的時候需要注意一些問題,比如死鎖,資源不足,以及 wait() 和 notify() 的復雜性。如果你發現自己的應用需要一個線程池,考慮一下使用 java.util.concurrent 包里的某個 Executor 類,比如 PooledExecutor,不要去從頭寫一個。如果你發現你在創建一些要處理簡短任務的線程,你就應該考慮使用線程池了。
我的微信公眾號:架構真經(id:gentoo666),分享Java干貨,高並發編程,熱門技術教程,微服務及分布式技術,架構設計,區塊鏈技術,人工智能,大數據,Java面試題,以及前沿熱門資訊等。每日更新哦!


參考資料:
- https://blog.csdn.net/weixin_39770927/article/details/81360511
- https://blog.csdn.net/zhangqinfu/article/details/52931530
- https://blog.csdn.net/every__day/article/details/83900109
- https://blog.csdn.net/wwp231/article/details/52504687
- https://blog.csdn.net/qq360694660/article/details/78296919
- https://blog.csdn.net/defonds/article/details/43796951
