並發編程(二)—— CountDownLatch、CyclicBarrier和Semaphore


CountDownLatch

      正如每個Java文檔所描述的那樣,CountDownLatch 是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程的操作執行完后再執行。

CountDownLatch是什么?

  CountDownLatch是在java1.5被引入的,跟它一起被引入的並發工具類還有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它們都存在於java.util.concurrent包下。CountDownLatch這個類能夠使一個線程等待其他線程完成各自的工作后再執行。例如,應用程序的主線程希望在負責啟動框架服務的線程已經啟動所有的框架服務之后再執行。

  CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在閉鎖上等待的線程就可以恢復執行任務。

CountDownLatch類只提供了一個構造器:

public CountDownLatch(int count) {  };  //參數count為計數值

然后下面這3個方法是CountDownLatch類中最重要的方法:

public void await() throws InterruptedException { };   //調用await()方法的線程會被掛起,它會等待直到count值為0才繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()類似,只不過等待一定的時間后count值還沒變為0的話就會繼續執行
public void countDown() { };  //將count值減1

構造器中的計數值(count)實際上就是閉鎖需要等待的線程數量。這個值只能被設置一次,而且CountDownLatch沒有提供任何機制去重新設置這個計數值

與CountDownLatch的第一次交互是主線程等待其他線程。主線程必須在啟動其他線程后立即調用CountDownLatch.await()方法。這樣主線程的操作就會在這個方法上阻塞,直到其他線程完成各自的任務。

其他N 個線程必須引用閉鎖對象,因為他們需要通知CountDownLatch對象,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每調用一次這個方法,在構造函數中初始化的count值就減1。所以當N個線程都調 用了這個方法,count的值等於0,然后主線程就能通過await()方法,恢復執行自己的任務。

CountDownLatch使用例子

  比如對於馬拉松比賽,進行排名計算,參賽者的排名,肯定是跑完比賽之后,進行計算得出的,翻譯成Java識別的預發,就是N個線程執行操作,主線程等到N個子線程執行完畢之后,再繼續往下執行。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author: ChenHao
 * @Description:
 * @Date: Created in 11:05 2018/11/23
 * @Modified by:馬拉松比賽
 */
public class CountdownLatchTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CountDownLatch cdOrder = new CountDownLatch(1);
        final CountDownLatch cdAnswer = new CountDownLatch(3);
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                @Override
                public void run(){
                    try {
                        System.out.println("運動員" + Thread.currentThread().getName() + "等待信號槍");
                        cdOrder.await();
                        System.out.println("運動員" + Thread.currentThread().getName() + "開跑");
                        Thread.sleep((long)(Math.random()*10000));
                        System.out.println("運動員" + Thread.currentThread().getName() + "到達終點!");
                        cdAnswer.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        try {
            Thread.sleep(5000);

            System.out.println("裁判" + Thread.currentThread().getName() + "即將鳴信號槍");
            cdOrder.countDown();
            System.out.println("裁判" + Thread.currentThread().getName() + "已經鳴槍,等待運動員跑完");
            cdAnswer.await();
            System.out.println("三個運動員都跑到了終點,裁判"+ Thread.currentThread().getName() +"統計名次" );
        } catch (Exception e) {
            e.printStackTrace();
        }
        service.shutdown();
    }
}

運行結果:

 

CyclicBarrier

      字面意思回環柵欄,通過它可以實現讓一組線程等待至某個狀態之后再全部同時執行。

CyclicBarrier是什么?

  CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續干活。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。

CyclicBarrier使用例子

 實例代碼如下:

public class CyclicBarrierTest {

    static CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }
}

輸出

2
1

或者輸出

1
2

如果把new CyclicBarrier(2)修改成new CyclicBarrier(3)則主線程和子線程會永遠等待,因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,所以之前到達屏障的兩個線程都不會繼續執行。

CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties, Runnable barrierAction),用於在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。代碼如下:

public class CyclicBarrierTest2 {

    static CyclicBarrier c = new CyclicBarrier(2, new A());

    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    c.await();
                } catch (Exception e) {

                }
                System.out.println(1);
            }
        }).start();

        try {
            c.await();
        } catch (Exception e) {

        }
        System.out.println(2);
    }

    static class A implements Runnable {

        @Override
        public void run() {
            System.out.println(3);
        }

    }

}

輸出

3
1
2

下面我們來看看Barrier循環使用的例子,下面例子中getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量

周末公司組織大巴去旅游,總共有三個景點,每個景點約定好游玩時間,一個景點結束后需要集中一起出發到下一個景點。

 1 import java.util.concurrent.CyclicBarrier;
 2 import java.util.concurrent.ExecutorService;
 3 import java.util.concurrent.Executors;
 4 
 5 public class CyclicBarrierTest {
 6 
 7     public static void main(String[] args) {
 8         ExecutorService service = Executors.newCachedThreadPool();
 9         final  CyclicBarrier cb = new CyclicBarrier(3);
10         for(int i=0;i<3;i++){
11             Runnable runnable = new Runnable(){
12                 public void run(){
13                     try {
14                         Thread.sleep((long)(Math.random()*10000));
15                         System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
16                         cb.await();
17 
18                         Thread.sleep((long)(Math.random()*10000));
19                         System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
20                         cb.await();
21                         Thread.sleep((long)(Math.random()*10000));
22                         System.out.println("線程" + Thread.currentThread().getName() + "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));
23                         cb.await();
24                     } catch (Exception e) {
25                         e.printStackTrace();
26                     }
27                 }
28             };
29             service.execute(runnable);
30         }
31         service.shutdown();
32     }
33 }

運行結果:

結果分析:第9行設置需要攔截的線程數為3,三個人一起出發先到第一個景點游玩,第一個景點游玩結束后,第一個到達集合地點一的時候,cb.getNumberWaiting()為0,所以當前有1個已經到達,到16行代碼cb.await()處第一個到達的人開始等待剩余兩人到達;

第二人到達后等待第三人,第三人到達時,cb.getNumberWaiting()為2,表示前面等待的人數為2,此時三個人都到達了集合地點一,同時出發前往集合地點二,此時接着執行第18行代碼;同理三個人都到達集合地點二后再前往集合地點三,也就是執行第21行代碼。

CyclicBarrier的應用場景

CyclicBarrier可以用於多線程計算數據,最后合並計算結果的應用場景。比如我們用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個帳戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水。

CyclicBarrier和CountDownLatch的區別

  • CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset() 方法重置。所以CyclicBarrier能處理更為復雜的業務場景,比如如果計算發生錯誤,可以重置計數器,並讓線程們重新執行一次。
  • CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數量。isBroken方法用來知道阻塞的線程是否被中斷。

 

Semaphore

      Semaphore翻譯成字面意思為 信號量,Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。

Semaphore是什么?

  emaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。把它比作是控制流量的紅綠燈,比如一條馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,后面的車會看到紅燈,不能駛入馬路,但是如果前一百輛中有五輛車已經離開了馬路,那么后面就允許有5輛車駛入馬路,這個例子里說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。

Semaphore類位於java.util.concurrent包下,它提供了2個構造器:

public Semaphore(int permits) {          //參數permits表示許可數目,即同時可以允許多少線程進行訪問
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:

public void acquire() throws InterruptedException {  }     //獲取一個許可
public void acquire(int permits) throws InterruptedException { }    //獲取permits個許可
public void release() { }          //釋放一個許可
public void release(int permits) { }    //釋放permits個許可
  acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。
  release()用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。 
    這4個方法都會被阻塞,如果想立即得到執行結果,可以使用下面幾個方法:
public boolean tryAcquire() { };    //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false
public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false

Semaphore使用例子

  假若一個工廠有5台機器,但是有8個工人,一台機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那么我們就可以通過Semaphore來實現:

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人數
        Semaphore semaphore = new Semaphore(5); //機器數目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一個機器在生產...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"釋放出機器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

執行結果:

工人0占用一個機器在生產...
工人1占用一個機器在生產...
工人2占用一個機器在生產...
工人4占用一個機器在生產...
工人5占用一個機器在生產...
工人0釋放出機器
工人2釋放出機器
工人3占用一個機器在生產...
工人7占用一個機器在生產...
工人4釋放出機器
工人5釋放出機器
工人1釋放出機器
工人6占用一個機器在生產...
工人3釋放出機器
工人7釋放出機器
工人6釋放出機器

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM