Java 線程池詳解



本文部分摘自《Java 並發編程的藝術》


概述

Java 中的線程池是運行場景最多的並發框架,合理使用線程池能夠帶來三個好處:

  • 降低資源消耗。通過重復利用已有的線程降低線程創建和銷毀造成的消耗
  • 提高響應速度。當任務到達時,任務可以不需要等待線程創建就能立即執行
  • 提高線程可管理性。線程是稀缺資源,使用線程池進行統一分配、調優和監控,可以降低資源消耗,提高系統穩定性

線程池的實現原理

從圖中可以看到,當提交一個新任務到線程池時,線程池的處理流程如下:

  1. 線程池判斷核心線程池里的線程是否都在執行任務,如果不是,創建一個新的工作線程執行任務,否則進入下一流程
  2. 線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,將新提交的任務存儲在工作隊列中,否則進入下一流程
  3. 線程池判斷線程池里的線程是否都處於工作狀態,如果沒有,創建一個新的工作線程執行任務,否則交給飽和策略來處理這個任務

使用線程池

1. 創建線程池

我們可以通過 ThreadPoolExecutor 來創建一個線程池

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

創建一個線程需要輸入幾個參數,如下:

  • corePoolSize(線程池的基本大小)

    當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即時其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建

  • maximumPoolSize(線程池最大數量)

    線程池允許創建的最大線程數,如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用無界阻塞隊列做任務隊列,則這個參數沒有什么效果

  • keepAliveTime(線程活動保持時間)

    線程池的工作線程空閑后,保持存活的時間。如果任務很多,並且每個任務的執行時間都比較短,可以調大時間,提高線程利用率

  • unit(線程保持活動時間的單位)

    可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)和納秒(NANOSECONDS)

  • workQueue(任務隊列)

    用於保存等到執行的任務的阻塞隊列,可以選擇以下幾個阻塞隊列:

    • ArrayBlockingQueue

      是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序

    • LinkedBlockingQueue

      一個基於鏈表結構的阻塞隊列,此隊列按 FIFO 排序元素,吞吐量通常高於 ArrayBlockingQueue

    • SynchronousQueue

      一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一致處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue

    • PriorityBlockingQueue

      一個具有優先級的無界阻塞隊列

  • threadFactory

    用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字

  • handler(飽和策略)

    當任務和線程池都滿了,說明線程池處於飽和狀態,必須采取一種策略處理提交的新任務。在 JDK5 中線程池框架提供了以下四種策略:

    • AbortPolicy:直接拋出異常,默認采取這種策略
    • CallerRunsPolicy:使用調用者所在線程來運行任務
    • DiscardOldestPolicy:丟棄隊列最近的一個任務,並執行當前任務
    • DiscardPolicy:不處理,丟棄掉

    也可以根據需要實現 RejectedExecutionHandler 接口自定義策略

2. 向線程池提交任務

可以使用 execute() 和 submit() 方法向線程池提交任務

  • execute() 方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功

    threadsPool.execute(new Runnable() {
        @Override
        public void run() {
            //...
        }
    })
    
  • submit() 方法用於提交需要返回值的任務,線程池會返回一個 future 對象,通過這個對象可以判斷任務是否執行成功

    Future<Object> future = executor.submit(hasReturnValueTask);
    try {
        Object s = future.get();
    } catch(InterruptedException e) {
        // 處理中斷異常
    } catch(ExecutionException e) {
        // 處理無法執行任務異常
    } finally {
        // 關閉線程池
        executor.shutdown();
    }
    

3. 關閉線程池

可以通過調用線程池的 shutdown 或 shutdownNow 方法來關閉線程池,它們的原理是遍歷線程池中的工作線程,逐個調用線程的 interrupt 方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止

shutdown 方法和 shutdownNow 方法存在一定的區別:

  • shutdownNow 方法首先將線程池狀態設置成 STOP,然后嘗試停止所有正在執行或暫停任務的線程,並返回等待執行任務的列表
  • shutdown 方法只是將線程池狀態設置成 SHUTDOWN 狀態,然后中斷所有沒有正在執行任務的線程

只要調用了這兩個關閉方法中的任意一個,isShutdown 方法就會返回 true,當所有任務都已關閉,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於應該采用哪種方法關閉線程池,應該由提交到線程池的任務特性決定,通常調用 shutdown 方法關閉線程池,如果任務不一定要執行完成,可以調用 shutdownNow 方法


基於線程池技術的簡單 Web 服務器

目前的瀏覽器都支持多線程訪問,比如請求一個頁面的時候,頁面包含的圖片等靜態資源會被瀏覽器並發的獲取。如果 Web 服務器是單線程的,按順序處理發送過來的請求,無疑會影響用戶體驗,因此大部分 Web 服務器都支持並發訪問

下面使用線程池來構造一個簡單的 Web 服務器,這個 Web 服務器用來處理 HTTP 請求,目前只能處理簡單的文本和圖片內容。該 Web 服務器使用 main 線程不斷接受客戶端的 Socket 連接,將連接以及請求提交給線程池處理,這樣使得 Web 服務器能同時處理多個客戶端的請求

public class SimpleHttpServer {

    static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, 10, 60L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    static ServerSocket serverSocket;

    static int port = 8080;

    public static void setPort(int port) {
        if (port > 0) {
            SimpleHttpServer.port = port;
        }
    }

    /**
     * 啟動 SimpleHttpServer
     */
    public static void start() throws Exception {
        serverSocket = new ServerSocket(port);
        Socket socket = null;
        while ((socket = serverSocket.accept()) != null) {
            // 接收一個客戶端Socket,生成一個HttpRequestHandler,放入線程池執行
            threadPool.execute(new HttpRequestHandler(socket));
        }
        serverSocket.close();
    }

    static class HttpRequestHandler implements Runnable {

        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            String line;
            BufferedReader br = null;
            BufferedReader reader = null;
            PrintWriter out = null;
            InputStream in = null;
            try {
                reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String header = reader.readLine();
                // 計算絕對路徑
                String filePath = SimpleHttpServer.class.getResource(header.split(" ")[1]).getPath();
                out = new PrintWriter(socket.getOutputStream());
                // 如果請求資源的后綴為 jpg 或 ico,則讀取資源並輸出
                if (filePath.endsWith("jpg") || filePath.endsWith("ico")) {
                    in = new FileInputStream(filePath);
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    int i;
                    while ((i = in.read()) != -1) {
                        baos.write(i);
                    }
                    byte[] array = baos.toByteArray();
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: YeeQ");
                    out.println("Content-Type: image/jpeg");
                    out.println("Content-Length: " + array.length);
                    out.println("");
                    socket.getOutputStream().write(array, 0, array.length);
                } else {
                    br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
                    out = new PrintWriter(socket.getOutputStream());
                    out.println("HTTP/1.1 200 OK");
                    out.println("Server: YeeQ");
                    out.println("Content-Type: text/html; charset=UTF-8");
                    out.println("");
                    while ((line = br.readLine()) != null) {
                        out.println(line);
                    }
                }
                out.flush();
            } catch (Exception e) {
                if (out != null) {
                    out.println("HTTP/1.1 500");
                    out.println("");
                    out.flush();
                }
            } finally {
                close(br, in, reader, out, socket);
            }
        }
    }

    /**
     * 關閉流或者socket
     */
    private static void close(Closeable... closeables) {
        if (closeables != null) {
            for (Closeable closeable : closeables) {
                if (closeable != null) {
                    try {
                        closeable.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleHttpServer.start();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM