最近使用线程池要等前面线程执行完了把结果汇总,于是想到了使用CyclicBarrier和线程池实现,当然CountDownlatch也是可以的,其他方式也可以的
直接上代码
public class CyclicBarriarDemo {
private static final Integer THREAD_SIZE = 3;
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
Integer sum = 0;
ExecutorService cachedPool = Executors.newCachedThreadPool();
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Future<Integer> sum10 = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Integer sum1 = 0;
try {
for (int i = 1; i <= 100; i++) {
sum1 = sum1 + i;
}
} catch (Exception e) {
System.out.println("1 error:" + e.getMessage());
}
System.out.println(sum1);
System.out.println(Thread.currentThread().getName());
cyclicBarrier.await();
return sum1;
}
});
System.out.println("sum10:" + sum10.get());
ThreadPoolExecutor pool2 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Future<Integer> sum20 = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Integer sum1 = 0;
try {
for (int i = 101; i <= 200; i++) {
sum1 = sum1 + i;
}
} catch (Exception e) {
System.out.println("1 error:" + e.getMessage());
}
System.out.println(sum1);
System.out.println(Thread.currentThread().getName());
cyclicBarrier.await();
return sum1;
}
});
ThreadPoolExecutor pool3 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
Future<Integer> sum30 = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Integer sum1 = 0;
try {
for (int i = 201; i <= 300; i++) {
sum1 = sum1 + i;
}
} catch (Exception e) {
System.out.println("1 error:" + e.getMessage());
}
System.out.println(sum1);
System.out.println(Thread.currentThread().getName());
cyclicBarrier.await();
return sum1;
}
});
// System.out.println("sum30:" + sum30.get());
pool.shutdown();
sum = sum10.get() + sum20.get() + sum30.get();
System.out.println("sum:" + sum);
}
}
上面红色标注的地方会导致程序阻塞,主要是Future的get()方法引起的,通过命令行输入jps找到对应的程序然后通过jstack -l pid可以看到
FutureTask.get()对应的awaitDone方法
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
其实理解起来不难线程执行的时候CyclicBarrier的await()就阻塞了,你get拿值的时候又是阻塞的导致后面的程序无法继续运行,拿掉get后面get是因为CyclicBarrier等待的一组线程都执行完了,都能拿到返回结果了,get就能获取对应的值了。当然上面也说了其他方式也可以,这只是简单demo而已。