多線程應用中,經常會遇到這種場景:后面的處理,依賴前面的N個線程的處理結果,必須等前面的線程執行完畢后,后面的代碼才允許執行。
在我不知道CyclicBarrier之前,最容易想到的就是放置一個公用的static變量,假如有10個線程,每個線程處理完上去累加下結果,然后后面用一個死循環(或類似線程阻塞的方法),去數這個結果,達到10個,說明大家都爽完了,可以進行后續的事情了,這個想法雖然土鱉,但是基本上跟語言無關,幾乎所有主流編程語言都支持。
package yjmyzz.test;
public class ThreadLockTest {
public static int flag = 0;//公用變量
public static void main(String[] args) throws Exception {
ThreadLockTest testObj = new ThreadLockTest();
final int threadNum = 10;
for (int i = 0; i < threadNum; i++) {
new Thread(new MyRunable(i, testObj)).start();
}
while (true) {
if (testObj.flag >= threadNum) {
System.out.println("-----------\n所有thread執行完成!");
break;
}
Thread.sleep(10);
}
}
static class MyRunable implements Runnable {
int _i = 0;
ThreadLockTest _test;
public MyRunable(int i, ThreadLockTest test) {
this._i = i;
this._test = test;
}
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 10));
System.out.println("thread " + _i + " done");
//利用synchronized獲得同步鎖
synchronized (_test) {
_test.flag += 1;
}
System.out.println("thread " + _i + " => " + _test.flag);//測試用
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
輸出結果:
thread 0 done
thread 0 => 1
thread 9 done
thread 9 => 2
thread 1 done
thread 1 => 3
thread 3 done
thread 3 => 4
thread 7 done
thread 7 => 5
thread 6 done
thread 6 => 6
thread 2 done
thread 2 => 7
thread 4 done
thread 4 => 8
thread 8 done
thread 8 => 9
thread 5 done
thread 5 => 10
-----------
所有thread執行完成!
除了這個方法,還可以借助FutureTask,達到類似的效果,其get方法會阻塞線程,等到該異步處理完成。缺點就是,FutureTask調用的是Callable,必須要有返回值,所以就算你不想要返回值,也得返回點啥
package yjmyzz.test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String>[] tasks = new FutureTask[10];
for (int i = 0; i < tasks.length; i++) {
final int j = i;
tasks[i] = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep((long) (Math.random() * 100));
return "task" + j + " done";
}
});
new Thread(tasks[i]).start();
}
for (int i = 0; i < tasks.length; i++) {
System.out.println(tasks[i].get());//依次等待所有task執行完畢
}
System.out.println("-----------\n所有task執行完成!");
}
}
執行結果:
task0 done
task1 done
task2 done
task3 done
task4 done
task5 done
task6 done
task7 done
task8 done
task9 done
-----------
所有task執行完成!
此外,Thread的Join方法也可以實現類似的效果,主要代碼如下:
public static void main(String[] args) throws Exception {
final int threadNum = 10;
Thread[] threads = new Thread[threadNum];
for (int i = 0; i < threadNum; i++) {
threads[i] = new Thread(new MyRunable(i));
threads[i].start();
}
for (int i = 0; i < threadNum; i++) {
threads[i].join();
}
System.out.println("-----------\n所有thread執行完成!");
}
當然,這個需求最“正統”的解法應該是使用CyclicBarrier,它可以設置一個所謂的“屏障點”(或稱集合點),好比在一項團隊活動中,每個人都是一個線程,但是規定某一項任務開始前,所有人必須先到達集合點,集合完成后,才能繼續后面的任務。
package yjmyzz.test;
import java.util.concurrent.CyclicBarrier;
public class ThreadTest {
public static void main(String[] args) throws Exception {
final int threadNum = 10;
CyclicBarrier cb = new CyclicBarrier(threadNum + 1);//注意:10個子線程 + 1個主線程
for (int i = 0; i < threadNum; i++) {
new Thread(new MyRunable(cb, i)).start();
}
cb.await();
System.out.println("-----------\n所有thread執行完成!");
}
static class MyRunable implements Runnable {
CyclicBarrier _cb;
int _i = 0;
public MyRunable(CyclicBarrier cb, int i) {
this._cb = cb;
this._i = i;
}
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 100));
System.out.println("thread " + _i + " done,正在等候其它線程完成...");
_cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
thread 9 done,正在等候其它線程完成...
thread 5 done,正在等候其它線程完成...
thread 0 done,正在等候其它線程完成...
thread 6 done,正在等候其它線程完成...
thread 4 done,正在等候其它線程完成...
thread 2 done,正在等候其它線程完成...
thread 3 done,正在等候其它線程完成...
thread 8 done,正在等候其它線程完成...
thread 7 done,正在等候其它線程完成...
thread 1 done,正在等候其它線程完成...
-----------
所有thread執行完成!
參考文章:
http://ifeve.com/concurrency-cyclicbarrier/
http://ifeve.com/thread-synchronization-utilities-5/
http://ifeve.com/semaphore-countdownlatch-cyclicbarrier-phaser-exchanger-in-java/
