主線程等待線程池所有任務完成(慢慢更新)


一、CountDownLatch

public class CountDownLatchDemo {  

     final  static  SimpleDateFormat sdf= new  SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );  
     public  static  void  main(String[] args)  throws  InterruptedException {  
         CountDownLatch latch= new  CountDownLatch( 2 ); //兩個工人的協作  
         Worker worker1= new  Worker( "zhang san" 5000 , latch);  
         Worker worker2= new  Worker( "li si" 8000 , latch);  
         worker1.start(); //  
         worker2.start(); //  
         latch.await(); //等待所有工人完成工作  
         System.out.println( "all work done at " +sdf.format( new  Date()));  
     }  
       
       
     static  class  Worker  extends  Thread{  
         String workerName;   
         int  workTime;  
         CountDownLatch latch;  
         public  Worker(String workerName , int  workTime ,CountDownLatch latch){  
              this .workerName=workerName;  
              this .workTime=workTime;  
              this .latch=latch;  
         }  
         public  void  run(){  
             System.out.println( "Worker " +workerName+ " do work begin at " +sdf.format( new  Date()));  
             doWork(); //工作了  
             System.out.println( "Worker " +workerName+ " do work complete at " +sdf.format( new  Date()));  
             latch.countDown(); //工人完成工作,計數器減一  
   
         }  
           
         private  void  doWork(){  
             try  {  
                 Thread.sleep(workTime);  
             catch  (InterruptedException e) {  
                 e.printStackTrace();  
             }  
         }  
     }  
       
        
}
 
二、主線程等待線程池所有任務完成

http://blog.chenlb.com/2008/12/main-thread-wait-all-sub-thread-finish-task-in-thread-pool.html

 

 

原文出處:http://blog.chenlb.com/2008/12/main-thread-wait-all-sub-thread-finish-task-in-thread-pool.html

用線程池編寫多線程程序時,當所有任務完成時,要做一些統計的工作。而統計工作必須要在所有任務完成才能做。所以要讓主線程等待所有任務完成。可以使用ThreadPoolExecutor.awaitTermination(long timeout, TimeUnit unit)。請看示例代碼:

 
  1. package com.chenlb;  
  2.   
  3. import java.util.Random;  
  4. import java.util.concurrent.LinkedBlockingQueue;  
  5. import java.util.concurrent.ThreadPoolExecutor;  
  6. import java.util.concurrent.TimeUnit;  
  7.   
  8. /** 
  9.  * 線程池使用示例, 主線程等待所有任務完成再結束. 
  10.  * 
  11.  * @author chenlb 2008-12-2 上午10:31:03 
  12.  */  
  13. public class ThreadPoolUse {  
  14.   
  15.     public static class MyTask implements Runnable {  
  16.         private static int id = 0;  
  17.   
  18.         private String name = "task-"+(++id);  
  19.         private int sleep;   
  20.   
  21.         public MyTask(int sleep) {  
  22.             super();  
  23.             this.sleep = sleep;  
  24.         }  
  25.   
  26.         public void run() {  
  27.             System.out.println(name+" -----start-----");  
  28.             try {  
  29.                 Thread.sleep(sleep);    //模擬任務執行.  
  30.             } catch (InterruptedException e) {  
  31.                 e.printStackTrace();  
  32.             }  
  33.             System.out.println(name+" -----end "+sleep+"-----");  
  34.         }  
  35.   
  36.     }  
  37.   
  38.     public static void main(String[] args) {  
  39.         System.out.println("==================start==================");  
  40.         ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());  
  41.         int n = 10;  
  42.         int sleep = 10 * 1000;  //10s  
  43.         Random rm = new Random();  
  44.         for(int i=0; i<n; i++) {  
  45.             executor.execute(new MyTask(rm.nextInt(sleep)+1));  
  46.         }  
  47.   
  48.         executor.shutdown();//只是不能再提交新任務,等待執行的任務不受影響  
  49.   
  50.         try {  
  51.             boolean loop = true;  
  52.             do {    //等待所有任務完成  
  53.                 loop = !executor.awaitTermination(2, TimeUnit.SECONDS);  //阻塞,直到線程池里所有任務結束
  54.             } while(loop);  
  55.         } catch (InterruptedException e) {  
  56.             e.printStackTrace();  
  57.         }  
  58.   
  59.         System.out.println("==================end====================");  
  60.     }  
  61.   
  62. }  

當然還有其它方法。

 

http://xtu-xiaoxin.iteye.com/blog/649677

 

shutDown() 

    當線程池調用該方法時,線程池的狀態則立刻變成SHUTDOWN狀態。此時,則不能再往線程池中添加任何任務,否則將會拋出RejectedExecutionException異常。但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。 唯一的影響就是不能再提交任務了,正則執行的任務即使在阻塞着也不會結束,在排隊的任務也不會取消。

shutdownNow() 

     根據JDK文檔描述,大致意思是:執行該方法,線程池的狀態立刻變成STOP狀態,並試圖停止所有正在執行的線程,不再處理還在池隊列中等待的任務,當然,它會返回那些未執行的任務。 
     它試圖終止線程的方法是通過調用Thread.interrupt()方法來實現的,但是大家知道,這種方法的作用有限,如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。所以,ShutdownNow()並不代表線程池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成了才能退出。 

    上面對shutDown()以及shutDownNow()作了一個簡單的、理論上的分析。如果想知道why,則需要親自打開JDK源碼,分析分析。 
      想要分析shutDown()以及shutDownNow()源碼,我建議首先要對ThreadPoolExecutor有個大概了解。因為關閉線程池的所有方法邏輯都在ThreadPoolExecutor中處理的。 
      如果你真的想知道為什么,建議看一下我以前寫的一篇對ThreadPoolExecutor源碼分析的博文,我想這對你比較透徹的了解shutDown()和shutDownNow()的區別以及Java 線程池原理有很大的幫助。博文URL: 
         http://xtu-xiaoxin.iteye.com/admin/blogs/647744 

     廢話少說,要查看源碼,首先進入ThreadPoolExecutor的shutDown()方法: 

  

Java代碼   收藏代碼
  1. public void shutdown() {  
  2.      SecurityManager security = System.getSecurityManager();  
  3.     if (security != null)  
  4.             security.checkPermission(shutdownPerm);  
  5.         final ReentrantLock mainLock = this.mainLock;  
  6.         mainLock.lock();  
  7.         try {  
  8.             if (security != null) { // Check if caller can modify our threads  
  9.                 for (Worker w : workers)  
  10.                     security.checkAccess(w.thread);  
  11.             }  
  12.             int state = runState;  
  13.             if (state < SHUTDOWN)  
  14.                 //設置線程池狀態為關閉狀態  
  15.                 runState = SHUTDOWN;     //----------------代碼1  
  16.             try {  
  17.                 for (Worker w : workers) {  
  18.                     //一個一個中斷線程  
  19.                     w.interruptIfIdle();  //-----------------代碼2  
  20.                 }  
  21.             } catch (SecurityException se) { // Try to back out  
  22.                 runState = state;  
  23.                 // tryTerminate() here would be a no-op  
  24.                 throw se;  
  25.             }  
  26.             tryTerminate(); // Terminate now if pool and queue empty  
  27.         } finally {  
  28.             mainLock.unlock();  
  29.         }  
  30.     }  


  看上面源碼,代碼1是線程池關閉的關鍵,如果線程池狀態一旦設為SHUTDOWN,則在線程池中會出現兩種現象: 
     1.你不能再往線程池中添加任何任務,否則會拋RejectedExecutionException異常(詳細請看ThreadPoolExecutor的addIfUnderCorePoolSize方法)。 
     2.工作線程Worker獲得池隊列中的任務時(詳細看Worker中的getTask()方法)的處理邏輯也發生了變化:如果線程池為RUNNING狀態,並且池隊列中沒任務時,它會一直等待,直到你提交任務到池隊列中,然后取出任務,返回。但是,一旦你執行了shutDown()方法,線程池狀態為SHUTDOWN狀態,它將不再等待了,直接返回null。如果返回null,則工作線程沒有要執行的任務,直接退出(詳細看Worker中run()方法)。 

    代碼2是針對這種情況的:在線程池關閉前,有部分工作線程就一直在等着要處理的任務,也就是說工作線程空閑着(這種情況我描述的不好,其實就是Worker正在執行getTask()方法中’ r = workQueue.take();’代碼段)。這時,調用interrupt()方法來中斷這些Worker線程。進入代碼2看看吧:。 
   
 
Java代碼   收藏代碼
  1. void interruptIfIdle() {  
  2.             final ReentrantLock runLock = this.runLock;  
  3.             /* 
  4.              * 注意這個條件,擺明的就是要等Worker中runTask()方法運行完后才成立。 
  5.              * 鎖機制 
  6.              */  
  7.             if (runLock.tryLock()) {  
  8.                 try {  
  9.             /* 
  10.              * 如果當前工作線程沒有正在運行,則中斷線程 
  11.              * 他能中斷工作線程的原因是getTask()方法能拋出一個 
  12.              * InterruptedException。這時,則可終止那些正在執行 
  13.              * workQueue.take()方法的工作線程 
  14.              */  
  15.             if (thread != Thread.currentThread())  
  16.             thread.interrupt();           
  17.                 } finally {  
  18.                     runLock.unlock();  
  19.                 }  
  20.             }  
  21.         }  


   最后進入shutDownNow()方法看看,這個更簡單了,就是設置線程池狀態為STOP,然后依次調用工作線程的interrupt()方法,就這么簡單,最后還是把源碼貼出來吧: 
    
     
Java代碼   收藏代碼
  1. public List<Runnable> shutdownNow() {  
  2.        /* 
  3.         * shutdownNow differs from shutdown only in that 
  4.         * 1. runState is set to STOP, 
  5.         * 2. all worker threads are interrupted, not just the idle ones, and 
  6.         * 3. the queue is drained and returned. 
  7.         */  
  8. SecurityManager security = System.getSecurityManager();  
  9. if (security != null)  
  10.            security.checkPermission(shutdownPerm);  
  11.   
  12.        final ReentrantLock mainLock = this.mainLock;  
  13.        mainLock.lock();  
  14.        try {  
  15.            if (security != null) { // Check if caller can modify our threads  
  16.                for (Worker w : workers)  
  17.                    security.checkAccess(w.thread);  
  18.            }  
  19.   
  20.            int state = runState;  
  21.            if (state < STOP)  
  22.                runState = STOP;  
  23.   
  24.            try {  
  25.                for (Worker w : workers) {  
  26.                    w.interruptNow();  
  27.                }  
  28.            } catch (SecurityException se) { // Try to back out  
  29.                runState = state;  
  30.                // tryTerminate() here would be a no-op  
  31.                throw se;  
  32.            }  
  33.   
  34.            List<Runnable> tasks = drainQueue();  
  35.            tryTerminate(); // Terminate now if pool and queue empty  
  36.            return tasks;  
  37.        } finally {  
  38.            mainLock.unlock();  
  39.        }  
  40.    }  

    上面代碼沒什么好分析的了,一看就明白,其實別看上面代碼一大篇,我們只關心“w.interruptNow();”即可。 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM