Java多線程20:多線程下的其他組件之CountDownLatch、Semaphore、Exchanger


前言

在多線程環境下,JDK給開發者提供了許多的組件供用戶使用(主要在java.util.concurrent下),使得用戶不需要再去關心在具體場景下要如何寫出同時兼顧線程安全性與高效率的代碼。之前講過的線程池、BlockingQueue都是在java.util.concurrent下的組件,Timer雖然不在java.util.concurrent下,但也算是。后兩篇文章將以例子的形式簡單講解一些多線程下其他組件的使用,不需要多深刻的理解,知道每個組件大致什么作用就行。

本文主要講解的是CountDownLatch、Semaphore、Exchanger。

 

CountDownLatch

CountDownLatch主要提供的機制是當多個(具體數量等於初始化CountDownLatch時count參數的值)線程都達到了預期狀態或完成預期工作時觸發事件,其他線程可以等待這個事件來觸發自己的后續工作。值得注意的是,CountDownLatch是可以喚醒多個等待的線程的。

到達自己預期狀態的線程會調用CountDownLatch的countDown方法,等待的線程會調用CountDownLatch的await方法。如果CountDownLatch初始化的count值為1,那么這就退化為一個單一事件了,即是由一個線程來通知其他線程,效果等同於對象的wait和notifyAll,count值大於1是常用的方式,目的是為了讓多個線程到達各自的預期狀態,變為一個事件進行通知,線程則繼續自己的行為。

看一個例子:

private static class WorkThread extends Thread
{
    private CountDownLatch cdl;
    private int sleepSecond;
        
    public WorkThread(String name, CountDownLatch cdl, int sleepSecond)
    {
        super(name);
        this.cdl = cdl;
        this.sleepSecond = sleepSecond;
    }
        
    public void run()
    {
        try
        {
            System.out.println(this.getName() + "啟動了,時間為" + System.currentTimeMillis());
            Thread.sleep(sleepSecond * 1000);
            cdl.countDown();
            System.out.println(this.getName() + "執行完了,時間為" + System.currentTimeMillis());
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
    
private static class DoneThread extends Thread
{
    private CountDownLatch cdl;
        
    public DoneThread(String name, CountDownLatch cdl)
    {
        super(name);
        this.cdl = cdl;
    }
        
    public void run()
    {
        try
        {
            System.out.println(this.getName() + "要等待了, 時間為" + System.currentTimeMillis());
            cdl.await();
            System.out.println(this.getName() + "等待完了, 時間為" + System.currentTimeMillis());
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
    
public static void main(String[] args) throws Exception
{
    CountDownLatch cdl = new CountDownLatch(3);
    DoneThread dt0 = new DoneThread("DoneThread1", cdl);
    DoneThread dt1 = new DoneThread("DoneThread2", cdl);
    dt0.start();
    dt1.start();
    WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);
    WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);
    WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);
    wt0.start();
    wt1.start();
    wt2.start();
}

看一下運行結果:

DoneThread2要等待了, 時間為1444563077434
DoneThread1要等待了, 時間為1444563077434
WorkThread1啟動了,時間為1444563077434
WorkThread3啟動了,時間為1444563077435
WorkThread2啟動了,時間為1444563077435
WorkThread1執行完了,時間為1444563079435
WorkThread2執行完了,時間為1444563080435
WorkThread3執行完了,時間為1444563081435
DoneThread1等待完了, 時間為1444563081435
DoneThread2等待完了, 時間為1444563081435

效果十分明顯,解釋一下:

1、啟動2個線程DoneThread線程等待3個WorkThread全部執行完

2、3個WorkThread全部執行完,最后執行完的WorkThread3執行了秒符合預期

3、后三句從時間上看幾乎同時出現,說明CountDownLatch設置為3,WorkThread3執行完,兩個wait的線程馬上就執行后面的代碼了

這相當於是一種進化版本的等待/通知機制,它可以的實現的是多個工作線程完成任務后通知多個等待線程開始工作,之前的都是一個工作線程完成任務通知一個等待線程或者一個工作線程完成任務通知所有等待線程。

CountDownLatch其實是很有用的,特別適合這種將一個問題分割成N個部分的場景,所有子部分完成后,通知別的一個/幾個線程開始工作。比如我要統計C、D、E、F盤的文件,可以開4個線程,分別統計C、D、E、F盤的文件,統計完成把文件信息匯總到另一個/幾個線程中進行處理

 

Semaphore

Semaphore是非常有用的一個組件,它相當於是一個並發控制器,是用於管理信號量的。構造的時候傳入可供管理的信號量的數值,這個數值就是控制並發數量的,我們需要控制並發的代碼,執行前先通過acquire方法獲取信號,執行后通過release歸還信號 。每次acquire返回成功后,Semaphore可用的信號量就會減少一個,如果沒有可用的信號,acquire調用就會阻塞,等待有release調用釋放信號后,acquire才會得到信號並返回。

Semaphore分為單值和多值兩種:

1、單值的Semaphore管理的信號量只有1個,該信號量只能被1個,只能被一個線程所獲得,意味着並發的代碼只能被一個線程運行,這就相當於是一個互斥鎖了

2、多值的Semaphore管理的信號量多余1個,主要用於控制並發數

看一下代碼例子:

public static void main(String[] args)
{
    final Semaphore semaphore = new Semaphore(5);
        
    Runnable runnable = new Runnable()
    {
        public void run()
        {
            try
            {
               semaphore.acquire();                    
         System.out.println(Thread.currentThread().getName()
+ "獲得了信號量,時間為" + System.currentTimeMillis()); Thread.sleep(2000);          System.out.println(Thread.currentThread().getName() + "釋放了信號量,時間為" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } }; Thread[] threads = new Thread[10]; for (int i = 0; i < threads.length; i++) threads[i] = new Thread(runnable); for (int i = 0; i < threads.length; i++) threads[i].start(); }

看一下運行結果:

 1 Thread-1獲得了信號量,時間為1444557040464
 2 Thread-2獲得了信號量,時間為1444557040465
 3 Thread-0獲得了信號量,時間為1444557040464
 4 Thread-3獲得了信號量,時間為1444557040465
 5 Thread-4獲得了信號量,時間為1444557040465
 6 Thread-2釋放了信號量,時間為1444557042466
 7 Thread-4釋放了信號量,時間為1444557042466
 8 Thread-0釋放了信號量,時間為1444557042466
 9 Thread-1釋放了信號量,時間為1444557042466
10 Thread-3釋放了信號量,時間為1444557042466
11 Thread-9獲得了信號量,時間為1444557042467
12 Thread-7獲得了信號量,時間為1444557042466
13 Thread-6獲得了信號量,時間為1444557042466
14 Thread-5獲得了信號量,時間為1444557042466
15 Thread-8獲得了信號量,時間為1444557042467
16 Thread-9釋放了信號量,時間為1444557044467
17 Thread-6釋放了信號量,時間為1444557044467
18 Thread-7釋放了信號量,時間為1444557044467
19 Thread-5釋放了信號量,時間為1444557044468
20 Thread-8釋放了信號量,時間為1444557044468

前10行為一部分,運行的線程是1 2 0 3 4,看到時間差也都是代碼約定的2秒;后10行為一部分,運行的線程是9 7 6 5 8,時間差也都是約定的2秒,這就體現出了Semaphore的作用了。

這種通過Semaphore控制並發並發數的方式和通過控制線程數來控制並發數的方式相比,粒度更小,因為Semaphore可以通過acquire方法和release方法來控制代碼塊的並發數。

最后注意兩點:

1、Semaphore可以指定公平鎖還是非公平鎖

2、acquire方法和release方法是可以有參數的,表示獲取/返還的信號量個數

 

Exchanger

Exchanger,從名字上理解就是交換。Exchanger用於在兩個線程之間進行數據交換,注意也只能在兩個線程之間進行數據交換。線程會阻塞在Exchanger的exchange方法上,直到另外一個線程也到了同一個Exchanger的exchange方法時,二者進行數據交換,然后兩個線程繼續執行自身相關的代碼。

Exchanger只有一個exchange方法,用於交換數據。看一下例子:

public static class ExchangerThread extends Thread
{
    private String str;
    private Exchanger<String> exchanger;
    private int sleepSecond;
    
    public ExchangerThread(String str, Exchanger<String> exchanger, int sleepSecond)
    {
        this.str = str;
        this.exchanger = exchanger;
        this.sleepSecond = sleepSecond;
    }
        
    public void run()
    {
        try
        {
            System.out.println(this.getName() + "啟動, 原數據為" + str + ", 時間為" + System.currentTimeMillis());
            Thread.sleep(sleepSecond * 1000);
            str = exchanger.exchange(str);
            System.out.println(this.getName() + "交換了數據, 交換后的數據為" + str + ", 時間為" + System.currentTimeMillis());
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
    
public static void main(String[] args)
{
    Exchanger<String> exchanger = new Exchanger<String>();
    ExchangerThread et0 = new ExchangerThread("111", exchanger, 3);
    ExchangerThread et1 = new ExchangerThread("222", exchanger, 2);
    et0.start();
    et1.start();
}

看一下運行結果:

Thread-0啟動, 原數據為111, 時間為1444560972303
Thread-1啟動, 原數據為222, 時間為1444560972303
Thread-0交換了數據, 交換后的數據為222, 時間為1444560975303
Thread-1交換了數據, 交換后的數據為111, 時間為1444560975303

看到兩個線程交換了數據,由於一個線程睡2秒、一個線程睡3秒,既然要交換數據,肯定是睡2秒的要等待睡3秒的,所以看到時間差是3000ms即3s。

從這個例子看來,Exchanger有點像之前Java多線程15:Queue、BlockingQueue以及利用BlockingQueue實現生產者/消費者模型一文中的SynchronousQueue的雙向形式,它可能在遺傳算法和管道設計中很有用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM