線程池的使用


什么時候需要用線程池?

答:線程的創建比較昂貴(創建線程需要系統資源,頻繁創建和銷毀消耗大量時間,導致性能問題);短平快的任務(接收大量小任務)進行分發處理使用線程池而不是一個任務對應一個新線程。

線程池的使用需要注意哪些?

1. 需要手動聲明線程池

《阿里巴巴Java開發手冊》中講,禁止使用快捷的工具方法創建線程池,而應手動new ThreadPoolExecutor來創建線程池。

注意:newFixedThreadPool和newCachedThreadPool,可能因為資源耗盡導致OOM問題。

2.注意線程池幾個關鍵參數,創建合適的線程池

1. corePoolSize:線程池核心線程大小
2. maximumPoolSize: 線程池最大線程數量
3. keepAliveTime: 空閑線程存活時間
4. unit:空閑線程存活時間單位
5. workQueue: 工作隊列 (1.ArrayBlocking 2.LinkedBlockingQueue 3.SynchronizedQueue 4.PriorityBlockingQueue)
6. ThreadFactory:線程工廠
7. Handler 拒絕策略

3. 熟悉線程池默認工作行為,為線程池設置合適的初始化參數

線程池默認工作行為:
1. 線程池不會初始化corePoolSize個線程,有任務來了才創建工作線程; 2. 當核心線程滿了之后不會立即擴容線程池,而是把任務堆積到工作隊列中; 3. 當工作隊列滿了后擴容線程池,一直到線程個數達到maximumPoolSize為止; 4. 如果隊列已滿且達到了最大線程后還有任務進來,按照拒絕策略處理; 5. 當線程數大於核心線程數時,線程等待keepAliveTime后還是沒有任務需要處理的話,收縮線程到核心線程數。

4. 根據實際容量規划需求,通過一些手段改變默認工作行為

1. 聲明線程池后立即調用prestartAllCoreThreads方法,來啟動所有核心線程池;
2. 傳入true給allCoreThreadTimeOut方法,來讓線程池在空閑的時候同樣回收核心線程。
3. 方法: 
- Java線程池先用工作隊列存放來不及處理的任務,滿了后再擴容。
- 當工作隊列設置很大時,最大線程數這個參數顯得無意義,可以優先開啟更多線程,而把隊列當成一個后備方案。
4. 思路:
- 重寫隊列offer方法,造成這個隊列已滿的假象;
- 這樣達到最大線程后會觸發拒絕策略,實現一個自定義的拒絕策略處理程序;
- 再把任務真正插入隊列。Tomcat線程池也實現了類似的效果(點擊查看)。

 5. 注意線程池的混用

要根據任務的‘輕重緩急’來指定線程池的核心參數,包括線程數、回收策略和任務隊列
- 對於執行比較慢、數量不大的IO任務,或許要考慮更多的線程數,而不需要太大的隊列
- 對於吞吐量較大的計算型任務,線程數量不宜過多,可以是CPU核數*2(CPU 密集型任務,過多的線程只會增加線程切換的開銷),但可能需要較長的隊列來做緩沖。
- Java 8 的 parallel stream 功能,可以讓我們很方便地並行處理集合中的元素,其背后是共享同一個 ForkJoinPool,默認並行度是 CPU 核數 -1- 對於 CPU 綁定的任務來說,使用這樣的配置比較合適,但如果集合操作涉及同步 IO 操作的話(比如數據庫操作、外部服務調用等),建議自定義一個 ForkJoinPool(或普通線程池)。

6. 確保線程池是在復用的

- 使用線程池目的是復用,每次new一個線程池會很糟糕;
- 如果從類庫來獲取一個線程池(標准/自定義類庫),請務必查看源碼,以確認線程實例化方式和配置符合預期。
- 復用線程池,要根據任務的性質來選用不同的線程池;
- IO綁定任務與CPU綁定任務,如果希望減少任務間干擾,需要隔離線程池。

示例: 

//案例:線上遇到的一個事故:當天直播,我們調用一個外部服務去發送短信,發送短信接口正常時可以在100ms內響應,TPS100的發送量,CachedThreadPool能穩定在占用10個左右線程的情況下滿足需求。
//在某個時間段,外部短信服務不可用了(貌似是短信平台調整敏感詞),我們調用這個服務的超時又特別長,比如1分鍾,直播前5分鍾要發送短信,而1分鍾產生 6000 個發送短信的任務,需要6000個線程,
//沒多久因為無法創建線程導致了OOM,整個應用程序崩潰。
//我們知道線程是需要分配一定的內存空間作為線程棧的,比如1MB,因此無限制創建線程必然會導致OOM
//從源碼中可以看到,newCachedThreadPool這種線程池的最大線程數是Integer.MAX_VALUE,可以認為是無上限的,而其工作隊列SynchronousQueue是一個沒有存儲空間的阻塞隊列
//這就以為着,只要有請求到來,就必須找到一條工作線程來處理,如果當前沒有空閑的線程就再創建一條新的。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
              60L, TimeUnit.SECONDS,
              new SynchronousQueue());
}
//這里的核心線程數是0,keepAliveTime是60秒,在60秒之后所有的線程都是可以回收的。
//所以這就會出現如果業務操作並發量較大,可能一下子開啟幾千個線程而又不會撐爆內存。下邊案例就是。
 
         
//案例:某項目生產環境時不時有報警提示線程數過多,超過2000個,收到報警后查看監控發現,瞬時線程數比較多但過一會兒又降下來,線程數抖動很力換,而應用的訪問量變化不大。
//wrong:每次都創建一個線程池
class ThreadPoolHelper {
    public static ThreadPoolExecutor getThreadPool() {
        //線程池沒有復用
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();
    }
}
//right:使用靜態字段來存放線程池的引用
class ThreadPoolHelper {
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50,
            2, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());

    public static ThreadPoolExecutor getRightThreadPool() {
        return threadPoolExecutor;
    }
}

監控

//簡單的打印線程池基本內部信息,包括線程數、活躍線程數、完成了多少任務,以及隊列中還有多少積壓任務等信息:
private void printStats(ThreadPoolExecutor threadPool) {
   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("Pool Size: {}", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());

        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM