1.為什么使用線程池
在多線程編程中一項很重要的功能就是執行任務,而執行任務的方式有很多種,為什么一定需要使用線程池呢?下面我們使用Socket編程處理請求的功能,分別對每種執行任務的方式進行分析。
1.1串行執行任務
當Socket監聽到客戶端有連接,通過handleSocket方法順序的處理每一個客戶端連接,當處理完成后,繼續監聽。代碼如下:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
handleSocket(socket);
}
這種方式的缺點非常明顯:當我有多個客戶端請求時,在server處理一個請求的過程中,其他請求都需要等待前一個請求處理完畢。這種在高並發情況下幾乎不可用。
1.2為每個任務創建一個線程
針對上面的問題進行優化:為每一個客戶端請求創建一個線程來處理請求,主線程只需要創建線程,之后即可繼續堅挺客戶端請求.流程圖如下:
代碼如下:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++).start();
}
這種方式有以下優點:
1.將處理客戶端連接的操作從主線程中分離出去,使得主循環可以更快的響應下一次請求。
2.處理客戶端連接的操作是並行的,提高了程序的吞吐量。
但是這種方式有有以下幾個缺點:
1.處理請求的線程必須是線程安全的
2.線程的創建和銷毀都需要開銷,當大量創建線程的時候,將會消耗大量計算機資源
3.當可用的CPU數量小於可運行的線程的時候,那么多出來的線程會占用內存資源,給垃圾回收帶來壓力,並且在大量線程競爭CPU資源的時候會有很大的性能開銷
4.JVM中可創建的線程數存在一個上限,這個上限隨着平台的不同而不同,並且受多個因素的限制,包括JVM的啟動參數,每個線程所占用的內存大小等,如果超出這些限制,將會拋出OOM異常。
1.3 使用線程池處理客戶端請求
對於1.2中出現的問題,最好的解決方案就是使用線程池來執行task,這樣可以對創建的線程總數做限制,從而避免1.2中的問題。流程圖如下:
處理方式如下:
ServerSocket serverSocket = new ServerSocket();
SocketAddress endpoint = new InetSocketAddress(host, port);
serverSocket.bind(endpoint,1023);
while (!isStop) {
Socket socket = serverSocket.accept();
executorService.execute(new SocketHandler(socket, THREAD_NAME_PREFIX + threadIndex++));
}
此中方式有以下幾個優點:
1.任務提交和任務執行分離開
2.執行任務的線程可以重用,減少了線程創建和銷毀的開銷,同時當任務到達時可以直接使用創建好的線程執行任務,也提高了程序的響應速度。
2.java中線程池介紹
在java中線程池的實現是基於生產者-消費者模式的,線程池的功能將任務的提交和任務的執行分離,任務提交的過程為生產者,執行任務的過程為消費過程。具體的分析見源碼分析。java線程池的頂層接口為Executor,源碼如下:
public interface Executor {
void execute(Runnable command);
}
此接口為所有線程池實現的頂層接口,其規定了可以接受的task類型為Runnable實現類,但是具體的執行task的邏輯由線程池實現類自己定義,比如:
可以使用主線程串行執行任務,
也可以為每個任務創建一個新的線程
或者提前創建好一組線程,每次執行任務的時候從一組線程中取,等等
對於線程池的執行策略主要有以下幾個方面:
1.在什么線程中執行任務
2.按照什么順序執行任務(FIFO、LIFO、優先級?)
3.有多少個任務可以並發執行
4.最多可以有多少個任務在隊列中等待執行
5.當等待隊列中達到最大值的時候,怎么樣拒絕新提交的task
6.在執行一個任務之前或者之后需要做哪些操作?
應該根據具體的業務選擇不同的執行策略。在java類庫中提供了Executors工具類來常見默認策略的線程池。主要有以下幾個接口:
public static ExecutorService newFixedThreadPool(int nThreads)
將會創建一個固定大小的線程池,每當有新任務提交的時候,當線程總數沒有達到核心線程數的時候,為每個任務創建一個新線程,當線程的個數到達最大值后,重用之前創建的線程,當線程因為未知異常而停止時候,將會重現創建一個線程作為補充。
public static ExecutorService newCachedThreadPool()
根據需求創建線程的個數,當線程數大於任務數的時候,將會注銷多余的線程
public static ExecutorService newSingleThreadExecutor()
創建一個單線程的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創建一個可執行定時任務的線程池
在以上的例子中,所有提交的task在提交到線程池后其執行狀態是不可見的,即主線程無法知道提交的task是否執行結束或者執行結果。針對這個問題,java提供了可以返回數據的task接口Future和Callable接口。
其中Callable接口提供了任務返回數據以及拋出異常的功能,定義如下:
public interface Callable<V> {
V call() throws Exception;
}
在ExecutorService中所有的submit方法都會返回一個Future對象,其接口定義如下:
public interface Future<V> {
取消任務執行,當mayInterruptIfRunning為true,interruptedthisthread
boolean cancel(boolean mayInterruptIfRunning);
返回此任務是否在執行完畢之前被取消執行
boolean isCancelled();
返回此任務是否已經完成,包括正常結束,異常結束以及被cancel
boolean isDone();
返回執行結果,當任務沒有執行結束的時候,等待
V get() throws InterruptedException, ExecutionException;
}
3.使用線程池可能出現的問題
1.線程飢餓死鎖
在單線程的Executor中,如果Executor中執行的一個任務中,再次提交任務到同一個Executor中,並且等待這個任務執行完畢,那么就會發生死鎖問題。如下demo中所示:
public class ThreadDeadLock {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
public static void main(String[] args) throws Exception {
System.out.println("Main Thread start.");
EXECUTOR_SERVICE.submit(new DeadLockThread());
System.out.println("Main Thread finished.");
}
private static class DeadLockThread extends Thread{
@Override
public void run() {
try {
System.out.println("DeadLockThread start.");
Future future = EXECUTOR_SERVICE.submit(new DeadLockThread2());
future.get();
System.out.println("DeadLockThread finished.");
} catch (Exception e) {
}
}
}
private static class DeadLockThread2 extends Thread {
@Override
public void run() {
try {
System.out.println("DeadLockThread2 start.");
Thread.sleep(1000 * 10);
System.out.println("DeadLockThread2 finished.");
} catch (Exception e) {
}
}
}
}
輸出結果為:
Main Thread start.
Main Thread finished.
DeadLockThread start.
對於多個線程的線程池,如果所有正在執行的線程都因為等待處於工作隊列中的任務執行而阻塞,那么就會發生線程飢餓死鎖。
當往線程池中提交有依賴的任務時,應清楚的知道可能會出現的線程飢餓死鎖風險。應考慮是否將依賴的task提交到不同的線程池中
或者使用無界的線程池。
只有當任務相對獨立時,設置線程池大小和工作隊列的大小才是合理的,否則有可能會出現線程飢餓死鎖
2.任務運行時間過長
任務執行時間過長會影響線程池的響應時間,當運行時間長的任務遠大於線程池線程的個數時,會出現所有線程都在執行運行時間長的任務,從而影響對其他任務的響應。
解決辦法:
1.通過限定任務等待的時長,而不要無限期等待下去,當等待超時的時候,可以將任務標記為失敗,或者重新放到線程池中。
2.當線程池中阻塞任務過多的時,應該考慮擴大線程池的大小
4.線程池大小的設置
線程池的大小依賴於提交任務的類型以及服務器的可用資源,線程池的大小應該避免設置過大或者過小,當線程設置過打的時候可能會有資源耗盡的風險,線程池設置過小會有可用cpu空閑從而影響系統吞吐量。
影響線程池大小的資源有很多,比如CPU、內存、數據庫鏈接池等,只需要計算資源可用總資源 / 每個任務需要的資源,取最小值,即可得出線程池的上限。
線程池的最小值應該大於可用的CPU數量。
4.java中常用線程池源碼分析-ThreadPoolExecutor
ThreadPoolExecutor線程池是比較常用的一個線程池實現類,通過Executors工具類創建的線程池中,其具體實現類是ThreadPoolExecutor。首先我們可以看下ThreadPoolExecutor的構造函數如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
下面分別對構造函數中的各個參數對應的策略進行分析:
1.線程的創建與銷毀
首先構造函數中corePoolSize、maximumPoolSize、keepAliveTime和unit參數影響線程的創建和銷毀。其中corePoolSize為核心線程數,當第一次提交任務的時候如果正在執行的線程數小於corePoolSize,則新建一個線程執行task,如果已經超過corePoolSize,則將任務放到任務隊列中等待執行。當任務隊列的個數到達上限的時候,並且工作線程數量小於maximumPoolSize,則繼續創建線程執行工作隊列中的任務。當任務的個數小於maximumPoolSize的時候,將會把空閑的線程標記為可回收的垃圾線程。對於以下代碼段測試此功能:
public class ThreadPoolTest {
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(3, 6,100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3));
public static void main(String[] args) throws Exception {
for (int i = 0; i< 9; i++) {
executorService.submit(new Task());
System.out.println("Active thread:" + executorService.getActiveCount() + ".Task count:" + executorService.getTaskCount() + ".TaskQueue size:" + executorService.getQueue().size());
}
}
private static class Task extends Thread {
@Override
public void run() {
try {
Thread.sleep(1000 * 100);
} catch (Exception e) {
}
}
}
}
輸出結果為:
Active thread:1.Task count:1.TaskQueue size:0
Active thread:2.Task count:2.TaskQueue size:0
Active thread:3.Task count:3.TaskQueue size:0
Active thread:3.Task count:4.TaskQueue size:1
Active thread:3.Task count:5.TaskQueue size:2
Active thread:3.Task count:6.TaskQueue size:3
Active thread:4.Task count:7.TaskQueue size:3
Active thread:5.Task count:8.TaskQueue size:3
Active thread:6.Task count:9.TaskQueue size:3
2.任務隊列
在ThreadPoolExecutor的構造函數中可以傳入保存任務的隊列,當新提交的任務沒有空閑線程執行時候,會將task保存到此隊列中。保存的順序是根據插入的順序或者Comparator來排序的。
3.飽和策略
ThreadPoolExecutor.AbortPolicy
拋出RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
將任務的執行交給調用者,即將本該異步執行的任務變成同步執行。
4.線程工廠
當線程池需要創建線程的時候,默認是使用線程工廠方法來創建線程的,通常情況下我們通過指定線程工廠的方式來為線程命名,便於出現線程安全問題時候來定位問題。
6.線程池最佳實現
1.項目中所有的線程應該都有線程池來提供,不允許自行創建線程
2.盡量不要用Executors來創建線程,而是使用ThreadPoolExecutor來創建
Executors有以下問題:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM。