一、CountDownLatch
public
class
CountDownLatchDemo {
final
static
SimpleDateFormat sdf=
new
SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss"
);
public
static
void
main(String[] args)
throws
InterruptedException {
CountDownLatch latch=
new
CountDownLatch(
2
);
//兩個工人的協作
Worker worker1=
new
Worker(
"zhang san"
,
5000
, latch);
Worker worker2=
new
Worker(
"li si"
,
8000
, latch);
worker1.start();
//
worker2.start();
//
latch.await();
//等待所有工人完成工作
System.out.println(
"all work done at "
+sdf.format(
new
Date()));
}
static
class
Worker
extends
Thread{
String workerName;
int
workTime;
CountDownLatch latch;
public
Worker(String workerName ,
int
workTime ,CountDownLatch latch){
this
.workerName=workerName;
this
.workTime=workTime;
this
.latch=latch;
}
public
void
run(){
System.out.println(
"Worker "
+workerName+
" do work begin at "
+sdf.format(
new
Date()));
doWork();
//工作了
System.out.println(
"Worker "
+workerName+
" do work complete at "
+sdf.format(
new
Date()));
latch.countDown();
//工人完成工作,計數器減一
}
private
void
doWork(){
try
{
Thread.sleep(workTime);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
}
}
}
http://blog.chenlb.com/2008/12/main-thread-wait-all-sub-thread-finish-task-in-thread-pool.html
原文出處:http://blog.chenlb.com/2008/12/main-thread-wait-all-sub-thread-finish-task-in-thread-pool.html
用線程池編寫多線程程序時,當所有任務完成時,要做一些統計的工作。而統計工作必須要在所有任務完成才能做。所以要讓主線程等待所有任務完成。可以使用ThreadPoolExecutor.awaitTermination(long timeout, TimeUnit unit)。請看示例代碼:
- package com.chenlb;
- import java.util.Random;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- /**
- * 線程池使用示例, 主線程等待所有任務完成再結束.
- *
- * @author chenlb 2008-12-2 上午10:31:03
- */
- public class ThreadPoolUse {
- public static class MyTask implements Runnable {
- private static int id = 0;
- private String name = "task-"+(++id);
- private int sleep;
- public MyTask(int sleep) {
- super();
- this.sleep = sleep;
- }
- public void run() {
- System.out.println(name+" -----start-----");
- try {
- Thread.sleep(sleep); //模擬任務執行.
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(name+" -----end "+sleep+"-----");
- }
- }
- public static void main(String[] args) {
- System.out.println("==================start==================");
- ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
- int n = 10;
- int sleep = 10 * 1000; //10s
- Random rm = new Random();
- for(int i=0; i<n; i++) {
- executor.execute(new MyTask(rm.nextInt(sleep)+1));
- }
- executor.shutdown();//只是不能再提交新任務,等待執行的任務不受影響
- try {
- boolean loop = true;
- do { //等待所有任務完成
- loop = !executor.awaitTermination(2, TimeUnit.SECONDS); //阻塞,直到線程池里所有任務結束
- } while(loop);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("==================end====================");
- }
- }
當然還有其它方法。
http://xtu-xiaoxin.iteye.com/blog/649677
shutDown()
當線程池調用該方法時,線程池的狀態則立刻變成SHUTDOWN狀態。此時,則不能再往線程池中添加任何任務,否則將會拋出RejectedExecutionException異常。但是,此時線程池不會立刻退出,直到添加到線程池中的任務都已經處理完成,才會退出。 唯一的影響就是不能再提交任務了,正則執行的任務即使在阻塞着也不會結束,在排隊的任務也不會取消。
shutdownNow()
根據JDK文檔描述,大致意思是:執行該方法,線程池的狀態立刻變成STOP狀態,並試圖停止所有正在執行的線程,不再處理還在池隊列中等待的任務,當然,它會返回那些未執行的任務。
它試圖終止線程的方法是通過調用Thread.interrupt()方法來實現的,但是大家知道,這種方法的作用有限,如果線程中沒有sleep 、wait、Condition、定時鎖等應用, interrupt()方法是無法中斷當前的線程的。所以,ShutdownNow()並不代表線程池就一定立即就能退出,它可能必須要等待所有正在執行的任務都執行完成了才能退出。
上面對shutDown()以及shutDownNow()作了一個簡單的、理論上的分析。如果想知道why,則需要親自打開JDK源碼,分析分析。
想要分析shutDown()以及shutDownNow()源碼,我建議首先要對ThreadPoolExecutor有個大概了解。因為關閉線程池的所有方法邏輯都在ThreadPoolExecutor中處理的。
如果你真的想知道為什么,建議看一下我以前寫的一篇對ThreadPoolExecutor源碼分析的博文,我想這對你比較透徹的了解shutDown()和shutDownNow()的區別以及Java 線程池原理有很大的幫助。博文URL:
http://xtu-xiaoxin.iteye.com/admin/blogs/647744
廢話少說,要查看源碼,首先進入ThreadPoolExecutor的shutDown()方法:
- public void shutdown() {
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- security.checkPermission(shutdownPerm);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (security != null) { // Check if caller can modify our threads
- for (Worker w : workers)
- security.checkAccess(w.thread);
- }
- int state = runState;
- if (state < SHUTDOWN)
- //設置線程池狀態為關閉狀態
- runState = SHUTDOWN; //----------------代碼1
- try {
- for (Worker w : workers) {
- //一個一個中斷線程
- w.interruptIfIdle(); //-----------------代碼2
- }
- } catch (SecurityException se) { // Try to back out
- runState = state;
- // tryTerminate() here would be a no-op
- throw se;
- }
- tryTerminate(); // Terminate now if pool and queue empty
- } finally {
- mainLock.unlock();
- }
- }
看上面源碼,代碼1是線程池關閉的關鍵,如果線程池狀態一旦設為SHUTDOWN,則在線程池中會出現兩種現象:
1.你不能再往線程池中添加任何任務,否則會拋RejectedExecutionException異常(詳細請看ThreadPoolExecutor的addIfUnderCorePoolSize方法)。
2.工作線程Worker獲得池隊列中的任務時(詳細看Worker中的getTask()方法)的處理邏輯也發生了變化:如果線程池為RUNNING狀態,並且池隊列中沒任務時,它會一直等待,直到你提交任務到池隊列中,然后取出任務,返回。但是,一旦你執行了shutDown()方法,線程池狀態為SHUTDOWN狀態,它將不再等待了,直接返回null。如果返回null,則工作線程沒有要執行的任務,直接退出(詳細看Worker中run()方法)。
代碼2是針對這種情況的:在線程池關閉前,有部分工作線程就一直在等着要處理的任務,也就是說工作線程空閑着(這種情況我描述的不好,其實就是Worker正在執行getTask()方法中’ r = workQueue.take();’代碼段)。這時,調用interrupt()方法來中斷這些Worker線程。進入代碼2看看吧:。
- void interruptIfIdle() {
- final ReentrantLock runLock = this.runLock;
- /*
- * 注意這個條件,擺明的就是要等Worker中runTask()方法運行完后才成立。
- * 鎖機制
- */
- if (runLock.tryLock()) {
- try {
- /*
- * 如果當前工作線程沒有正在運行,則中斷線程
- * 他能中斷工作線程的原因是getTask()方法能拋出一個
- * InterruptedException。這時,則可終止那些正在執行
- * workQueue.take()方法的工作線程
- */
- if (thread != Thread.currentThread())
- thread.interrupt();
- } finally {
- runLock.unlock();
- }
- }
- }
最后進入shutDownNow()方法看看,這個更簡單了,就是設置線程池狀態為STOP,然后依次調用工作線程的interrupt()方法,就這么簡單,最后還是把源碼貼出來吧:
- public List<Runnable> shutdownNow() {
- /*
- * shutdownNow differs from shutdown only in that
- * 1. runState is set to STOP,
- * 2. all worker threads are interrupted, not just the idle ones, and
- * 3. the queue is drained and returned.
- */
- SecurityManager security = System.getSecurityManager();
- if (security != null)
- security.checkPermission(shutdownPerm);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (security != null) { // Check if caller can modify our threads
- for (Worker w : workers)
- security.checkAccess(w.thread);
- }
- int state = runState;
- if (state < STOP)
- runState = STOP;
- try {
- for (Worker w : workers) {
- w.interruptNow();
- }
- } catch (SecurityException se) { // Try to back out
- runState = state;
- // tryTerminate() here would be a no-op
- throw se;
- }
- List<Runnable> tasks = drainQueue();
- tryTerminate(); // Terminate now if pool and queue empty
- return tasks;
- } finally {
- mainLock.unlock();
- }
- }
上面代碼沒什么好分析的了,一看就明白,其實別看上面代碼一大篇,我們只關心“w.interruptNow();”即可。