原文:http://www.jiacheo.org/blog/262
Java如何等待子線程執行結束
今天討論一個入門級的話題, 不然沒東西更新對不起空間和域名~~
工作總往往會遇到異步去執行某段邏輯, 然后先處理其他事情, 處理完后再把那段邏輯的處理結果進行匯總的產景, 這時候就需要使用線程了.
一個線程啟動之后, 是異步的去執行需要執行的內容的, 不會影響主線程的流程, 往往需要讓主線程指定后, 等待子線程的完成. 這里有幾種方式.
站在 主線程的角度, 我們可以分為主動式和被動式.
主動式指主線主動去檢測某個標志位, 判斷子線程是否已經完成. 被動式指主線程被動的等待子線程的結束, 很明顯, 比較符合人們的胃口. 就是你事情做完了, 你告訴我, 我匯總一下, 哈哈.
那么主線程如何等待子線程工作完成呢. 很簡單, Thread 類給我們提供了join 系列的方法, 這些方法的目的就是等待當前線程的die. 舉個例子.
public
class
Threads {
public
static
void
main
(String[]
args
) {
SubThread thread
=
new
SubThread
()
;
thread
.
start
()
;
//主線程處理其他工作,讓子線程異步去執行.
mainThreadOtherWork
()
;
System
.
out
.
println
(
“now waiting sub thread done.”
)
;
//主線程其他工作完畢,等待子線程的結束, 調用join系列的方法即可(可以設置超時時間)
try
{
thread
.
join
()
;
}
catch
( InterruptedException e) {
e
.
printStackTrace
()
;
}
System
.
out
.
println
(
“now all done.”
)
;
}
private
static
void
mainThreadOtherWork
() {
System
.
out
.
println
(
“main thread work start”
)
;
try
{
Thread
.
sleep
(
3000L
)
;
}
catch
( InterruptedException e) {
e
.
printStackTrace
()
;
}
System
.
out
.
println
(
“main thread work done.”
)
;
}
public
static
class
SubThread
extends
Thread{
@Override
public
void
run
() {
working
()
;
}
private
void
working
() {
System
.
out
.
println
(
“sub thread start working.”
)
;
busy
()
;
System
.
out
.
println
(
“sub thread stop working.”
)
;
}
private
void
busy
() {
try
{
sleep
(
5000L
)
;
}
catch
( InterruptedException e) {
e
.
printStackTrace
()
;
}
}
}
}
本程序的數據有可能是如下:
- main thread work start
- sub thread start working.
- main thread work done.
- now waiting sub thread done.
- sub thread stop working.
- now all done.
忽略標號, 當然輸出也有可能是1和2調換位置了. 這個我們是無法控制的. 我們看下線程的join操作, 究竟干了什么.
public
final
void
join()
throws
InterruptedException {
join(0)
;
}
這里是調用了
public final synchronized void join( long millis)
方法, 參數為0, 表示沒有超時時間, 等到線程結束為止. join(millis)方法里面有這么一段代碼:
while (isAlive()) {
wait(0)
;
}
說明, 當線程處於活躍狀態的時候, 會一直等待, 直到這里的isAlive方法返回false, 才會結束.isAlive方法是一個本地方法, 他的作用是判斷線程是否已經執行結束. 注釋是這么寫的:
Tests if this thread is alive. A thread is alive if it has been started and has not yet died.
可見, join系列方法可以幫助我們等待一個子線程的結束.
那么要問, 有沒有另外一種方法可以等待子線程結束? 當然有的, 我們可以使用並發包下面的Future模式.
Future是一個任務執行的結果, 他是一個將來時, 即一個任務執行, 立即異步返回一個Future對象, 等到任務結束的時候, 會把值返回給這個future對象里面. 我們可以使用ExecutorService接口來提交一個線程.
public
class
Threads {
static
ExecutorService
executorService
=
Executors
.
newFixedThreadPool
(
1
)
;
@SuppressWarnings
(
“rawtypes”
)
public
static
void
main
(String[]
args
)
throws
InterruptedException
,
ExecutionException {
SubThread thread
=
new
SubThread
()
;
// thread.start();
Future
future
=
executorService
.
submit
(thread)
;
mainThreadOtherWork
()
;
System
.
out
.
println
(
“now waiting sub thread done.”
)
;
future
.
get
()
;
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System
.
out
.
println
(
“now all done.”
)
;
executorService
.
shutdown
()
;
}
private
static
void
mainThreadOtherWork
() {
System
.
out
.
println
(
“main thread work start”
)
;
try
{
Thread
.
sleep
(
3000L
)
;
}
catch
(InterruptedException e) {
e
.
printStackTrace
()
;
}
System
.
out
.
println
(
“main thread work done.”
)
;
}
public
static
class
SubThread
extends
Thread{
@Override
public
void
run
() {
working
()
;
}
private
void
working
() {
System
.
out
.
println
(
“sub thread start working.”
)
;
busy
()
;
System
.
out
.
println
(
“sub thread stop working.”
)
;
}
private
void
busy
() {
try
{
sleep
(
5000L
)
;
}
catch
(InterruptedException e) {
e
.
printStackTrace
()
;
}
}
}
}
這 里, ThreadPoolExecutor 是實現了 ExecutorService的方法, sumbit的過程就是把一個Runnable接口對象包裝成一個 Callable接口對象, 然后放到 workQueue里等待調度執行. 當然, 執行的啟動也是調用了thread的start來做到的, 只不過這里被包裝掉了. 另外, 這里的thread是會被重復利用的, 所以這里要退出主線程, 需要執行以下shutdown方法以示退出使用線程池. 扯遠了.
這 種方法是得益於Callable接口和Future模式, 調用future接口的get方法, 會同步等待該future執行結束, 然后獲取到結果. Callbale接口的接口方法是 V call(); 是可以有返回結果的, 而Runnable的 void run(), 是沒有返回結果的. 所以, 這里即使被包裝成Callbale接口, future.get返回的結果也是null的.如果需要得到返回結果, 建議使用Callable接口.
通過隊列來控制線程的進度, 是很好的一個理念. 我們完全可以自己搞個隊列, 自己控制. 這樣也可以實現. 不信看代碼:
public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
static final BlockingQueue < Integer > queue = new ArrayBlockingQueue < Integer > ( 1 ) ;
public static void main (String[] args ) throws InterruptedException , ExecutionException {
SubThread thread = new SubThread ( queue ) ;
thread . start () ;
// Future future = executorService.submit(thread);
mainThreadOtherWork () ;
System . out . println ( “now waiting sub thread done.” ) ;
// future.get();
queue . take () ;
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System . out . println ( “now all done.” ) ;
// executorService.shutdown();
}
private static void mainThreadOtherWork () {
System . out . println ( “main thread work start” ) ;
try {
Thread . sleep ( 3000L ) ;
} catch (InterruptedException e) {
e . printStackTrace () ;
}
System . out . println ( “main thread work done.” ) ;
}
public static class SubThread extends Thread{
private BlockingQueue < Integer > queue ;
/**
* @param queue
*/
public SubThread ( BlockingQueue < Integer > queue ) {
this . queue = queue ;
}
@Override
public void run () {
try {
working () ;
} finally {
try {
queue . put ( 1 ) ;
} catch (InterruptedException e) {
e . printStackTrace () ;
}
}
}
private void working () {
System . out . println ( “sub thread start working.” ) ;
busy () ;
System . out . println ( “sub thread stop working.” ) ;
}
private void busy () {
try {
sleep ( 5000L ) ;
} catch (InterruptedException e) {
e . printStackTrace () ;
}
}
}
}
這 里是得益於我們用了一個阻塞隊列, 他的put操作和take操作都會阻塞(同步), 在滿足條件的情況下.當我們調用take()方法是, 由於子線程還沒結束, 隊列是空的, 所以這里的take操作會阻塞, 直到子線程結束的時候, 往隊列里面put了個元素, 表明自己結束了. 這時候主線程的take()就會返回他拿到的數據. 當然, 他拿到什么我們是不必去關心的.
以上幾種情況都是針對子線程只有1個的時候. 當子線程有多個的時候, 情況就不妙了.
第一種方法, 你要調用很多個線程的join, 特別是當你的線程不是for循環創建的, 而是一個一個創建的時候.
第二種方法, 要調用很多的future的get方法, 同第一種方法.
第三種方法, 比較方便一些, 只需要每個線程都在queue里面 put一個元素就好了.但是, 第三種方法, 這個隊列里的對象, 對我們是毫無用處, 我們為了使用隊列, 而要不明不白浪費一些內存, 那有沒有更好的辦法呢?
有的, concurrency包里面提供了好多有用的東東, 其中, CountDownLanch就是我們要用的.
CountDownLanch 是一個倒數計數器, 給一個初始值(>=0), 然后沒countDown一次就會減1, 這很符合等待多個子線程結束的產景: 一個線程結束的時候, countDown一次, 直到所有都countDown了 , 那么所有子線程就都結束了.
先看看CountDownLanch有哪些方法:
await: 會阻塞等待計數器減少到0位置. 帶參數的await是多了等待時間.
countDown: 將當前的技術減1
getCount(): 返回當前的計數
顯而易見, 我們只需要在子線程執行之前, 賦予初始化countDownLanch, 並賦予線程數量為初始值.
每個線程執行完畢的時候, 就countDown一下.主線程只需要調用await方法, 可以等待所有子線程執行結束, 看代碼:
public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
static final BlockingQueue < Integer > queue = new ArrayBlockingQueue < Integer > ( 1 ) ;
public static void main (String[] args ) throws InterruptedException , ExecutionException {
int threads = 5 ;
CountDownLatch countDownLatch = new CountDownLatch (threads) ;
for ( int i = 0 ; i < threads ; i ++ ){
SubThread thread = new SubThread ( 2000 * (i + 1 ) , countDownLatch) ;
thread . start () ;
}
// Future future = executorService.submit(thread);
mainThreadOtherWork () ;
System . out . println ( “now waiting sub thread done.” ) ;
// future.get();
// queue.take();
countDownLatch . await () ;
// try {
// thread.join();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System . out . println ( “now all done.” ) ;
// executorService.shutdown();
}
private static void mainThreadOtherWork () {
System . out . println ( “main thread work start” ) ;
try {
Thread . sleep ( 3000L ) ;
} catch (InterruptedException e) {
e . printStackTrace () ;
}
System . out . println ( “main thread work done.” ) ;
}
public static class SubThread extends Thread{
// private BlockingQueue<Integer> queue;
private CountDownLatch countDownLatch ;
private long work ;
/**
* @param queue
*/
// public SubThread(BlockingQueue<Integer> queue) {
// this.queue = queue;
// this.work = 5000L;
// }
public SubThread ( long work , CountDownLatch countDownLatch ) {
// this.queue = queue;
this . countDownLatch = countDownLatch ;
this . work = work ;
}
@Override
public void run () {
try {
working () ;
} finally {
// try {
// queue.put(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
countDownLatch . countDown () ;
}
}
private void working () {
System . out . println ( getName () + ” sub thread start working.” ) ;
busy () ;
System . out . println ( getName () + ” sub thread stop working.” ) ;
}
private void busy () {
try {
sleep ( work ) ;
} catch (InterruptedException e) {
e . printStackTrace () ;
}
}
}
}
此種方法也適用於使用 ExecutorService summit 的任務的執行.
另外還有一個並發包的類CyclicBarrier, 這個是(子)線程之間的互相等待的利器. 柵欄, 就是把大家都在一個地方堵住, 就像水閘, 等大家都完成了之前的操作, 在一起繼續下面的操作. 不過就不再本篇的討論訪問內了.
EOF

