阻塞隊列和線程池


一、阻塞隊列

1.介紹
阻塞隊列會對當前線程產生阻塞,比如一個線程從一個空的阻塞隊列中取元素,此時線程會被阻塞直到阻塞隊列中有了元素。當隊列中有元素后,被阻塞的線程會自動被喚醒(不需要我們編寫代碼去喚醒)。

2.實現
ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

二、線程池

1.介紹
使用線程池的好處有:1.創建/銷毀線程伴隨着系統開銷,過於頻繁的創建/銷毀線程,會很大程度上影響處理效率 2.線程並發數量過多,搶占系統資源從而導致阻塞 3.線程池可以對線程進行一些簡單的管理
ThreadPoolExecutor類是線程池中最核心的一個類, 在ThreadPoolExecutor類中提供了四個構造方法,其中 三個構造器都是調用的第四個構造器進行的初始化工作。

2.構造方法為:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
七個參數的含義:
* corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中;

* maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;

* keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;

* unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;                //天
TimeUnit.HOURS;              //小時
TimeUnit.MINUTES;            //分鍾
TimeUnit.SECONDS;            //秒
TimeUnit.MILLISECONDS;       //毫秒
TimeUnit.MICROSECONDS;       //微妙
TimeUnit.NANOSECONDS;        //納秒


* workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;


ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

* threadFactory:線程工廠,主要用來創建線程;

* handler:表示當拒絕處理任務時的策略,有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

 

3.線程池的執行策略
* 如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
* 如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
* 如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
* 如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

4..測試

public  class  Test {
      public  static  void  main(String[] args) {  
          ThreadPoolExecutor executor =  new  ThreadPoolExecutor( 5 10 200 , TimeUnit.MILLISECONDS,
                  new  ArrayBlockingQueue<Runnable>( 5 ));
           
          for ( int  i= 0 ;i< 15 ;i++){
              MyTask myTask =  new  MyTask(i);
              executor.execute(myTask);
              System.out.println( "線程池中線程數目:" +executor.getPoolSize()+ ",隊列中等待執行的任務數目:" +
              executor.getQueue().size()+ ",已執行玩別的任務數目:" +executor.getCompletedTaskCount());
          }
          executor.shutdown();
      }
}
class  MyTask  implements  Runnable {
     private  int  taskNum;
      
     public  MyTask( int  num) {
         this .taskNum = num;
     }
      
     @Override
     public  void  run() {
         System.out.println( "正在執行task " +taskNum);
         try  {
             Thread.currentThread().sleep( 4000 );
         catch  (InterruptedException e) {
             e.printStackTrace();
         }
         System.out.println( "task " +taskNum+ "執行完畢" );
     }
}


這個程序中當線程池中線程的數目大於5時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。

5.使用Executors類中提供的幾個靜態方法來創建線程池

Executors.newCachedThreadPool();         //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor();    //創建容量為1的緩沖池
Executors.newFixedThreadPool( int );     //創建固定容量大小的緩沖池(定長)


  newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;1. 有且僅有一個工作線程執行任務2. 所有任務按照指定順序執行,即遵循隊列的入隊出隊規則
  newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
  實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

三、Callable

創建線程在上一篇wiki中說明了兩種方法,分別是繼承Thread,重寫run方法和實現Runnable接口,重新run方法。現在介紹第三種實現Callable接口,重寫call方法。
區別:

  • Callable可以在任務結束的時候提供一個返回值Future對象,Runnable無法提供這個功能

  • Callable的call方法分可以拋出異常,而Runnable的run方法不能拋出異常。

  • Callable規定的方法是call(),而Runnable規定的方法是run().


測試:

/*
* FileName: TestCallable
* Author:   aiguo.sun
* Date:     2019/3/28 20:06
* Description: 測試callable方法
*/
package  JavaTest;
import  java.util.concurrent.Callable;
import  java.util.concurrent.ExecutionException;
import  java.util.concurrent.FutureTask;
/**
* 一、創建執行線程的方式三:實現 Callable 接口。 相較於實現 Runnable 接口的方式,方法可以有返回值,並且可以拋出異常。
*
* 二、執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。  FutureTask 是  Future 接口的實現類
*/
public  class  TestCallable {
     public  static  void  main(String[] args) {
         ThreadDemo td =  new  ThreadDemo();
         //1.執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。
         FutureTask<Integer> result =  new  FutureTask<>(td);
         new  Thread(result).start();
         //2.接收線程運算后的結果
         try  {
             //判斷是否完成
             if (!result.isDone())
             {
                 System.out.println( "-sorry-----------------------" );;
             }
             //FutureTask 可用於 閉鎖 類似於CountDownLatch的作用,在所有的線程沒有執行完成之后這里是不會執行的
             Integer sum = result.get();
             System.out.println(sum);
             System.out.println( "------------------------------------" );
         catch  (InterruptedException | ExecutionException e) {
             e.printStackTrace();
         }
     }
}
class  ThreadDemo  implements  Callable<Integer> {
     @Override
     public  Integer call()  throws  Exception {
         int  sum =  0 ;
         for  ( int  i =  0 ; i <=  100000000 ; i++) {
             sum += i;
         }
         return  sum;
     }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM