線程池ThreadPoolExecutor與阻塞隊列BlockingQueue應用


作者QQ:1095737364    QQ群:123300273     歡迎加入!

1.線程池介紹

  JDK5.0以上: java.util.concurrent.ThreadPoolExecutor 
構造函數簽名:
public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler
);   

 

參數介紹:
corePoolSize 核心線程數,指保留的線程池大小(不超過maximumPoolSize值時,線程池中最多有corePoolSize 個線程工作)。 
maximumPoolSize 指的是線程池的最大大小(線程池中最大有corePoolSize 個線程可運行)。 
keepAliveTime 指的是空閑線程結束的超時時間(當一個線程不工作時,過keepAliveTime 長時間將停止該線程)。 
unit 是一個枚舉,表示 keepAliveTime 的單位(有NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7個可選值)。 
workQueue 表示存放任務的隊列(存放需要被線程池執行的線程隊列)。 
handler 拒絕策略(添加任務失敗后如何處理該任務).

2.運行策略

1、線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。
2、當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
    a. 如果正在運行的線程數量小於 corePoolSize,那么馬上創建線程運行這個任務;
    b. 如果正在運行的線程數量大於或等於 corePoolSize,那么將這個任務放入隊列。
    c. 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那么還是要創建線程運行這個任務;
    d. 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那么線程池會拋出異常,告訴調用者“我不能再接受任務了”。
3、當一個線程完成任務時,它會從隊列中取下一個任務來執行。
4、當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行 的線程數大於 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。
 
       這個過程說明,並不是先加入任務就一定會先執行。假設隊列大小為 4,corePoolSize為2,maximumPoolSize為6,那么當加入15個任務時,執行的順序類似這樣:首先執行任務 1、2,然后任務3~6被放入隊列。這時候隊列滿了,任務7、8、9、10 會被馬上執行,而任務 11~15 則會拋出異常。最終順序是:1、2、7、8、9、10、3、4、5、6。當然這個過程是針對指定大小的ArrayBlockingQueue<Runnable>來說,如果是LinkedBlockingQueue<Runnable>,因為該隊列無大小限制,所以不存在上述問題。

3.測試示例

(1)LinkedBlockingQueue<Runnable>隊列使用1:

package threadQueueTest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* User: 楊永生
* Date: 15:47 2017/8/8
* Email: kevin@hiibook.com
*/
public class ThreadPoolTest implements Runnable {
    public void run() {
      synchronized(this) {
        try{
          System.out.println(Thread.currentThread().getName());
          Thread.sleep(3000);
        }catch (InterruptedException e){
          e.printStackTrace();
        }
      }
    }
 
    public static void main(String[] args) {
      BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
      ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
      for (int i = 0; i < 10; i++) {
        executor.execute(new Thread(new ThreadPoolTest(),"TestThread".concat(""+i)));
        int threadSize = queue.size();
        System.out.println("線程隊列大小為-->"+threadSize);
      }
      executor.shutdown();
    }
}

 

結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
線程隊列大小為-->5
線程隊列大小為-->6
線程隊列大小為-->7
線程隊列大小為-->8
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
 
說明:可見,線程隊列最大為8,共執行了10個線線程。因為是從線程池里運行的線程,所以雖然將線程的名稱設為"TestThread".concat(""+i),但輸出后還是變成了pool-1-thread-x。
 

(2)LinkedBlockingQueue<Runnable>隊列使用2:

package threadQueueTest;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* User: 楊永生
* Date: 15:55 2017/8/8
* Email: kevin@hiibook.com
*/
public class ThreadPoolTest2 implements Runnable {
  public void run() {
    synchronized(this) {
      try{
        System.out.println("線程名稱:"+Thread.currentThread().getName());
        Thread.sleep(3000); //休眠是為了讓該線程不至於執行完畢后從線程池里釋放
      }catch (InterruptedException e){
        e.printStackTrace();
      }
    }
  }
 
public static void main(String[] args) throws InterruptedException {
  BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定為4的線程隊列
  ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
  for (int i = 0; i < 10; i++) {
    executor.execute(new Thread(new ThreadPoolTest2(), "TestThread".concat(""+i)));
    int threadSize = queue.size();
    System.out.println("線程隊列大小為-->"+threadSize);
  }
  executor.shutdown();
  }
}

 

結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-5
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1
 
說明: 可見,總共10個線程,因為核心線程數為2,2個線程被立即運行,線程隊列大小為4,所以4個線程被加入隊列,最大線程數為6,還能運行6-2=4個,其10個線程的其余4個線程又立即運行了。
 

(3)LinkedBlockingQueue<Runnable>隊列使用3(測試異常):

如果將我們要運行的線程數10改為11,則由於最大線程數6+線程隊列大小4=10<11,則根據線程池工作原則,最后一個線程將被拒絕策略拒絕,將示例二的main方法改為如下
public static void main(String[] args) throws InterruptedException {
  BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定為4的線程隊列
  ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
  for (int i = 0; i < 11; i++) {
    executor.execute(new Thread(new ThreadPoolTest2(), "TestThread".concat(""+i)));
    int threadSize = queue.size();
    System.out.println("線程隊列大小為-->"+threadSize);
  }
  executor.shutdown();
}

 

 
結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程隊列大小為-->4
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-6
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task Thread[TestThread10,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@21588809[Running, pool size = 6, active threads = 6, queued tasks = 4, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at threadQueueTest.ThreadPoolTest2.main(ThreadPoolTest2.java:27)
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-5
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1
 
說明:很明顯,拋RejectedExecutionException異常了,被拒絕策略拒絕了,這就說明線程超出了線程池的總容量(線程隊列大小+最大線程數)。
 
  對於 java.util.concurrent.BlockingQueue 類有有三種方法將線程添加到線程隊列里面,然而如何區別三種方法的不同呢,其實在隊列未滿的情況下結果相同,都是將線程添加到線程隊列里面,區分就在於當線程隊列已經滿的時候,此時
public boolean add(E e) 方法將拋出IllegalStateException異常,說明隊列已滿。
public boolean offer(E e) 方法則不會拋異常,只會返回boolean值,告訴你添加成功與否,隊列已滿,當然返回false。
public void put(E e) throws InterruptedException 方法則一直阻塞(即等待,直到線程池中有線程運行完畢,可以加入隊列為止)。
 
 

(4)LinkedBlockingQueue<Runnable>隊列使用3(測試add(E e)異常):

將示例二的main方法改為如下:
public static void main(String[] args) throws InterruptedException {
  BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定為4的線程隊列
  ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
  for (int i = 0; i < 10; i++) {
    executor.execute(new Thread(new ThreadPoolTest4(), "TestThread".concat(""+i)));
    int threadSize = queue.size();
    System.out.println("線程隊列大小為-->"+threadSize);
    if (threadSize==4){
      queue.add(new Runnable() { //隊列已滿,拋異常
        @Override
        public void run(){
         System.out.println("我是新線程,看看能不能搭個車加進去!");
 
        }
       });
    }
  }
  executor.shutdown();
}

 

結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at threadQueueTest.ThreadPoolTest4.main(ThreadPoolTest4.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1

(5)LinkedBlockingQueue<Runnable>隊列使用3(測試 offer(E e)異常):

將示例二的main方法改為如下:
public static void main(String[] args) throws InterruptedException {
  BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定為4的線程隊列
  ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
  for (int i = 0; i < 10; i++) {
    executor.execute(new Thread(new ThreadPoolTest5(), "TestThread".concat(""+i)));
    int threadSize = queue.size();
    System.out.println("線程隊列大小為-->"+threadSize);
    if (threadSize==4){
      final boolean flag = queue.offer(new Runnable() {
        @Override
        public void run(){
          System.out.println("我是新線程,看看能不能搭個車加進去!");
        }
      });
      System.out.println("添加新線程標志為-->"+flag);
    }
  }
  executor.shutdown();
}

 

結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
添加新線程標志為-->false
線程隊列大小為-->4
添加新線程標志為-->false
線程隊列大小為-->4
添加新線程標志為-->false
線程隊列大小為-->4
添加新線程標志為-->false
線程隊列大小為-->4
添加新線程標志為-->false
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-5
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-6
線程名稱:pool-1-thread-1

(6)LinkedBlockingQueue<Runnable>隊列使用3(測試put(E e)異常):

將示例二的main方法改為如下:
public static void main(String[] args) throws InterruptedException {
  BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定為4的線程隊列
  ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
  for (int i = 0; i < 10; i++) {
    executor.execute(new Thread(new ThreadPoolTest6(), "TestThread".concat(""+i)));
    int threadSize = queue.size();
    System.out.println("線程隊列大小為-->"+threadSize);
    if (threadSize==4){
      queue.put(new Runnable() {
        @Override
        public void run(){
          System.out.println("我是新線程,看看能不能搭個車加進去!");
        }
      });
    }
  }
  executor.shutdown();
}

 

結果:
線程隊列大小為-->0
線程隊列大小為-->0
線程隊列大小為-->1
線程隊列大小為-->2
線程隊列大小為-->3
線程隊列大小為-->4
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-2
線程名稱:pool-1-thread-1
線程隊列大小為-->4
線程名稱:pool-1-thread-3
線程名稱:pool-1-thread-2
線程隊列大小為-->4
線程名稱:pool-1-thread-4
線程名稱:pool-1-thread-1
線程名稱:pool-1-thread-3
線程隊列大小為-->3
線程隊列大小為-->4
線程名稱:pool-1-thread-5
我是新線程,看看能不能搭個車加進去!
我是新線程,看看能不能搭個車加進去!
我是新線程,看看能不能搭個車加進去!
線程名稱:pool-1-thread-2
我是新線程,看看能不能搭個車加進去!
 
說明:很明顯,嘗試了四次才加進去,前面三次嘗試添加,但由於線程sleep(3000),所以沒有執行完,線程隊列一直處於滿的狀態,直到某個線程執行完,隊列有空位,新線程才加進去,沒空位之前一直阻塞(即等待),我能加進去為止。
 

4.總結:

那么線程池的排除策略是什么樣呢,一般按如下規律執行:
A.  如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。
B.  如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。
C.  如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。
 
總結:
1. 線程池可立即運行的最大線程數 即maximumPoolSize 參數。
2. 線程池能包含的最大線程數 = 可立即運行的最大線程數 + 線程隊列大小 (一部分立即運行,一部分裝隊列里等待)
3. 核心線程數可理解為建議值,即建議使用的線程數,或者依據CPU核數
4. add,offer,put三種添加線程到隊列的方法只在隊列滿的時候有區別,add為拋異常,offer返回boolean值,put直到添加成功為止。
5.同理remove,poll, take三種移除隊列中線程的方法只在隊列為空的時候有區別, remove為拋異常,poll為返回boolean值, take等待直到有線程可以被移除。
看看下面這張圖就清楚了:
 
 
 
 

版權聲明: 本文有 ```...襇簞點 發表於 bloghome博客

轉載聲明: 可自由轉載、引用,但需要屬名作者且注明文章出處。

文章鏈接: https://www.bloghome.com.cn/user/yysblog


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM