Java並發——ThreadPoolExecutor線程池解析及Executor創建線程常見四種方式


前言:

  在剛學Java並發的時候基本上第一個demo都會寫new Thread來創建線程。但是隨着學的深入之后發現基本上都是使用線程池來直接獲取線程。那么為什么會有這樣的情況發生呢?

new Thread和線程池的比較

  每次new Thread是新建了線程對象,並且不能重復使用,為什么不能重復使用?因為new是相當於在內存中獨立開辟一個內存來讓該線程運行,所以只能釋放線程資源和新建線程,性能差。而使用線程池,可以重復使用存在的線程,減少對象的創建,消亡的開銷,性能較好

  線程缺乏統一管理,可能無限制的新建線程,相互競爭,有可能占用過多的系統資源導致死機或者拋出OutOfMemoryError。而使用線程池可以有效控制最大並發線程數,提高系統資源利用率,同時避免過多資源競爭,避免阻塞

  同時new Thread,當我們需要定期執行,更多執行,線程中斷等等使用Thread操作起來非常的繁瑣。線程池則提供定時執行,定期執行,單線程,並發控制等功能,讓我們操作線程起來特別方便

ThreadPoolExecutor如何創建對象

  在這里介紹的是JUC包下的ThreadPoolExecutor線程池,這個線程池里有4個構造方法

public class ThreadPoolExecutor extends AbstractExecutorService{
//第一個構造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
//第二個構造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
//第三個構造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
//第四個也是真正的初始化構造函數
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
}

  從這四個函數來看,其實是分是否需要默認的線程池工廠和handler。接下來就講講這些參數代表什么。

  corePoolSize:核心線程數量。當線程數少於corePoolSize的時候,直接創建新的線程,盡管其他線程是空閑的。當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;

  maximunPoolSize:線程池最大線程數。如果線程數量少於線程最大數且大於核心線程數量的時候,只有當阻塞隊列滿了才創建新線程。當線程數量大於最大線程數且阻塞隊列滿了這時候就會執行一些策略來響應該線程。

  workQueue:阻塞隊列,存儲等待執行的任務,會對線程池的運行產生很大的影響。當提交一個新的任務到線程池的時候,線程池會根據當前線程數量來選擇不同的處理方式

    直接切換隊列SynchronousQueue:該隊列傳遞任務到線程而不持有它們。在這一點上,試圖向該隊列壓入一個任務,如果沒有可用的線程立刻運行任務,那么就會入列失敗,所以一個新的線程就會被創建。當處理那些內部依賴的任務集合時,這個選擇可以避免鎖住。直接接傳遞通常需要無邊界的最大線程數來避免新提交任務被拒絕處理。當任務以平均快於被處理的速度提交到線程池時,它依次地確認無邊界線程增長的可能性

    使用無界隊列LinkedBlockingQueue:使用這個隊列的話,沒有預先定義容量的無界隊列,最大線程數是為corePoolSize,在核心線程都繁忙的時候會使新提交的任務在隊列中等待被執行,所以將不會創建更多的線程,這時候,maximunPoolSize最大線程數的值將不起作用。當每個任務之間是相互獨立的時比較適合該隊列,任務之間不能互相影響執行。

    使用有界隊列ArrayBlockingQueue:使用這個隊列,線程池中的最大線程數量就是maximunPoolSize,能夠降低資源消耗,但是卻使得線程之間調度變得更加困難,因為隊列容量和線程池都規定完了。

    如果想降低系統資源消耗,包括CPU使用率,操作系統資源消耗,上下文切換開銷等等,可以設置一個較大的隊列容量,較小的maximunPoolSize。如果線程經常發生阻塞,那么可以稍微將maximunPoolSize設置大一點

  keepAliveTime:線程沒有任務執行最多保持多久時間終止。也就是當線程數量超過核心線程數量的時候,且小於最大線程數量,這一部分的線程在沒有任務執行的時候是會保持直到超過keepAliveTime才會銷毀

  unit:keepAliveTime的時間單位

  threadFactory:線程工廠,用來創建線程,當使用默認的線程工廠創建線程的時候,會使得線程具有相同優先級,並且設置了守護性,同時也設置線程名稱

  handler:拒絕策略。當workQueue滿了,並且沒有空閑的線程數,即線程達到最大線程數。就會有四種不同策略來處理

    直接拋出異常(默認)

    使用調用者所在線程執行任務

    丟棄隊列中最靠前的任務,並執行當前任務

    直接丟棄當前任務

  這四種就是對應的策略實現類(從上到下),是在ThreadPoolExecutor中的內部類

線程池五種狀態

  RUNNING:在這個狀態的線程池能判斷接受新提交的任務,並且也能處理阻塞隊列中的任務

  SHUTDOWN:處於關閉的狀態,該線程池不能接受新提交的任務,但是可以處理阻塞隊列中已經保存的任務,在線程處於RUNNING狀態,調用shutdown()方法能切換為該狀態。

  STOP:線程池處於該狀態時既不能接受新的任務也不能處理阻塞隊列中的任務,並且能中斷現在線程中的任務。當線程處於RUNNING和SHUTDOWN狀態,調用shutdownNow()方法就可以使線程變為該狀態

  TIDYING:在SHUTDOWN狀態下阻塞隊列為空,且線程中的工作線程數量為0就會進入該狀態,當在STOP狀態下時,只要線程中的工作線程數量為0就會進入該狀態。

  TERMINATED:在TIDYING狀態下調用terminated()方法就會進入該狀態。可以認為該狀態是最終的終止狀態。

execute()方法解析(JDK1.8)

  execute():提交任務交給線程池運行

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        //任務為空則報空異常
        if (command == null)
            throw new NullPointerException();
        /*
         * 1. 如果正在運行的線程數少於corePoolSize。那么就嘗試去開始一個新線程並用傳入的command作為它第一個任務,
      * 然后讓addworker去原子性的檢查線程池的運行狀態和線程數量,以至於能提前知道是否能添加線程進去。
      * 如果成功則線程運行,不成功就會去到下一步,並且再獲取一次ctl的值(ctl是用來獲取線程狀態和線程數的)
*/ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /* * 2.如果一個任務能被成功的排隊,那么仍然需要雙重檢查是否我們需要添加一個新的線程(因為有可能會第一次檢查完后有一個線程銷毀,所以需要雙重檢查)
      * 或者進入此方法后線程關閉。所以很有必要重新檢查一遍狀態是否需要回滾排隊,是否停止或開始一個新線程或是否不存在
*/     if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* *3.如果我們無法將任務進行排隊(即進入隊列中),那么我們嘗試添加一個新線程,如果失敗我們就使用拒絕策略拒絕這個任務 */ else if (!addWorker(command, false)) reject(command); } }

線程池的一些常用方法

  submit():提交任務,能夠返回執行結果execute+Future

  shutdown():關閉線程池,等待任務都執行完

  shutdownNow():關閉線程池,不等待任務執行完

  getTaskCount():線程池已執行和未執行的任務總數

  getCompletedTaskCount():已完成的任務數量

  getPoolSize():線程池當前線程數量

  getActiveCount():當前線程池中正在執行任務的線程數量

Executor線程池創建的四種線程

  newFixedThreadPool:創建的是定長的線程池,可以控制線程最大並發數,超出的線程會在線程中等待,使用的是無界隊列,核心線程數和最大線程數一樣,當線程池中的線程沒有任務時候立刻銷毀,使用默認線程工廠。

  newSingleThreadExecutor:創建的是單線程化的線程池,只會用唯一一個工作線程執行任務,可以指定按照是否是先入先出,還是優先級來執行任務。同樣使用無界隊列,核心線程數和最大線程數都是1個,同樣keepAliveTime為0,可選擇是否使用默認線程工廠。

  newCachedThreadPool:設定一個可緩存的線程池,當線程池長度超過處理的需要,可以靈活回收空閑線程,如果沒有可以回收的才新建線程。沒有核心線程數,當線程沒有任務60s之后就會回收空閑線程,使用有界隊列。同樣可以選擇是否使用默認線程工廠。

  newScheduledThreadPool:支持線程定時操作和周期性操作。

  下面是方法的源碼:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM