CyclicBarrier與線程池結合使用


最近使用線程池要等前面線程執行完了把結果匯總,於是想到了使用CyclicBarrier和線程池實現,當然CountDownlatch也是可以的,其他方式也可以的

直接上代碼

public class CyclicBarriarDemo {
    private static final Integer THREAD_SIZE = 3;

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        Integer sum = 0;
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum10 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 1; i <= 100; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });
        System.out.println("sum10:" + sum10.get());

        ThreadPoolExecutor pool2 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum20 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 101; i <= 200; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });

        ThreadPoolExecutor pool3 = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        Future<Integer> sum30 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Integer sum1 = 0;
                try {
                    for (int i = 201; i <= 300; i++) {
                        sum1 = sum1 + i;
                    }
                } catch (Exception e) {
                    System.out.println("1 error:" + e.getMessage());
                }
                System.out.println(sum1);
                System.out.println(Thread.currentThread().getName());
                cyclicBarrier.await();
                return sum1;
            }
        });
//        System.out.println("sum30:" + sum30.get());
        pool.shutdown();
        sum = sum10.get() + sum20.get() + sum30.get();
        System.out.println("sum:" + sum);
    }
}

  上面紅色標注的地方會導致程序阻塞,主要是Future的get()方法引起的,通過命令行輸入jps找到對應的程序然后通過jstack -l pid可以看到

FutureTask.get()對應的awaitDone方法

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

  其實理解起來不難線程執行的時候CyclicBarrier的await()就阻塞了,你get拿值的時候又是阻塞的導致后面的程序無法繼續運行,拿掉get后面get是因為CyclicBarrier等待的一組線程都執行完了,都能拿到返回結果了,get就能獲取對應的值了。當然上面也說了其他方式也可以,這只是簡單demo而已。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM