Java線程池主線程等待子線程執行完成


今天討論一個入門級的話題, 不然沒東西更新對不起空間和域名~~

工作總往往會遇到異步去執行某段邏輯, 然后先處理其他事情, 處理完后再把那段邏輯的處理結果進行匯總的產景, 這時候就需要使用線程了.

一個線程啟動之后, 是異步的去執行需要執行的內容的, 不會影響主線程的流程,  往往需要讓主線程指定后, 等待子線程的完成. 這里有幾種方式.
站在 主線程的角度, 我們可以分為主動式和被動式.
主動式指主線主動去檢測某個標志位, 判斷子線程是否已經完成. 被動式指主線程被動的等待子線程的結束, 很明顯, 比較符合人們的胃口. 就是你事情做完了, 你告訴我, 我匯總一下, 哈哈.
那么主線程如何等待子線程工作完成呢. 很簡單, Thread 類給我們提供了join 系列的方法, 這些方法的目的就是等待當前線程的die. 舉個例子.
 
public   class  Threads  {
 
       public   static   void  main (String[]  args ) {
           SubThread  thread  =   new  SubThread () ;
          thread . start () ;
          //主線程處理其他工作,讓子線程異步去執行.
            mainThreadOtherWork () ;
           System .  out . println ( “now waiting sub thread done.” ) ;
          //主線程其他工作完畢,等待子線程的結束, 調用join系列的方法即可(可以設置超時時間)
            try  {
              thread . join () ;
          }   catch   ( InterruptedException  e) {
              e . printStackTrace () ;
          }
          System .  out . println ( “now all done.” ) ;
     }
 
       private   static   void  mainThreadOtherWork () {
          System .  out . println ( “main thread work start” ) ;
            try  {
               Thread .  sleep ( 3000L ) ;
          }   catch   ( InterruptedException  e) {
              e . printStackTrace () ;
          }
           System .  out . println ( “main thread work done.” ) ;
     }
 
       public   static   class  SubThread   extends  Thread {
           @Override
            public   void  run () {
               working () ;
          }
 
            private   void  working () {
               System .  out . println ( “sub thread start working.” ) ;
               busy () ;
               System .  out . println ( “sub thread stop working.” ) ;
          }
 
            private   void  busy () {
                try  {
                     sleep ( 5000L ) ;
              }   catch   ( InterruptedException  e) {
                   e . printStackTrace () ;
              }
          }
          
     }
}
 
本程序的數據有可能是如下:
  1. main thread work start
  2. sub thread start working.
  3. main thread work done.
  4. now waiting sub thread done.
  5. sub thread stop working.
  6. now all done.
忽略標號, 當然輸出也有可能是1和2調換位置了. 這個我們是無法控制的. 我們看下線程的join操作, 究竟干了什么.

 

      public   final   void  join()   throws  InterruptedException {
     join(0) ;
    }
這里是調用了
     public   final   synchronized   void  join(  long  millis) 
方法, 參數為0, 表示沒有超時時間, 等到線程結束為止. join(millis)方法里面有這么一段代碼:

          while  (isAlive()) {

          wait(0) ;
         }
說明, 當線程處於活躍狀態的時候, 會一直等待, 直到這里的isAlive方法返回false, 才會結束.isAlive方法是一個本地方法, 他的作用是判斷線程是否已經執行結束. 注釋是這么寫的: 

Tests if this thread is alive. A thread is alive if it has been started and has not yet died.

 
可見, join系列方法可以幫助我們等待一個子線程的結束.
 
那么要問, 有沒有另外一種方法可以等待子線程結束? 當然有的, 我們可以使用並發包下面的Future模式.
Future是一個任務執行的結果, 他是一個將來時, 即一個任務執行, 立即異步返回一個Future對象, 等到任務結束的時候, 會把值返回給這個future對象里面. 我們可以使用ExecutorService接口來提交一個線程.
 
public   class  Threads {
 
       static  ExecutorService   executorService  =  Executors .  newFixedThreadPool ( 1 ) ;
     
      @SuppressWarnings ( “rawtypes” )
       public   static   void  main (String[]  args )   throws  InterruptedException ,  ExecutionException {
          SubThread thread  =   new  SubThread () ;
//        thread.start();
           Future  future  =   executorService . submit (thread) ;
            mainThreadOtherWork () ;
          System .  out . println ( “now waiting sub thread done.” ) ;
          future . get () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System .  out . println ( “now all done.” ) ;
            executorService . shutdown () ;
     }
 
       private   static   void  mainThreadOtherWork () {
          System .  out . println ( “main thread work start” ) ;
            try  {
              Thread .  sleep ( 3000L ) ;
          }   catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System .  out . println ( “main thread work done.” ) ;
     }
 
       public   static   class  SubThread   extends  Thread{
           @Override
            public   void  run () {
               working () ;
          }
 
            private   void  working () {
              System .  out . println ( “sub thread start working.” ) ;
               busy () ;
              System .  out . println ( “sub thread stop working.” ) ;
          }
 
            private   void  busy () {
                try  {
                     sleep ( 5000L ) ;
              }   catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
     
}
 
這 里, ThreadPoolExecutor 是實現了 ExecutorService的方法, sumbit的過程就是把一個Runnable接口對象包裝成一個 Callable接口對象, 然后放到 workQueue里等待調度執行. 當然, 執行的啟動也是調用了thread的start來做到的, 只不過這里被包裝掉了. 另外, 這里的thread是會被重復利用的, 所以這里要退出主線程, 需要執行以下shutdown方法以示退出使用線程池. 扯遠了. 
 
這 種方法是得益於Callable接口和Future模式, 調用future接口的get方法, 會同步等待該future執行結束, 然后獲取到結果. Callbale接口的接口方法是 V call(); 是可以有返回結果的, 而Runnable的 void run(), 是沒有返回結果的. 所以, 這里即使被包裝成Callbale接口, future.get返回的結果也是null的.如果需要得到返回結果, 建議使用Callable接口.
 
通過隊列來控制線程的進度, 是很好的一個理念. 我們完全可以自己搞個隊列, 自己控制. 這樣也可以實現. 不信看代碼:
 
public  class  Threads {
 
//   static ExecutorService executorService = Executors.newFixedThreadPool(1);
       static  final  BlockingQueue < Integer >   queue  =   new  ArrayBlockingQueue < Integer > ( 1 ) ;
       public  static  void  main (String[]  args )   throws  InterruptedException ,  ExecutionException {
          SubThread thread  =   new  SubThread (  queue ) ;
          thread . start () ;
//        Future future = executorService.submit(thread);
            mainThreadOtherWork () ;
          System .  out . println ( “now waiting sub thread done.” ) ;
//        future.get();
            queue . take () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System .  out . println ( “now all done.” ) ;
//        executorService.shutdown();
     }
 
       private  static  void  mainThreadOtherWork () {
          System .  out . println ( “main thread work start” ) ;
            try  {
              Thread .  sleep ( 3000L ) ;
          }   catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System .  out . println ( “main thread work done.” ) ;
     }
 
       public  static  class  SubThread   extends  Thread{
          
            private  BlockingQueue < Integer >  queue ;
          
           /**
           *   @param  queue
           */
            public  SubThread ( BlockingQueue < Integer >  queue ) {
                this . queue  =  queue ;
          }
 
           @Override
            public  void  run () {
                try {
               working () ;
              }  finally {
                     try  {
                         queue . put ( 1 ) ;
                   }   catch  (InterruptedException e) {
                        e . printStackTrace () ;
                   }
              }
              
          }
 
            private  void  working () {
              System .  out . println ( “sub thread start working.” ) ;
               busy () ;
              System .  out . println ( “sub thread stop working.” ) ;
          }
 
            private  void  busy () {
                try  {
                     sleep ( 5000L ) ;
              }   catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
     
}
 
這 里是得益於我們用了一個阻塞隊列, 他的put操作和take操作都會阻塞(同步), 在滿足條件的情況下.當我們調用take()方法是, 由於子線程還沒結束, 隊列是空的, 所以這里的take操作會阻塞, 直到子線程結束的時候, 往隊列里面put了個元素, 表明自己結束了. 這時候主線程的take()就會返回他拿到的數據. 當然, 他拿到什么我們是不必去關心的.
以上幾種情況都是針對子線程只有1個的時候. 當子線程有多個的時候, 情況就不妙了.
第一種方法, 你要調用很多個線程的join, 特別是當你的線程不是for循環創建的, 而是一個一個創建的時候.
第二種方法, 要調用很多的future的get方法, 同第一種方法.
第三種方法, 比較方便一些, 只需要每個線程都在queue里面 put一個元素就好了.但是, 第三種方法, 這個隊列里的對象, 對我們是毫無用處, 我們為了使用隊列, 而要不明不白浪費一些內存, 那有沒有更好的辦法呢?
有的, concurrency包里面提供了好多有用的東東, 其中, CountDownLanch就是我們要用的.
CountDownLanch 是一個倒數計數器, 給一個初始值(>=0), 然后沒countDown一次就會減1, 這很符合等待多個子線程結束的產景: 一個線程結束的時候, countDown一次, 直到所有都countDown了 , 那么所有子線程就都結束了.
先看看CountDownLanch有哪些方法:
await: 會阻塞等待計數器減少到0位置. 帶參數的await是多了等待時間.
countDown: 將當前的技術減1
getCount(): 返回當前的計數
顯而易見, 我們只需要在子線程執行之前, 賦予初始化countDownLanch, 並賦予線程數量為初始值.
每個線程執行完畢的時候, 就countDown一下.主線程只需要調用await方法, 可以等待所有子線程執行結束, 看代碼:
 
public   class  Threads {
 
//   static ExecutorService executorService = Executors.newFixedThreadPool(1);
       static   final  BlockingQueue < Integer >   queue  =   new  ArrayBlockingQueue < Integer > ( 1 ) ;
       public   static   void  main (String[]  args )   throws  InterruptedException ,  ExecutionException {
            int  threads  =  5 ;
          CountDownLatch countDownLatch  =   new  CountDownLatch (threads) ;
            for (  int  i = 0 ; i < threads ; i ++ ){
              SubThread thread  =   new  SubThread ( 2000 * (i + 1 ) ,  countDownLatch) ;
              thread . start () ;
          }
//        Future future = executorService.submit(thread);
            mainThreadOtherWork () ;
          System .  out . println ( “now waiting sub thread done.” ) ;
//        future.get();
//        queue.take();
          countDownLatch . await () ;
//        try {
//            thread.join();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
          System .  out . println ( “now all done.” ) ;
//        executorService.shutdown();
     }
 
       private   static   void  mainThreadOtherWork () {
          System .  out . println ( “main thread work start” ) ;
            try  {
              Thread .  sleep ( 3000L ) ;
          }   catch  (InterruptedException e) {
              e . printStackTrace () ;
          }
          System .  out . println ( “main thread work done.” ) ;
     }
 
       public   static   class  SubThread   extends  Thread{
          
//        private BlockingQueue<Integer> queue;
            private  CountDownLatch  countDownLatch ;
            private   long  work ;
          
           /**
           *   @param  queue
           */
//        public SubThread(BlockingQueue<Integer> queue) {
//            this.queue = queue;
//            this.work = 5000L;
//        }
          
            public  SubThread (  long  work ,  CountDownLatch  countDownLatch ) {
//            this.queue = queue;
                this . countDownLatch  =  countDownLatch ;
                this . work  =  work ;
          }
 
           @Override
            public   void  run () {
                try {
               working () ;
              }  finally {
//                 try {
//                      queue.put(1);
//                 } catch (InterruptedException e) {
//                      e.printStackTrace();
//                 }
                    countDownLatch . countDown () ;
              }
              
          }
 
            private   void  working () {
              System .  out . println ( getName () + ” sub thread start working.” ) ;
               busy () ;
              System .  out . println ( getName () + ” sub thread stop working.” ) ;
          }
 
            private   void  busy () {
                try  {
                     sleep ( work ) ;
              }   catch  (InterruptedException e) {
                   e . printStackTrace () ;
              }
          }
          
     }
}
此種方法也適用於使用 ExecutorService summit 的任務的執行.
另外還有一個並發包的類CyclicBarrier, 這個是(子)線程之間的互相等待的利器. 柵欄, 就是把大家都在一個地方堵住, 就像水閘, 等大家都完成了之前的操作, 在一起繼續下面的操作. 不過就不再本篇的討論訪問內了.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM