線程池ThreadPoolExecutor整理


項目用到線程池,但是其實很多人對原理並不熟悉 ,這里只是整理一下

ThreadPoolExecutor

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類

  • 構造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
  • 參數

    corePoolSize               核心線程數
    maximumPoolSize            最大線程數 阻塞隊列裝不下的后 總得線程數  包含corePoolSize 
    keepAliveTime             超時時間 線程池中當前的空閑線程服務完某任務后的存活時間。如果時間足夠長,那么可能會服務其它任務
    unit               時間單位
    workQueue             阻塞隊列 線程數大於核心線程后放到隊列中
    threadFactory         線程池工廠
    handler              拒絕策略 阻塞隊列滿了,也達到了最大線程數 執行拒絕策略

    • corePoolSize:核心池的大小,在創建了線程池后,即在沒有任務到來之前就創建  默認情況下,在創建了線程池后,線程池中的線程數為0,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法來預創建線程.當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;
    • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • 流程

    1)當池子大小 小於corePoolSize就新建線程,並處理請求
    2)當池子大小 等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理
    3)當workQueue放不下新入的任務時,新建線程入池,並處理請求(不用等待隊列),如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
    4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀

    處理步驟:  核心線程 << 阻塞隊列 <<最大線程數

 

  • 通俗流程解釋

    假如有一個工廠,工廠里面有10個工人,每個工人同時只能做一件任務。因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;
    當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
    如果說新任務數目增長的速度遠遠大於工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
    然后就將任務也分配給這4個臨時工人做;
    如果說着14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
    當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的.
    這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

    也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。 

 

  • 阻塞隊列

    1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;

    2)LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;

    3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務

 

  • 拒絕策略 四種

    AbortPolicy 默認 直接拋棄 並拋異常
    DiscardPolicy 直接拋棄 不拋異常
    CallerRunsPolicy 在主線程中執行
    DiscardOldestPolicy 把注冊隊列中最老的拋棄掉 執行當前的
    自定義的策略 實現RejectedExecutionHandler即可

  • 測試案例
public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
 
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
             executor.getQueue().size()+",已執行完別的任務數目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
class MyTask implements Runnable {
    private int taskNum;
 
    public MyTask(int num) {
        this.taskNum = num;
    }
 
    @Override
    public void run() {
        System.out.println("正在執行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"執行完畢");
    }
}
  • 執行結果
正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行完別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行完別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 13
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行完別的任務數目:0
正在執行task 14
task 3執行完畢
task 0執行完畢
task 2執行完畢
task 1執行完畢
正在執行task 8
正在執行task 7
正在執行task 6
正在執行task 5
task 4執行完畢
task 10執行完畢
task 11執行完畢
task 13執行完畢
task 12執行完畢
正在執行task 9
task 14執行完畢
task 8執行完畢
task 5執行完畢
task 7執行完畢
task 6執行完畢
task 9執行完畢
  • 如何合理的配置線程池
    • 要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

      1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
      2. 任務的優先級:高,中和低。
      3. 任務的執行時間:長,中和短。
      4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

      任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由於需要等待IO操作,線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

      優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

      執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

      依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

      建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后台任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

  • 最佳實踐
    //創建線程池指定有意義的線程名字, 方便出錯是回溯   
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build();
    ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    singleThreadPool.execute(()-> System.out.println(Thread.currentThread().getName())
    );
   
    public class TimerTaskThread extends Thread {
        public TimerTaskThread(){
            super.setName("TimerTaskThread"); 
           …
        }    
        …
   }    

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM