CyclicBarrier与线程池结合使用


最近使用线程池要等前面线程执行完了把结果汇总,于是想到了使用CyclicBarrier和线程池实现,当然CountDownlatch也是可以的,其他方式也可以的

直接上代码

public class CyclicBarriarDemo {
    private static final Integer THREAD_SIZE = 3;

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        Integer sum = 0;
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum10 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 1; i <= 100; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });
        System.out.println("sum10:" + sum10.get());

        ThreadPoolExecutor pool2 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum20 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 101; i <= 200; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });

        ThreadPoolExecutor pool3 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum30 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 201; i <= 300; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });
//        System.out.println("sum30:" + sum30.get());
        pool.shutdown();
        sum = sum10.get() + sum20.get() + sum30.get();
        System.out.println("sum:" + sum);
    }
}

  上面红色标注的地方会导致程序阻塞,主要是Future的get()方法引起的,通过命令行输入jps找到对应的程序然后通过jstack -l pid可以看到

FutureTask.get()对应的awaitDone方法

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

  其实理解起来不难线程执行的时候CyclicBarrier的await()就阻塞了,你get拿值的时候又是阻塞的导致后面的程序无法继续运行,拿掉get后面get是因为CyclicBarrier等待的一组线程都执行完了,都能拿到返回结果了,get就能获取对应的值了。当然上面也说了其他方式也可以,这只是简单demo而已。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM