線程池-線程池的好處


1.線程池的好處。

線程使應用能夠更加充分合理的協調利用cpu 、內存、網絡、i/o等系統資源。

線程的創建需要開辟虛擬機棧,本地方法棧、程序計數器等線程私有的內存空間。

在線程的銷毀時需要回收這些系統資源。頻繁的創建和銷毀線程會浪費大量的系統資源,增加並發編程的風險。

另外,在服務器負載過大的時候,如何讓新的線程等待或者友好的拒絕服務?這些丟失線程自身無法解決的。所以需要通過線程池協調多個線程,並實現類似主次線程隔離、定時執行、周期執行等任務。線程池的作用包括:

  • 利用線程池管理並復用線程、控制最大並發數等。
  • 實現任務線程隊列緩存策略和拒絕機制。
  • 實現某些與時間相關的功能,如定時執行、周期執行等。
  • 隔離線程環境。比如,交易服務和搜索服務在同一台服務器上,分別開啟兩個線程池,交易線程的資源消耗明顯要大;因此,通過配置獨立的線程池,將較慢的交易服務與搜索服務隔開,避免個服務線程互相影響。

在了解線程池的基本作用后,我們學習一下線程池是如何創建線程的。首先從ThreadPoolExecutor構造方法講起,學習如何定義ThreadFectory和RejectExecutionHandler,並編寫一個最簡單的線程池示例。然后,通過分析ThreadPoolExecutor的execute和addWorker兩個核心方法,學習如何把任務線程加入到線程池中運行。ThreadPoolExecutor的構造方法如下:

 1 public ThreadPoolExecutor(
 2                               int corePoolSize,        //第1個參數
 3                               int maximumPoolSize, //第2個參數
 4                               long keepAliveTime, //第3個參數
 5                               TimeUnit unit, //第4個參數
 6                               BlockingQueue<Runnable> workQueue, //第5個參數
 7                               ThreadFactory threadFactory, //第6個參數
 8                               RejectedExecutionHandler handler) { //第7個參數
 9         if (corePoolSize < 0 ||
10             maximumPoolSize <= 0 || 
11             maximumPoolSize < corePoolSize || 
12             keepAliveTime < 0) // 第一處
13             throw new IllegalArgumentException();
14         if (workQueue == null || threadFactory == null || handler == null)//第二處
15             throw new NullPointerException();
16         this.corePoolSize = corePoolSize;
17         this.maximumPoolSize = maximumPoolSize;
18         this.workQueue = workQueue;
19         this.keepAliveTime = unit.toNanos(keepAliveTime);
20         this.threadFactory = threadFactory;
21         this.handler = handler;
22     }            
  • 第1個參數 :corePoolSize 表示常駐核心線程數。如果等於0,則任務執行完成后,沒有任何請求進入時銷毀線程池的線程;如果大於0,即使本地任務執行完畢,核心線程也不會被銷毀。這個值的設置非常關鍵,設置過大會浪費資源,設置的過小會導致線程頻繁地創建或銷毀。
  • 第2個參數:maximumPoolSize 表示線程池能夠容納同時執行的最大線程數。從上方的示例代碼中第一處來看,必須大於或等於1。如果待執行的線程數大於此值,需要借助第5個參數的幫助。緩存在隊列中。如果maximumPoolSize 與corePoolSize 相等,即是固定大小線程池。
  • 第3個參數:keepAliveTime 表示線程池中的線程空閑時間,當空閑時間達到KeepAliveTime 值時,線程被銷毀,直到剩下corePoolSize 個線程為止,避免浪費內存和句柄資源。在默認情況下,當線程池的線程大於corePoolSize 時,keepAliveTime 才會起作用。但是ThreadPoolExecutor的allowCoreThreadTimeOut 變量設置為ture時,核心線程超時后也會被回收。
  • 第4個參數:TimeUnit 表示時間單位。keepAliveTime 的時間單位通常是TimeUnit.SECONDS。
  • 第5個參數:   workQueue 表示緩存隊列。當請求的線程數大於maximumPoolSize時,線程進入BlockingQueue 阻塞隊列。后續示例代碼中使用的LinkedBlockingQueue 是單向鏈表,使用鎖來控制入隊和出對的原子性,兩個鎖分別控制元素的添加和獲取,是一個生產消費模型隊列。
  • 第6個參數:threadFactory 表示線程工廠。它用來生產一組相同任務的線程。線程池的命名是通過給這個factory增加組名前綴來實現的。在虛擬機棧分析時,就可以知道線程任務是由哪個線程工廠產生的。
  • 第7個參數:handler 表示執行拒絕策略的對象。當超過第5個參數workQueue的任務緩存區上限的時候,就可以通過該策略處理請求,這是一種簡單的限流保護。友好的拒絕策略可以使如下三種:
  1. 保存到數據庫進行削峰填谷。在空閑的時候再拿出來執行。
  2. 轉向某個提示頁面。
  3. 打印日志。

從代碼第2處來看,隊列、線程工程、拒絕處理服務都必須有實例對象,但在實際編碼中,很少有程序員對着三者進行實例化,而通過Executors這個線程池靜態工廠提供默認實現,那么Executors與ThreadPoolExecutor 是什么關系呢?線程池相關的類圖

ExecutorService接口繼承了Executor接口,定義了管理線程任務的方法。ExecutorService 的抽象類AbstractExecutorService 提供了 submit()、invokeAll()等部分方法的實現,但是核心方法Executor.execute() 並沒有在這里實現。因為所有的任務都在這個方法里執行,不同的實現會帶來不同的執行策略,這一點在后續的ThreadPoolExecutor解析時,會一步步分析。通過Executor的靜態工廠方法可以創建三個線程池的包裝對象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors核心方法有5個:

  • Executors.newWorkStealingPool:JDK8 引入,創建持有足夠線程的線程池,支持給定的並行堵,並通過使用對個隊列減少競爭,此構造方法中把cpu的數量設置為模型的並行度:
 1 /**
 2      * Creates a work-stealing thread pool using all
 3      * {@link Runtime#availableProcessors available processors}
 4      * as its target parallelism level.
 5      * @return the newly created thread pool
 6      * @see #newWorkStealingPool(int)
 7      * @since 1.8
 8      */
 9     public static ExecutorService newWorkStealingPool() {
10         return new ForkJoinPool
11             (Runtime.getRuntime().availableProcessors(),
12              ForkJoinPool.defaultForkJoinWorkerThreadFactory,
13              null, true);
14     }
  • Executors.newCachedThreadPool : maximumPoolSize 最大可以至Integer.MAX_VALUE,是高度可以伸縮的線程池,如果達到這個上限,相信沒有任何服務器能夠繼續工作,肯定會拋出OOM異常。keepAliveTime默認為60秒,工作線程處於空閑狀態,則回收工作線程。如果任務書增加,再次創建新的線程處理任務。
  • Executors.new ScheduledThreadPool: 線程最大至Integer.MAX_VALUE ,與上述相同,存在OOM風險。它是ScheduledExecutorService 接口家族的實現類,支持定時及周期性任務執行。相比Timer,ScheduledExextuorService 更安全,功能更強大,與newCachedThreadExecutor 的區別是不回收工作線程。
  • Executors.newSingleThreadExecutor:創建一個單線程的線程池,相當月單線程串行執行所有任務,保證按任務提交的順序依次執行。
  • Executors.newFixedThreadPool: 輸入的參數即是固定線程數,既是核心線程數也是最大線程數,不存在空閑線程,所有keepAliveTime 等於0:
    1  public static ExecutorService newFixedThreadPool(int nThreads) {
    2         return new ThreadPoolExecutor(nThreads, nThreads,
    3                                       0L, TimeUnit.MILLISECONDS,
    4                                       new LinkedBlockingQueue<Runnable>());
    5     }

    這里,輸入的隊列沒有指明長度,下面介紹LinkedBlockingQueue的構造方法。

    1 public LinkedBlockingQueue() {
    2         this(Integer.MAX_VALUE);
    3     }

    使用這樣的無界隊列,如果瞬間請求非常大,會有OOM的風險。除newWorkStealingPool 外,其他四個創建方式都存在資源耗盡的風險。

Executors 中默認的線程工程和拒絕策略過於簡單,通常對用戶不夠友好。線程工廠需要做創建前的准備工作,對線程池創建的線程必須明確標識,就像葯品的生產批號一樣,為線程本身指定有意思的名稱和相應的序列號。拒絕策略應該考慮到業務場景返回相應的提示或者友好的跳轉 1 public class UserThreadFactory implements ThreadFactory { 2 private final String namePrefix;

 3 private final AtomicInteger nextId = new AtomicInteger();  4 //定義線程組名稱,在使用jstack 來排查問題是,非常有幫助  5  UserThreadFactory(String whatFeatureOfGroup){  6 namePrefix = "UserThreadFactory's"+whatFeatureOfGroup+"-Worker-";  7  }  8  9  @Override 10 public Thread newThread(Runnable task){ 11 String name = namePrefix+ nextId.getAndIncrement(); 12 Thread thread = new Thread(null,task,name,0);
//打印threadname
13 return thread; 14 } 15 16 public static void main(String[] args) { 17 UserThreadFactory threadFactory = new UserThreadFactory("你好"); 18 String ss = threadFactory.namePrefix; 19 Task task = new Task(ss); 20 Thread thread = null; 21 for(int i = 0; i < 10; i++) { 22 thread = threadFactory.newThread(task); 23 thread.start(); 24 } 25 26 } 27 } 28 29 class Task implements Runnable{ 30 String poolname; 31 Task(String poolname){ 32 this.poolname = poolname; 33 } 34 private final AtomicLong count = new AtomicLong(); 35 36 @Override 37 public void run(){ 38 System.out.println(poolname+"_running_"+ count.getAndIncrement()); 39 } 40 }

上述示例包括線程工廠和任務執行體的定義,通過newThread 方法快速、統一地創建線程任務,強調線程一定要是歐特定的意義和名稱,方便出錯時回溯。

下面簡單地實現一下RejectedExecutionHandler,實現了接口的rejectExecution方法,打印當前線程池狀態,源碼如下。

1 public class UserRejectHandler implements RejectedExecutionHandler {
2     
3     @Override
4     public void rejectedExecution(Runnable task, ThreadPoolExecutor executor){
5         System.out.println("task rejected."+ executor.toString());
6     }
7 }

在ThreadPoolExecutor 中提供了4個公開的內部靜態類:

  • AbortPolicy (默認):丟棄任務並拋出RejectExecutionException 異常。
  • DiscardPolicy:丟棄任務,但是不拋出異常,這是不推薦的做法。
  • DiscardOldestPolicy : 拋棄隊列中等待最久的任務,然后把當前任務加入隊列中。
  • CallerRunsPolicy : 調用任務的run方法繞過線程池直接執行。

根據之前實現的線程工廠和拒絕策略,線程池的相關代碼實現如下:

public class UserThreadPool {
    public static void main(String[] args) {
        BlockingQueue queue = new LinkedBlockingQueue(2);
        UserThreadFactory f1 = new UserThreadFactory("第一機房");
        UserThreadFactory f2 = new UserThreadFactory("第二機房");
        UserRejectHandler handler = new UserRejectHandler();
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(1,2,60,
                        TimeUnit.SECONDS,queue,f1,handler);
        ThreadPoolExecutor threadPoolExecutor2 =
                new ThreadPoolExecutor(1,2,60,
                        TimeUnit.SECONDS,queue,f2,handler);
        Runnable task1 = new Task("");
        for (int i = 0;i<200;i++){
            threadPoolExecutor.execute(task1);
            threadPoolExecutor2.execute(task1);
        }
    }
}

當任務被拒絕的時候,拒絕策略會打印出當前線程池的大小以及達到了maximumPoolSize=2 ,且隊列已滿。

 

我的博客即將同步至騰訊雲+社區,邀請大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=5cjrpedpyda8

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM