Executor框架(三)線程池詳細介紹與ThreadPoolExecutor


本文將介紹線程池的設計細節,這些細節與 ThreadPoolExecutor類的參數一一對應,所以,將直接通過此類介紹線程池。

ThreadPoolExecutor類 簡介

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。

ThreadPoolExecutor 的構造方法

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

注意: 幾個參數的大小范圍,corePoolSize >= 0,maximumPoolSize >= 1 ,keepAliveTime >= 0(keepAliveTime為0時,表示線程永久存活,即使空閑很長時間,也不會撤銷)

線程池配置的各種參數以及策略

1. corePoolSize (核心池的大小)與 maximumPoolSize(線程池最大線程數)

  • ThreadPoolExecutor 將根據 corePoolSize 和 maximumPoolSize 設置的邊界自動調整池大小。
  • 池中線程的創建策略。 當新任務在方法 execute(java.lang.Runnable) 中提交時,如果運行的線程少於 corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才創建新線程。
  • 數量固定的線程池。 如果設置的 corePoolSize 和 maximumPoolSize 相同,則創建了固定大小的線程池。
  • 線程池數量任意。 如果將 maximumPoolSize 設置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的並發任務。
  • 動態更改大小。 在大多數情況下,核心和最大池大小僅基於構造來設置,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。

2. 線程創建的時機

  • 默認情況下,即使核心線程最初只是在新任務到達時才創建和啟動的
  • 也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。如果構造帶有非空隊列的池,則可能希望預先啟動線程。

3. 線程創建的工廠方法 ThreadFactory

  • 使用 ThreadFactory 創建新線程。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優先級、守護進程狀態,等等。如果從 newThread 返回 null 時 ThreadFactory 未能創建線程,則執行程序將繼續運行,但不能執行任何任務。

4. 保持活動時間 keepAliveTime

  • 如果池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止,直到池中的數量減少到核心數。這提供了當池處於非活動狀態時減少資源消耗的方法。
  • 默認情況下,保持活動策略只在有多於 corePoolSizeThreads 的線程時應用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法調用后,也可將此超時策略應用於核心線程。

5. BlockingQueue 任務隊列

  • 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
  • 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
  • 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
  • 任務隊列的策略
    • 直接提交。工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用於立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
    • 無界隊列。使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界隊列;例如,在 Web 頁服務器中。這種排隊可用於處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
    • 有界隊列。當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。

6. 被拒絕的任務的處理策略

  當 Executor 已經關閉,並且 Executor 將有限邊界用於最大線程和工作隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種情況下,execute 方法都將調用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預定義的處理程序策略:

  • ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:如果執行程序尚未關閉,丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
  • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務。即直接在 execute 方法的調用線程中運行被拒絕的任務;

7. 隊列維護

  • 方法 getQueue() 允許出於監控和調試目的而訪問工作隊列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行存儲回收。

8. 鈎子(hook)方法

此類提供 protected 可重寫的 beforeExecute(Runnable)afterExecute(Runnable, Throwable) 方法,這兩種方法分別在執行每個任務之前和之后調用。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、搜集統計信息或添加日志條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止后需要完成的所有特殊處理。


@ Example1 鈎子用法示例

此類的大多數擴展可以重寫一個或多個受保護的鈎子 (hook) 方法。例如,下面是一個添加了簡單的暫停/恢復功能的子類:

 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
   private boolean isPaused;
   private ReentrantLock pauseLock = new ReentrantLock();
   private Condition unpaused = pauseLock.newCondition();

   public PausableThreadPoolExecutor(...) { super(...); }
 
   protected void beforeExecute(Thread t, Runnable r) {
     super.beforeExecute(t, r);
     pauseLock.lock();
     try {
       while (isPaused) unpaused.await();
     } catch(InterruptedException ie) {
       t.interrupt();
     } finally {
       pauseLock.unlock();
     }
   }
 
   public void pause() {
     pauseLock.lock();
     try {
       isPaused = true;
     } finally {
       pauseLock.unlock();
     }
   }
 
   public void resume() {
     pauseLock.lock();
     try {
       isPaused = false;
       unpaused.signalAll();
     } finally {
       pauseLock.unlock();
     }
   }
 }

@ Example2 任務拒絕策略示例

  下面的例子是通過傳入各種參數,配置創建了一個ThreadPoolExecutor線程池實例,並向此線程池提交多個任務,而且任務的數量大於線程池的承受數量。

public class Test_29 {
   public static void main(String[] args) throws InterruptedException {
	   SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
	   MyThreadFactory threadFactory = new MyThreadFactory();
       //創建一個線程池
	   ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 3, 60,TimeUnit.SECONDS,queue, threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

       //向線程池提交四個任務   
	   for(int i=0;i<4;i++){
		   MyRunnable myRunnable = new MyRunnable();
		   poolExecutor.execute(myRunnable);
	   }
	   
	   //關閉線程池
	   poolExecutor.shutdown();
   }
}

//自定義的工廠方法
class MyThreadFactory implements ThreadFactory{

	static int number=0;
	final String BASE_NAME = "poolthread_";

	@Override
	public Thread newThread(Runnable r) {
		number++;
		//自定線程池的創建線程的工廠方法,這里指定線程池中的每個線程命名:poolthread_i
		Thread thread = new Thread(r,BASE_NAME+number);
		System.out.println("線程池創建了一個線程:"+BASE_NAME+number);
		return thread;
	}
}

class MyRunnable implements Runnable{

	@Override
	public void run() {
		try {
			//休眠一秒,模擬線程的執行過程
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		//輸出當前執行任務的線程的名稱
        System.out.println("任務完成,執行任務的線程是:"+Thread.currentThread().getName());	
	}
}

運行結果:

線程池創建了一個線程:poolthread_1
線程池創建了一個線程:poolthread_2
線程池創建了一個線程:poolthread_3
任務完成,執行任務的線程是:main
任務完成,執行任務的線程是:poolthread_2
任務完成,執行任務的線程是:poolthread_3
任務完成,執行任務的線程是:poolthread_1

  注意此線程池的配置,隊列用的是 SynchronousQueue ,即不會存儲任務,都是要立即執行任務,所以此線程池的同一時間內只能最多接受3個任務。而例子一共提交了4個任務,由於拒絕任務的策略是ThreadPoolExecutor.CallerRunsPolicy,所以被線程池拒絕執行的任務,就由main線程執行了。
  拒絕任務的策略,除了JDK已經提供的四種外,還可以自定義策略,方法就是實現 RejectedExecutionHandler 接口


Executors 中提供了三種常用的ThreadPoolExecutor的創建:

1. FixedThreadPool 固定線程池

  固定線程池 的線程數量是固定的,由傳入的參數決定。線程 keepAliveTime 為0,即不會因為空閑超時而關閉線程,同時隊列是無邊界的隊列,不會發生任務丟棄。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2. SingleThreadPoolExcutor 單線程池
  單線程池中線程數量固定為1.

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3. CachedThreadPool 緩存線程池

  緩存線程池的核心線程corePoolSize 數量為0,但是池中的最大線程數是 無邊界。空閑超時為60s,隊列用了SynchronousQueue,即任務是立即交付運行的。

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM