池化技術——自定義線程池


池化技術——自定義線程池

1、為什么要使用線程池?

池化技術

1.1、池化技術的特點:

  1. 程序的運行,本質:占用系統的資源! 優化資源的使用!=>池化技術

  2. 線程池、連接池、內存池、對象池///..... 創建、銷毀。十分浪費資源

  3. 池化技術:事先准備好一些資源,有人要用,就來我這里拿,用完之后還給我。

1.2、線程池的好處:

  1. 降低資源的消耗
  2. 降低資源的消耗
  3. 方便管理。

核心:線程復用、可以控制最大並發數、管理線程

1.3、如何自定義一個線程池

牢記:三大方法、7大參數、4種拒絕策略

2、三大方法

三大方法

在java的JDK中提夠了Executors開啟JDK默認線程池的類,其中有三個方法可以用來開啟線程池。

2.1、單個線程的線程池方法

ExecutorService threadPool = Executors.newSingleThreadExecutor();    //單個線程的線程池

該方法開啟的線程池,故名思義該池中只有一個線程。

2.2、固定的線程池的大小的方法

ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的線程池的大小

其中方法中傳遞的int類型的參數,就是池中的線程數量

2.3、可伸縮的線程池的方法

ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸縮

該方法創建的線程池是不固定大小的,可以根據需求動態的在池子里創建線程,遇強則強。

2.4、完整的測試代碼為:

package com.xgp.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 工具類,三大方法
 */
public class Demo01 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();    //單個線程的線程池
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的線程池的大小
//        ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸縮

       try{
           for(int i = 0;i < 10;i++) {
               //使用線程池去創建
               threadPool.execute(() -> {
                   System.out.println(Thread.currentThread().getName() + " OK");
               });
           }
       }catch (Exception e) {
           e.printStackTrace();
       }finally {
           //關閉線程池
           threadPool.shutdown();
       }
    }
}

將三行注釋部分依次打開的運行結果為:

pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK

上述運行結果為單個線程的線程池的結果:可以看出的確只有一條線程在執行。

pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

上述運行結果為固定線程的線程池的結果:因為固定的大小為5,可以看出的確有5條線程在執行。

pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
pool-1-thread-7 OK
pool-1-thread-9 OK
pool-1-thread-10 OK
pool-1-thread-8 OK
pool-1-thread-6 OK

上述運行結果為彈性線程的線程池的結果:可以看出的確有10條線程在執行。

3、為什么要自定義線程池?三大方法創建線程池的弊端分析

  1. 在單個線程池和固定大小的線程池中,因為處理的線程有限,當大量的請求進來時,都會在阻塞隊列中等候,而允許請求的隊列長度為Integet.MAX_VALUE,整數的最大值約為21億,會導致JVM內存溢出。

  2. 在彈性伸縮的線程池中,允許創建的線程數量為Integet.MAX_VALUE,可能會創建大量的線程,使得Jvm內存溢出。

    對於上述的兩點,其數值會在后面分析源碼的環節看到,關於這一點,在阿里巴巴開發手冊中有着詳細的說明,並極力推薦采用自定義線程池,而不使用這三大方法。

4、七大參數

七大參數

源碼分析:我們要指定義自己的線程池,先從源碼的角度看一看JDK現有的三個線程池是如何編寫的。

	public static ExecutorService newSingleThreadExecutor() {
    	return new FinalizableDelegatedExecutorService
      	  (new ThreadPoolExecutor(1, 1,
           	                     0L, TimeUnit.MILLISECONDS,
          	                      new LinkedBlockingQueue<Runnable>()));
	}

	public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

從三大方法的源代碼中可以看出,三種線程池都是new 了一個 ThreadPoolExecutor 對象,點擊源碼中看看。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

這里調用了一個 this() 方法,在點擊進去一看。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

到這里就可以很明顯的看出本節想要講述的7大參數,是哪7大了吧。根據英文意思,可以很容易的說明這七大參數的意思了。

    public ThreadPoolExecutor(int corePoolSize,   //核心線程數
                              int maximumPoolSize,  //最大線程數
                              long keepAliveTime,   //超時等待
                              TimeUnit unit,        //超時等待的單位
                              BlockingQueue<Runnable> workQueue,    //阻塞隊列
                              ThreadFactory threadFactory,      //線程池工廠
                              RejectedExecutionHandler handler) {     //拒絕策略

在阿里巴巴開發手冊中,推薦的也是使用 ThreadPoolExecutor 來進行創建線程池的。

這里可以用一張在銀行辦理業務的圖來生動的說明這七大參數。

這里,解釋下這張圖對應的含義:

  1. 銀行在人很少的時候也只開放兩個窗口,並且什么時候都不會進行關閉。——核心線程數

  2. 當人數大於2又小於5人時,后來的三人就在候客區等候辦理業務。——阻塞隊列

  3. 當人數大於5人又小於8人時,另外三個正在關閉的窗口需要開放進行辦理業務,於是乎就有了5個窗口在進行辦理業務。——最大線程數

  4. 將開啟其他三個窗口,需要領導將這三個窗口的員工叫回。——線程池工廠

  5. 當人數實在太多時,銀行都擠不下了,此時就會把門關了,不接受新的服務了。——拒絕策略

  6. 當銀行人數又變少時,另外的三個非核心窗口太久沒又生意,為了節約成本,則又會進行關閉。——超時等待

    通過對上述7大參數的分析,同學們也能夠更加理解JDK自帶的三大方法的弊端,以及為什么是整數的最大值的這個數值。

5、如何手動的去創建一個線程池

手動創建連接池

七大參數的說明也都講了,於是乎我們可以仿照這七大參數,來定義一個自己的線程池。對於其中的線程工廠,我們也一般采用默認的工廠,而其中的拒絕策略我們可以通過源碼分析,先使用三大方法使用的拒絕策略。

點擊進入 defaultHandler 的源碼中可以看到。

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

其中的 new AbortPolicy(); 就是三大方法使用的拒絕策略,我們先仿照銀行的例子,自定義一個線程池。代碼如下:

package com.xgp.pool;

import java.util.concurrent.*;

/**
 * 自定義線程池
 */
public class Demo02 {
/*    public ThreadPoolExecutor(int corePoolSize,   //核心線程數
                              int maximumPoolSize,  //最大線程數
                              long keepAliveTime,   //超時等待
                              TimeUnit unit,        //超時等待的單位
                              BlockingQueue<Runnable> workQueue,    //阻塞隊列
                              ThreadFactory threadFactory,      //線程池工廠
                              RejectedExecutionHandler handler) {*/     //拒絕策略
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()    //會拋出異常的實現類
//                new ThreadPoolExecutor.CallerRunsPolicy()  //哪來的去哪里
//                new ThreadPoolExecutor.DiscardOldestPolicy()    //不會拋出異常,會丟掉任務
//                new ThreadPoolExecutor.AbortPolicy()        //嘗試會和第一個競爭
                );


        try{
            for(int i = 0;i < 8;i++) {
                //使用線程池去創建
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            //關閉線程池
            pool.shutdown();
        }
    }
}

於是乎,我們完成了一個自定義的線程池,核心線程數為2,最大線程數為5,超時等待的秒數為3s,阻塞隊列的長度為3。

6、四種拒絕策略

四種拒絕策略

通過分析源碼,可以知道三大方法默認的拒絕策略在ThreadPoolExecutor這個類中,由於該類較為復雜,尋找起來不方便,於是我們可以采用IDEA的代碼提示功能,非常明顯的提示出了四種拒絕策略,也就是上面自定義線程池中的被注釋部分。

將上面自定義線程池的代碼注釋一一打開,我們來進行測試:

6.1、會拋出異常的拒絕策略

                new ThreadPoolExecutor.AbortPolicy()    //會拋出異常的實現類

該拒絕策略運行的結果為:

pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-5 OK
java.util.concurrent.RejectedExecutionException: Task com.xgp.pool.Demo02$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at com.xgp.pool.Demo02.main(Demo02.java:40)

該策略就是當最大線程數+阻塞隊列數都不滿足請求數時,系統將拋出異常來進行解決。

6.2、哪來的去哪里拒絕策略

                new ThreadPoolExecutor.CallerRunsPolicy()  //哪來的去哪里

該拒絕策略運行的結果為:

pool-1-thread-1 OK
main OK
main OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

可以看出,該拒絕策略當線程池的線程數不能夠滿足需求時,會將不能服務的任務打道回府,即交給main線程來解決,該拒絕策略適用於原來的線程能夠解決問題的情況。

6.3、丟掉任務拒絕策略

                new ThreadPoolExecutor.DiscardOldestPolicy()    //不會拋出異常,會丟掉任務

該拒絕策略運行的結果為:

pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

數一數,一共是10個任務,但根據執行的情況只處理了8個任務,該拒絕策略將不能夠分配線程執行的任務全部丟棄了,會造成數據的丟失。

6.4、嘗試競爭拒絕策略

                new ThreadPoolExecutor.DiscardPolicy()      //嘗試會和第一個競爭

該拒絕策略運行的結果為:

pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

數一數,一共是10個任務,但根據執行的情況只處理了9個任務,其中競爭成功了一個,失敗了一個。該策略將會於最早進來的線程進行競爭,類似於操作系統中的搶占式短作業優先算法,該拒絕策略同樣會造成數據的丟失。

7、關於最大線程數應該如何確定

7.1、CPU密集型

CPU密集型

對於有多核Cpu的電腦,應該讓cpu充分忙碌起來,不要低於Cpu的核數,並且不應該在代碼中寫死,而是應該能夠自動的獲取當前機器的Cpu核數,獲取的代碼如下:

System.out.println(Runtime.getRuntime().availableProcessors());

7.2、IO密集型

IO密集型

對於系統中有大量IO任務時,應該要預留出足夠的線程來處理IO任務,因為IO任務極度耗時。如果判斷出系統中的IO密集的任務有10個,則定義的線程數量需要大於10。

7.3、公式總結

最大線程數	=	機器核素*2	+	IO密集型任務數

對於上述該公式,只是網上的一種總結,作者也沒有進行深入的測試於了解,讀者應該根據自己的業務需要進行合理的調整。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM