本文部分摘自《Java 並發編程的藝術》
概述
Java 中的線程池是運行場景最多的並發框架,合理使用線程池能夠帶來三個好處:
- 降低資源消耗。通過重復利用已有的線程降低線程創建和銷毀造成的消耗
- 提高響應速度。當任務到達時,任務可以不需要等待線程創建就能立即執行
- 提高線程可管理性。線程是稀缺資源,使用線程池進行統一分配、調優和監控,可以降低資源消耗,提高系統穩定性
線程池的實現原理
從圖中可以看到,當提交一個新任務到線程池時,線程池的處理流程如下:
- 線程池判斷核心線程池里的線程是否都在執行任務,如果不是,創建一個新的工作線程執行任務,否則進入下一流程
- 線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,將新提交的任務存儲在工作隊列中,否則進入下一流程
- 線程池判斷線程池里的線程是否都處於工作狀態,如果沒有,創建一個新的工作線程執行任務,否則交給飽和策略來處理這個任務
使用線程池
1. 創建線程池
我們可以通過 ThreadPoolExecutor 來創建一個線程池
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
創建一個線程需要輸入幾個參數,如下:
-
corePoolSize(線程池的基本大小)
當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即時其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建
-
maximumPoolSize(線程池最大數量)
線程池允許創建的最大線程數,如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用無界阻塞隊列做任務隊列,則這個參數沒有什么效果
-
keepAliveTime(線程活動保持時間)
線程池的工作線程空閑后,保持存活的時間。如果任務很多,並且每個任務的執行時間都比較短,可以調大時間,提高線程利用率
-
unit(線程保持活動時間的單位)
可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)和納秒(NANOSECONDS)
-
workQueue(任務隊列)
用於保存等到執行的任務的阻塞隊列,可以選擇以下幾個阻塞隊列:
-
ArrayBlockingQueue
是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序
-
LinkedBlockingQueue
一個基於鏈表結構的阻塞隊列,此隊列按 FIFO 排序元素,吞吐量通常高於 ArrayBlockingQueue
-
SynchronousQueue
一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一致處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue
-
PriorityBlockingQueue
一個具有優先級的無界阻塞隊列
-
-
threadFactory
用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字
-
handler(飽和策略)
當任務和線程池都滿了,說明線程池處於飽和狀態,必須采取一種策略處理提交的新任務。在 JDK5 中線程池框架提供了以下四種策略:
- AbortPolicy:直接拋出異常,默認采取這種策略
- CallerRunsPolicy:使用調用者所在線程來運行任務
- DiscardOldestPolicy:丟棄隊列最近的一個任務,並執行當前任務
- DiscardPolicy:不處理,丟棄掉
也可以根據需要實現 RejectedExecutionHandler 接口自定義策略
2. 向線程池提交任務
可以使用 execute() 和 submit() 方法向線程池提交任務
-
execute() 方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功
threadsPool.execute(new Runnable() { @Override public void run() { //... } })
-
submit() 方法用於提交需要返回值的任務,線程池會返回一個 future 對象,通過這個對象可以判斷任務是否執行成功
Future<Object> future = executor.submit(hasReturnValueTask); try { Object s = future.get(); } catch(InterruptedException e) { // 處理中斷異常 } catch(ExecutionException e) { // 處理無法執行任務異常 } finally { // 關閉線程池 executor.shutdown(); }
3. 關閉線程池
可以通過調用線程池的 shutdown 或 shutdownNow 方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,逐個調用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止
shutdown 方法和 shutdownNow 方法存在一定的區別:
- shutdownNow 方法首先將線程池狀態設置成 STOP,然后嘗試停止所有正在執行或暫停任務的線程,並返回等待執行任務的列表
- shutdown 方法只是將線程池狀態設置成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的線程
只要調用了這兩個關閉方法中的任意一個,isShutdown 方法就會返回 true,當所有任務都已關閉,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於應該采用哪種方法關閉線程池,應該由提交到線程池的任務特性決定,通常調用 shutdown 方法關閉線程池,如果任務不一定要執行完成,可以調用 shutdownNow 方法
基於線程池技術的簡單 Web 服務器
目前的瀏覽器都支持多線程訪問,比如請求一個頁面的時候,頁面包含的圖片等靜態資源會被瀏覽器並發的獲取。如果 Web 服務器是單線程的,按順序處理發送過來的請求,無疑會影響用戶體驗,因此大部分 Web 服務器都支持並發訪問
下面使用線程池來構造一個簡單的 Web 服務器,這個 Web 服務器用來處理 HTTP 請求,目前只能處理簡單的文本和圖片內容。該 Web 服務器使用 main 線程不斷接受客戶端的 Socket 連接,將連接以及請求提交給線程池處理,這樣使得 Web 服務器能同時處理多個客戶端的請求
public class SimpleHttpServer {
static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5, 10, 60L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
static ServerSocket serverSocket;
static int port = 8080;
public static void setPort(int port) {
if (port > 0) {
SimpleHttpServer.port = port;
}
}
/**
* 啟動 SimpleHttpServer
*/
public static void start() throws Exception {
serverSocket = new ServerSocket(port);
Socket socket = null;
while ((socket = serverSocket.accept()) != null) {
// 接收一個客戶端Socket,生成一個HttpRequestHandler,放入線程池執行
threadPool.execute(new HttpRequestHandler(socket));
}
serverSocket.close();
}
static class HttpRequestHandler implements Runnable {
private Socket socket;
public HttpRequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
String line;
BufferedReader br = null;
BufferedReader reader = null;
PrintWriter out = null;
InputStream in = null;
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String header = reader.readLine();
// 計算絕對路徑
String filePath = SimpleHttpServer.class.getResource(header.split(" ")[1]).getPath();
out = new PrintWriter(socket.getOutputStream());
// 如果請求資源的后綴為 jpg 或 ico,則讀取資源並輸出
if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
in = new FileInputStream(filePath);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int i;
while ((i = in.read()) != -1) {
baos.write(i);
}
byte[] array = baos.toByteArray();
out.println("HTTP/1.1 200 OK");
out.println("Server: YeeQ");
out.println("Content-Type: image/jpeg");
out.println("Content-Length: " + array.length);
out.println("");
socket.getOutputStream().write(array, 0, array.length);
} else {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
out = new PrintWriter(socket.getOutputStream());
out.println("HTTP/1.1 200 OK");
out.println("Server: YeeQ");
out.println("Content-Type: text/html; charset=UTF-8");
out.println("");
while ((line = br.readLine()) != null) {
out.println(line);
}
}
out.flush();
} catch (Exception e) {
if (out != null) {
out.println("HTTP/1.1 500");
out.println("");
out.flush();
}
} finally {
close(br, in, reader, out, socket);
}
}
}
/**
* 關閉流或者socket
*/
private static void close(Closeable... closeables) {
if (closeables != null) {
for (Closeable closeable : closeables) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
SimpleHttpServer.start();
}
}