線程池中的柵欄


多線程中有三個類,分別是CountDownLatch,CyclicBarrier,Semaphore。代表着線程中的柵欄。共享鎖。

CountDownLatch

在一組線程中,一個線程等待其他線程。我把它理解為門栓。

查看該類的數據結構圖如下圖一

​ 圖一

有一個靜態的內部類,Sync繼承自AQS。

  • CountDownLatch(int) : 有參構造方法,該類擁有的共享鎖次數。
  • await() : 阻塞,等待該類釋放掉共享鎖,也就是說,擁有共享鎖的次數為0
  • countDown:釋放一次共享鎖
  • getCount: 獲取共享鎖次數

使用例子:代碼如下:


/**
 * @ClassName CountDownLatchTest
 * @Description 共享鎖。在完成一組正在其他線程中執行的操作之前,允許一個或者多個線程一直等待
 * @Author ouyangkang
 * @Date 2018/10/23 14:33
 **/
public class CountDownLatchTest {
    private static int SIZE = 6;

    private static CountDownLatch countDownLatch;

    /**
     * @Date 2018/10/25 15:49
     * @Description 阻塞隊列為3 核心池的大小為2 最大池的大小為6的線程池
     **/
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2, 6, 60L,TimeUnit.SECONDS, new LinkedBlockingDeque<>(3));

    public static void main(String[] args) {
        countDownLatch = new CountDownLatch(SIZE);

        for (int i = 0; i < 6; i++) {
            threadPoolExecutor.execute(new ThreadTest());
        }

        try {
            // 阻塞等待
            countDownLatch.await();
            System.out.printf("thread main = [{%s}] \n", Thread.currentThread().getName());
            threadPoolExecutor.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    static class ThreadTest extends Thread {
        @Override
        public void run() {
            System.out.printf("thread-name = [%s] \n", Thread.currentThread().getName());
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("thread-name =[%s] 執行 \n", Thread.currentThread().getName());

            // 數值減一
            countDownLatch.countDown();
        }
    }
}

返回結果:

thread-name = [pool-1-thread-1] 
thread-name = [pool-1-thread-3] 
thread-name = [pool-1-thread-2] 
thread-name =[pool-1-thread-1] 執行 
thread-name =[pool-1-thread-3] 執行 
thread-name = [pool-1-thread-1] 
thread-name = [pool-1-thread-3] 
thread-name =[pool-1-thread-2] 執行 
thread-name = [pool-1-thread-2] 
thread-name =[pool-1-thread-1] 執行 
thread-name =[pool-1-thread-3] 執行 
thread-name =[pool-1-thread-2] 執行 
thread main = [{main}] 
  • 線程池中阻塞隊列為3,啟動了6個線程。阻塞隊列中阻塞3個,核心池2個在執行任務,最大池的大小為6,那么就會新建一個線程來處理該任務。
  • 主線程最后執行,wait等待其他線程任務執行完畢,再繼續執行主線程。

CyclicBarrier

在一組線程中允許多個線程相互等待。就是,每個線程都先執行一下,然后互相等待到一個點。然后再執行。

查看該類的數據結構圖

有參構造方法,設置屏障點是多少,創建線程,等待。直到線程數量大於等於該屏障點。處於該屏障點等待得線程全部喚醒。

使用例子如下:


/**
 * @ClassName CyclicBarrierTest
 * @Description 屏障,允許多個線程相互等待。到達了某一個臨界點,就喚醒所有線程
 * @Author ouyangkang
 * @Date 2018/10/23 15:00
 **/
public class CyclicBarrierTest {

    private static int SIZE = 5;
    
    private static CyclicBarrier cyclicBarrier;

    public static void main(String[] args) {
        cyclicBarrier = new CyclicBarrier(SIZE);
        for (int i = 0; i < 5 ;i++){
            new ThreadTest().start();
        }
    }

    static class ThreadTest extends Thread {
        @Override
        public void run() {
            System.out.printf("thread=[%s] 開始-- \n", Thread.currentThread().getName());

            try {
                Thread.sleep(1000);
                cyclicBarrier.await();
                System.out.printf("thread=[%s] 繼續--- \n", Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

結果如下:

thread=[Thread-0] 開始-- 
thread=[Thread-4] 開始-- 
thread=[Thread-3] 開始-- 
thread=[Thread-1] 開始-- 
thread=[Thread-2] 開始-- 
thread=[Thread-2] 繼續--- 
thread=[Thread-4] 繼續--- 
thread=[Thread-0] 繼續--- 
thread=[Thread-1] 繼續--- 
thread=[Thread-3] 繼續--- 

Semaphore

在一組線程中,線程持有信號量,如果信息量得大小為10,那么所有線程能夠持有得信號量不能超過10,如果3個線程分別持有得信號量是3 4 5 。 那么只能是兩個線程運行,當其中一個釋放了該信號量,其他線程才可以運行。

類圖結構如下:

具體例子如下:


/**
 * @ClassName SemaphoreTest
 * @Description 信號量
 * @Author ouyangkang
 * @Date 2018/10/23 16:34
 **/
public class SemaphoreTest {



    private static int SIZE = 10;


    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(SIZE);
        ExecutorService executorService = new  ThreadPoolExecutor(2,3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));
        executorService.execute(new ThreadTest(3,semaphore));
        executorService.execute(new ThreadTest(4,semaphore));
        executorService.execute(new ThreadTest(5,semaphore));
        executorService.shutdown();
    }


    static class ThreadTest extends Thread{

        private Integer count ;

        private Semaphore semaphore;

        public ThreadTest(Integer count , Semaphore semaphore){
            this.count = count ;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                semaphore.acquire(count);
                System.out.printf("thread=[%s]  擁有信號量 semaphore=[%d] 開始---- \n",Thread.currentThread().getName(), count);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                semaphore.release(count);
                System.out.printf("thread=[%s] 釋放了 semaphore=[%d] --\n",Thread.currentThread().getName(),count);
            }


        }
    }
}

具體結果如下:

thread=[pool-1-thread-2]  擁有信號量 semaphore=[4] 開始---- 
thread=[pool-1-thread-1]  擁有信號量 semaphore=[3] 開始---- 
thread=[pool-1-thread-2] 釋放了 semaphore=[4] --
thread=[pool-1-thread-2]  擁有信號量 semaphore=[5] 開始---- 
thread=[pool-1-thread-1] 釋放了 semaphore=[3] --
thread=[pool-1-thread-2] 釋放了 semaphore=[5] --


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM