java並發之鎖的使用以及原理淺析


        鎖像synchronized同步塊一樣,是一種線程同步機制。讓自Java 5開始,java.util.concurrent.locks包提供了另一種方式實現線程同步機制——Lock。那么問題來了既然都可以通過synchronized來實現同步訪問了,那么為什么還需要提供Lock呢?這個問題我們下面討論java.util.concurrent.locks包中包含了一些鎖的實現,所以我們不需要重復造輪子了。但是我們仍然需要去了解怎樣使用這些鎖,且了解這些實現背后的理論也是很有用處的。

本文將從下面幾個方面介紹

  • 鎖的相關概念
  • java.util.concurrent.locks下常用的幾種鎖

鎖的相關概念

    在學習或者使用Java的過程中進程會遇到各種各樣的鎖的概念:公平鎖、非公平鎖、自旋鎖、可重入鎖、偏向鎖、輕量級鎖、重量級鎖、讀寫鎖、互斥鎖等待。下邊總結了對各種鎖的解釋

公平鎖/非公平鎖

    公平鎖是指多個線程在等待同一個鎖時按照申請鎖的先后順序來獲取鎖。相反的非公平鎖是指多個線程獲取鎖的順序並不是按照申請鎖的順序,有可能后申請的線程比先申請的線程優先獲取鎖。

     公平鎖的好處是等待鎖的線程不會餓死,但是整體效率相對低一些;非公平鎖的好處是整體效率相對高一些,但是有些線程可能會餓死或者說很早就在等待鎖,但要等很久才會獲得鎖。其中的原因是公平鎖是嚴格按照請求所的順序來排隊獲得鎖的,而非公平鎖時可以搶占的,即如果在某個時刻有線程需要獲取鎖,而這個時候剛好鎖可用,那么這個線程會直接搶占,而這時阻塞在等待隊列的線程則不會被喚醒。

        對於Java ReentrantLock而言,通過構造函數指定該鎖是否是公平鎖,默認是非公平鎖。例:new ReentrantLock(true)是公平鎖
對於Synchronized而言,也是一種非公平鎖。由於其並不像ReentrantLock是通過AQS的來實現線程調度,所以並沒有任何辦法使其變成公平鎖。

可重入鎖

    也叫遞歸鎖,是指在外層函數獲得鎖之后,內層遞歸函數仍然可以獲取到該鎖。即線程可以進入任何一個它已經擁有鎖的代碼塊。在JAVA環境下 ReentrantLock 和synchronized 都是可重入鎖。可重入鎖最大的作用是避免死鎖。

   具體區別下文闡述。

自旋鎖

    在Java中,自旋鎖是指嘗試獲取鎖的線程不會立即阻塞,而是采用循環的方式去嘗試獲取鎖,這樣的好處是減少線程上下文切換的消耗,缺點是循環會消耗CPU。

      JDK6中已經變為默認開啟自旋鎖,並且引入了自適應的自旋鎖。自適應意味着自旋的時間不在固定了,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。自旋是在輕量級鎖中使用的,在重量級鎖中,線程不使用自旋。

偏向鎖、輕量級鎖和重量級鎖

      這三種鎖是指鎖的狀態,並且是針對Synchronized在Java 5后通過引入鎖升級的機制來實現高效Synchronized。這三種鎖的狀態是通過對象監視器在對象頭中的字段來表明的。如下圖

       

                                這里的無鎖和偏向鎖在對象頭的倒數第三bit中分別采用0和1標記

  • 偏向鎖是JDK6中引入的一項鎖優化,它的目的是消除數據在無競爭情況下的同步原語,進一步提高程序的運行性能。偏向鎖會偏向於第一個獲得它的線程,如果在接下來的執行過程中,該鎖沒有被其他的線程獲取,則持有偏向鎖的線程將永遠不需要同步。但是對於鎖競爭激勵的場合,我其效果不佳。最壞的情況下就是每次都是不同的線程來請求相同的鎖,這樣偏向模式就會失效。
  • 輕量級鎖是指當鎖是偏向鎖的時候,被另一個線程所訪問,偏向鎖就會升級為輕量級鎖,其他線程會通過自旋的形式嘗試獲取鎖,不會阻塞,提高性能。
  • 重量級鎖是指當鎖為輕量級鎖的時候,另一個線程雖然是自旋,但自旋不會一直持續下去,當自旋一定次數的時候,還沒有獲取到鎖,就會進入阻塞,該鎖膨脹為重量級鎖。重量級鎖會讓其他申請的線程進入阻塞,性能降低。

悲觀鎖和樂觀鎖

      樂觀鎖與悲觀鎖不是指具體的什么類型的鎖,而是指看待並發同步的角度

  • 樂觀鎖認為對於同一個數據的並發操作,是不會發生修改的。在更新數據的時候,會采用嘗試更新,不斷重新的方式更新數據。樂觀的認為,不加鎖的並發操作是沒有事情的。即假定不會發生並發沖突,只在提交操作時檢測是否違反數據完整性。(使用版本號或者時間戳來配合實現)。在java中就是 是無鎖編程,常常采用的是CAS算法,典型的例子就是原子類,通過CAS自旋實現原子操作的更新。
  • 悲觀鎖認為對於同一個數據的並發操作,一定是會發生修改的,哪怕沒有修改,也會認為修改。因此對於同一個數據的並發操作,悲觀鎖采取加鎖的形式。悲觀的認為,不加鎖的並發操作一定會出問題。即假定會發生並發沖突,屏蔽一切可能違反數據完整性的操作。在java中就是各種鎖編程。
  • 從上面的描述我們可以看出,悲觀鎖適合寫操作非常多的場景,樂觀鎖適合讀操作非常多的場景,不加鎖會帶來大量的性能提升。

共享鎖和獨占鎖

  • 共享鎖:如果事務T對數據A加上共享鎖后,則其他事務只能對A再加共享鎖,不能加排它鎖。獲准共享鎖的事務只能讀數據,不能修改數據。
  • 獨占鎖:如果事務T對數據A加上獨占鎖后,則其他事務不能再對A加任何類型的鎖。獲得獨占鎖的事務即能讀數據又能修改數據。如Synchronized

互斥鎖和讀寫鎖

  獨占鎖/共享鎖就是一種廣義的說法,互斥鎖/讀寫鎖就是具體的實現。

  •  互斥鎖:就是指一次最多只能有一個線程持有的鎖。在JDK中synchronized和JUC的Lock就是互斥鎖。
  •  讀寫鎖:讀寫鎖是一個資源能夠被多個讀線程訪問,或者被一個寫線程訪問但不能同時存在讀線程。Java當中的讀寫鎖通過ReentrantReadWriteLock實現。ReentrantReadWriteLock運行一個資源可以被多個讀操作訪問,或者一個寫操作訪問,但兩者不能同時進行。

java.util.concurrent.locks下常用的幾種鎖

ReentrantLock

   ReentrantLock,可重入鎖,是一種遞歸無阻塞的同步機制。它可以等同於synchronized的使用,但是ReentrantLock提供了比synchronized更強大、靈活的鎖機制,可以減少死鎖發生的概率。

   ReentrantLock還提供了公平鎖和非公平鎖的選擇,構造方法接受一個可選的公平參數(默認非公平鎖),當設置為true時,表示公平鎖,否則為非公平鎖。

   獲取鎖

    一般使用如下方式獲取鎖

ReentrantLock lock = new ReentrantLock();
lock.lock();

 lock方法:

   public void lock() {
        sync.lock();
    }

      Sync為Sync為ReentrantLock里面的一個內部類,它繼承AQS。關於AQS的相關知識可以自行補充一下。Sync有兩個子類分別是FairSync(公平鎖)和 NofairSync(非公平鎖)。默認使用NofairSync,下面是ReentrantLock的構造類

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    下邊是一個簡單的重入鎖使用案例

 1 public class ReentrantLockDemo implements Runnable {
 2     public static final Lock lock = new ReentrantLock();
 3     public static int i = 0;
 4 
 5     @Override
 6     public void run() {
 7         for (int j = 0; j < 1000000; j++) {
 8             lock.lock();
 9             try {
10                 i++;
11             } finally {
12                 lock.unlock();
13             }
14         }
15     }
16 
17     public static void main(String[] args) throws InterruptedException {
18         ReentrantLockDemo demo = new ReentrantLockDemo();
19         Thread t1 = new Thread(demo);
20         Thread t2 = new Thread(demo);
21         t1.start();
22         t2.start();
23         t1.join();
24         t2.join();
25         System.out.println(i);
26     }
27 }

       上述代碼的第8~12行,使用了重入鎖保護了臨界區資源i,確保了多線程對i的操作。輸出結果為2000000。可以看到與synchronized相比,重入鎖必選手動指定在什么地方加鎖,什么地方釋放鎖,所以更加靈活。

要注意是,再退出臨界區的時候,需要釋放鎖,否則其他線程就無法訪問臨界區了。這里為啥叫可重入鎖是因為這種鎖是可以被同一個線程反復進入的。比如上述代碼的使用鎖部分可以寫成這樣

           lock.lock();
            lock.lock();
            try {
                i++;
            } finally {
                lock.unlock();
                lock.unlock();
            }

        在這種情況下,一個線程聯連續兩次獲取同一把鎖,這是允許的。但是需要注意的是,如果同一個線程多次獲的鎖,那么在釋放是也要釋放相同次數的鎖。如果釋放的鎖少了,相當於該線程依然持有這個鎖,那么其他線程就無法訪問臨界區了。釋放的次數多了也會拋出java.lang.IllegalMonitorStateException異常。

      除了使用上的靈活,ReentrantLock還提供了一些高級功能如中斷。限時等待等。

     中斷響應

     對用synchrozide來說,如果一個線程在等待,那么結果只有兩種情況,要么獲得這把鎖繼續執行下去要么一直等待下去。而使用重入鎖,提供了另外一種可能,那就是線程可以被中斷。也就是說在這里可以取消對鎖的請求。這種情況對解決死鎖是有一定幫組的。

     下面代碼產生了一個死鎖,但是我們可以通過鎖的中斷,解決這個死鎖。

public class ReentrantLockDemo implements Runnable {
    //重入鎖ReentrantLock
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;
    public ReentrantLockDemo(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();
                Thread.sleep(500);
                lock2.lockInterruptibly();
                System.out.println("this is thread 1");
            } else {
                lock2.lockInterruptibly();
                Thread.sleep(500);
                lock1.lockInterruptibly();
                System.out.println("this is thread 2");
            }
        } catch (Exception e) {
            //e.printStackTrace();
        } finally {
            if (lock1.isHeldByCurrentThread()) {
                lock1.unlock();//釋放鎖
            }
            if (lock2.isHeldByCurrentThread()) {
                lock2.unlock();
            }
            System.out.println(Thread.currentThread().getId() + ":線程退出");
        }

    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo r1 = new ReentrantLockDemo(1);
        ReentrantLockDemo r2 = new ReentrantLockDemo(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        Thread.sleep(1000);
        //t2線程被中斷,放棄鎖申請,釋放已獲得的lock2,這個操作使得t1線程順利獲得lock2繼續執行下去;
        //若沒有此段代碼,t2線程沒有中斷,那么會出現t1獲取lock1,請求lock2,而t2獲取lock2,請求lock1的相互等待死鎖情況
        t2.interrupt();
    }
}

        線程t1和t2啟動后,t1先占用lock1然后在請求lock2;t2先占用lock2,然后請求lock1,因此很容易形成線程之間的相互等待。着這里使用的是ReenTrantLock提供了一種能夠中斷等待鎖的線程的機制,通過lock.lockInterruptibly()來實現這個機制。

      最后由於t2線程被中斷,t2會放棄對lock1的1請求,同時釋放lock2。這樣可以使t1繼續執行下去,結果如下圖

   

   鎖申請等待限時

   除了等待通知以外,避免死鎖還有另外一種方式,那就是限時等待。通過給定一個等待時間,讓線程自動放棄。

public class TimeLockDemo implements Runnable {
    private static ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void run() {

        try {
            if (reentrantLock.tryLock(5, TimeUnit.SECONDS)) {
                Thread.sleep(6000);
            } else {
                System.out.println("Gets lock failed");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (reentrantLock.isHeldByCurrentThread()){
                reentrantLock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        TimeLockDemo demo1 = new TimeLockDemo();
        TimeLockDemo demo2 = new TimeLockDemo();
        Thread t1 = new Thread(demo1);
        Thread t2 = new Thread(demo2);
        t1.start();
        t2.start();
    }
}

        tryLock有兩個參數,一個表示等待時長,另一個表示計時單位。在這里就是通過lock.tryLock(5,TimeUnit.SECONDS)來設置鎖申請等待限時,此例就是限時等待5秒獲取鎖。在這里的鎖請求最多為5秒,如果超過5秒未獲得鎖請求,則會返回fasle,如果成功獲得鎖就會返回true。此案例中第一個線程會持有鎖長達6秒,所以另外一個線程無法在5秒內獲得鎖 故案例輸出結果為Gets lock failed

        另外tryLock方法也可以不帶參數之直接運行,在這種情況下,當前線程會嘗試獲得鎖,如果鎖並未被其他線程占用,則申請鎖直接成功,立即返回true,否則當前線程不會進行等待,而是立即返回false。這種模式不會引起線程等待,因此也不會產生死鎖。

      下邊展示了這種使用方式   

public class ReentrantLockDemo implements Runnable {
    //重入鎖ReentrantLock
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();

    int lock;
    public ReentrantLockDemo(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                while (true) {
                    if (lock1.tryLock()) {
                        try {
                            Thread.sleep(1000);
                        } finally {
                            lock1.unlock();
                        }
                    }

                    if (lock2.tryLock()) {
                        try {
                            System.out.println("thread " + Thread.currentThread().getId() + " 執行完畢");

                            return;
                        } finally {
                            lock2.unlock();
                        }
                    }
                }

            } else {
                while (true) {
                    if (lock2.tryLock()) {
                        try {
                            Thread.sleep(1000);
                        } finally {
                            lock2.unlock();
                        }
                    }

                    if (lock1.tryLock()) {
                        try {
                            System.out.println("thread " + Thread.currentThread().getId() + " 執行完畢");
                            return;
                        } finally {
                            lock1.unlock();
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLockDemo r1 = new ReentrantLockDemo(1);
        ReentrantLockDemo r2 = new ReentrantLockDemo(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
    }
}
View Code

      使用了tryLock后,線程不會傻傻的等待,而是不同的嘗試獲取鎖,因此,只要執行足夠長的時間,線程總是會獲得所有需要的資源。從而正常執行。下邊展示了運行結果。表示兩個線程運行都正常。

      

  在大多數情況下。鎖的申請都是非公平的。也就是說系統只是會從等待鎖的隊列里隨機挑選一個,所以不能保證其公平性。但是公平鎖的實現成本很高,性能也相對低下。因此如果沒有特別要求,也不需要使用公平鎖。

    對上邊ReentrantLock幾個重要的方法整理如下。

  • lock():獲得鎖,如果鎖已經被占用,則等待。
  • lockInterruptibly(): 獲得鎖,但優先響應中斷。
  • tryLock():嘗試獲得鎖,如果成功,返回true,失敗返回false。該方法不等待,立即返回
  • tryLock(long time,TimeUnit unit),在給定時間內嘗試獲得鎖
  • unlock(): 釋放鎖。注:ReentrantLock的鎖釋放一定要在finally中處理,否則可能會產生嚴重的后果。

Condition條件

     Conditon和ReentrantLock的組合可以讓線程在合適的時間等待,或者在某一個特定的時間得到通知,繼續執行。在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現,這里注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。

  • await:當前線程進入等待狀態,直到被通知(signal OR signalAll)或者被中斷時,當前線程進入運行狀態,從await()返回;
  • awaitUninterruptibly:當前線程進入等待狀態,直到被通知,對中斷不做響應;
  • awaitNanos(long nanosTimeout):在await()的返回條件基礎上增加了超時響應,返回值表示當前剩余的時間,如果在nanosTimeout之前被喚醒,返回值 = nanosTimeout - 實際消耗的時間,返回值 <= 0表示超時;
  • boolean await(long time, TimeUnit unit):同樣是在await()的返回條件基礎上增加了超時響應,與上一接口不同的是可以自定義超時時間單位; 返回值返回true/false,在time之前被喚醒,返回true,超時返回false。
  • boolean awaitUntil(Date deadline):當前線程進入等待狀態直到將來的指定時間被通知,如果沒有到指定時間被通知返回true,否則,到達指定時間,返回false;
  • signal():喚醒一個等待在Condition上的線程
  • signalAll():喚醒等待在Condition上所有的線程

     使用案例如下

public class ConditionDemo {
    static class NumberWrapper {
        public int value = 1;
    }

    public static void main(String[] args) {
        //初始化可重入鎖
        final Lock lock = new ReentrantLock();

        //第一個條件當屏幕上輸出到3
        final Condition reachThreeCondition = lock.newCondition();
        //第二個條件當屏幕上輸出到6
        final Condition reachSixCondition = lock.newCondition();

        //NumberWrapper只是為了封裝一個數字,一邊可以將數字對象共享,並可以設置為final
        //注意這里不要用Integer, Integer 是不可變對象
        final NumberWrapper num = new NumberWrapper();
        //初始化A線程
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                //需要先獲得鎖
                lock.lock();
                try {
                    System.out.println("threadA start write");
                    //A線程先輸出前3個數
                    while (num.value <= 3) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //輸出到3時要signal,告訴B線程可以開始了
                    reachThreeCondition.signal();
                } finally {
                    lock.unlock();
                }
                lock.lock();
                try {
                    //等待輸出6的條件
                    while(num.value <= 6) {
                        reachSixCondition.await();
                    }
                    System.out.println("threadA start write");
                    //輸出剩余數字
                    while (num.value <= 9) {
                        System.out.println(num.value);
                        num.value++;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock.lock();

                    while (num.value <= 3) {
                        //等待3輸出完畢的信號
                        reachThreeCondition.await();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
                try {
                    lock.lock();
                    //已經收到信號,開始輸出4,5,6
                    System.out.println("threadB start write");
                    while (num.value <= 6) {
                        System.out.println(num.value);
                        num.value++;
                    }
                    //4,5,6輸出完畢,告訴A線程6輸出完了
                    reachSixCondition.signal();
                } finally {
                    lock.unlock();
                }
            }
        });

        //啟動兩個線程
        threadB.start();
        threadA.start();
    }
}
View Code

 

   結果如下

   這樣看來,Condition和傳統的線程通信沒什么區別,Condition的強大之處在於它可以為多個線程間建立不同的Condition,下面引入API中的一段代碼,加以說明。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();//鎖對象
   final Condition notFull  = lock.newCondition();//寫線程條件 
   final Condition notEmpty = lock.newCondition();//讀線程條件 

   final Object[] items = new Object[100];//緩存隊列
   int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)//如果隊列滿了 
         notFull.await();//阻塞寫線程
       items[putptr] = x;//賦值 
       if (++putptr == items.length) putptr = 0;//如果寫索引寫到隊列的最后一個位置了,那么置為0
       ++count;//個數++
       notEmpty.signal();//喚醒讀線程
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)//如果隊列為空
         notEmpty.await();//阻塞讀線程
       Object x = items[takeptr];//取值 
       if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊列的最后一個位置了,那么置為0
       --count;//個數--
       notFull.signal();//喚醒寫線程
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }

      這個示例中BoundedBuffer是一個固定長度的集合,這個在其put操作時,如果發現長度已經達到最大長度,那么要等待notFull信號才能繼續put,如果得到notFull信號會像集合中添加元素,並且put操作會發出notEmpty的信號,而在其take方法中如果發現集合長度為空,那么會等待notEmpty的信號,接受到notEmpty信號才能繼續take,同時如果拿到一個元素,那么會發出notFull的信號。

     信號量(Semaphore)

       信號量(Semaphore)為多線程協作提供了更為強大的控制用法。無論是內部鎖Synchronized還是ReentrantLock,一次都只允許一個線程訪問資源,而信號量可以多個線程訪問同一資源。Semaphore是用來保護一個或者多個共享資源的訪問,Semaphore內部維護了一個計數器,其值為可以訪問的共享資源的個數。一個線程要訪問共享資源,先獲得信號量,如果信號量的計數器值大於1,意味着有共享資源可以訪問,則使其計數器值減去1,再訪問共享資源。如果計數器值為0,線程進入休眠。當某個線程使用完共享資源后,釋放信號量,並將信號量內部的計數器加1,之前進入休眠的線程將被喚醒並再次試圖獲得信號量。

     信號量的UML的類圖如下,可以看出和ReentrantLock一樣,Semaphore也包含了sync對象,sync是Sync類型;而且,Sync是一個繼承於AQS的抽象類。Sync包括兩個子類:"公平信號量"FairSync 和 "非公平信號量"NonfairSync。sync是"FairSync的實例",或者"NonfairSync的實例";默認情況下,sync是NonfairSync(即,默認是非公平信號量)

 

       信號量主要提供了以下構造函數

Semaphore(int num)
Semaphore(int num,boolean how)

       這里,num指定初始許可計數。因此,它指定了一次可以訪問共享資源的線程數。如果是1,則任何時候只有一個線程可以訪問該資源。默認情況下,所有等待的線程都以未定義的順序被授予許可。通過設置how為true,可以確保等待線程按其請求訪問的順序被授予許可。信號量的主要邏輯方法如下

// 從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。
void acquire()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。
void acquire(int permits)
// 從此信號量中獲取許可,在有可用的許可前將其阻塞。
void acquireUninterruptibly()
// 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。
void acquireUninterruptibly(int permits)
// 返回此信號量中當前可用的許可數。
// 釋放一個許可,將其返回給信號量。
void release()
// 釋放給定數目的許可,將其返回到信號量。

// 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。
boolean tryAcquire()
// 僅在調用時此信號量中有給定數目的許可時,才從此信號量中獲取這些許可。
boolean tryAcquire(int permits)
// 如果在給定的等待時間內此信號量有可用的所有許可,並且當前線程未被中斷,則從此信號量獲取給定數目的許可。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
// 如果在給定的等待時間內,此信號量有可用的許可並且當前線程未被中斷,則從此信號量獲取一個許可。

      實例如下:這里我們模擬10個人去銀行存款,但是該銀行只有兩個辦公櫃台,有空位則上去存錢,沒有空位則只能去排隊等待。最后輸出銀行總額

public class SemaphoreThread {
    private int customer;

    public SemaphoreThread() {
        customer = 0;
    }

    /**
     * 銀行存錢類
     */
    class Bank {
        private int account = 100;

        public int getAccount() {
            return account;
        }

        public void save(int money) {
            account += money;
        }
    }

    /**
     * 線程執行類,每次存10塊錢
     */
    class NewThread implements Runnable {
        private Bank bank;
        private Semaphore semaphore;

        public NewThread(Bank bank, Semaphore semaphore) {
            this.bank = bank;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            int tempCustomer = customer++;
            if (semaphore.availablePermits() > 0) {
                System.out.println("客戶" + tempCustomer + "啟動,進入銀行,有位置立即去存錢");
            } else {
                System.out.println("客戶" + tempCustomer + "啟動,進入銀行,無位置,去排隊等待等待");
            }
            try {
                semaphore.acquire();
                bank.save(10);
                System.out.println(tempCustomer + "銀行余額為:" + bank.getAccount());
                Thread.sleep(1000);
                System.out.println("客戶" + tempCustomer + "存錢完畢,離開銀行");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }

    /**
     * 建立線程,調用內部類,開始存錢
     */
    public void useThread() {
        Bank bank = new Bank();
        // 定義2個新號量
        Semaphore semaphore = new Semaphore(2);
        // 建立一個緩存線程池
        ExecutorService es = Executors.newCachedThreadPool();
        // 建立10個線程
        for (int i = 0; i < 10; i++) {
            // 執行一個線程
            es.submit(new Thread(new NewThread(bank, semaphore)));
        }
        // 關閉線程池
        es.shutdown();

        // 從信號量中獲取兩個許可,並且在獲得許可之前,一直將線程阻塞
        semaphore.acquireUninterruptibly(2);
        System.out.println("到點了,工作人員要吃飯了");
        // 釋放兩個許可,並將其返回給信號量
        semaphore.release(2);
    }

    public static void main(String[] args) {
        SemaphoreThread test = new SemaphoreThread();
        test.useThread();
    }
}
View Code

讀寫鎖ReentrantReadWriteLock

     ReentrantReadWriteLock是Lock的另一種實現方式,我們已經知道了ReentrantLock是一個排他鎖,同一時間只允許一個線程訪問,而ReentrantReadWriteLock允許多個讀線程同時訪問(也就是讀操作),但不允許寫線程和讀線程、寫線程和寫線程同時訪問。約束如下

  • 讀—讀不互斥:讀與讀之間不阻塞 
  • 讀—寫:讀阻塞寫,寫也會阻塞讀
  • 寫—寫:寫寫阻塞      

      相對於排他鎖,提高了並發性。在實際應用中,大部分情況下對共享數據(如緩存)的訪問都是讀操作遠多於寫操作,這時ReentrantReadWriteLock能夠提供比排他鎖更好的並發性和吞吐量。  

      看一下官方案例     

lass CachedData {
  Object data;
  volatile boolean cacheValid;
  final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  public void processCachedData() {
    rwl.readLock().lock();//1
    if (!cacheValid) {
      // Must release read lock before acquiring write lock
      rwl.readLock().unlock();//2
      rwl.writeLock().lock();//3
      try {
        // Recheck state because another thread might have,acquired write lock and changed state before we did.
        if (!cacheValid) {
          data = ...
          cacheValid = true;
        }
        // 在釋放寫鎖之前通過獲取讀鎖降級寫鎖(注意此時還沒有釋放寫鎖)
        rwl.readLock().lock();//4
      } finally {
        // 釋放寫鎖而此時已經持有讀鎖
        rwl.writeLock().unlock();//5
      }
    }

    try {
      use(data);
    } finally {
      rwl.readLock().unlock();//6
    }
  }
}
View Code
  1.  多個線程同時訪問該緩存對象時,都加上當前對象的讀鎖,之后其中某個線程優先查看data數據是否為空。【加鎖順序序號:1 】
  2.  當前查看的線程,如果發現沒有值則釋放讀鎖,然后立即加上寫鎖,准備寫入緩存數據。(進入寫鎖的前提是當前沒有其他線程的讀鎖或者寫鎖)【加鎖順序序號:2和3 】
  3. 為什么還會再次判斷是否為空值(!cacheValid)是因為第二個、第三個線程獲得讀的權利時也是需要判斷是否為空,否則會重復寫入數據。 
  4. 寫入數據后先進行讀鎖的降級后再釋放寫鎖。【加鎖順序序號:4和5】
  5. 最后數據數據返回前釋放最終的讀鎖。【加鎖順序序號:6 】

  如果不使用鎖降級功能,如先釋放寫鎖,然后獲得讀鎖,在這個get過程中,可能會有其他線程競爭到寫鎖 或者是更新數據 則獲得的數據是其他線程更新的數據,可能會造成數據的污染,即產生臟讀的問題    

 1 public class ReadAndWriteLock {
 2     private static ReentrantLock lock = new ReentrantLock();
 3     private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 4     private static Lock readLock = readWriteLock.readLock();
 5     private static Lock writeLock = readWriteLock.writeLock();
 6 
 7     public ReadAndWriteLock setValue(int value) {
 8         this.value = value;
 9         return this;
10     }
11 
12     private int value;
13 
14     public Object handleRead(Lock lock) throws InterruptedException {
15         try {
16             //模擬讀操作
17             lock.lock();
18             System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value);
19             Thread.sleep(1000);
20             return value;
21         } finally {
22             lock.unlock();
23         }
24     }
25 
26     public Object handleWrite(Lock lock, int index) throws InterruptedException {
27         try {
28             //模擬寫操作
29             lock.lock();
30             value = index;
31             Thread.sleep(1000);
32             System.out.println("thread:" + Thread.currentThread().getId() + " value:" + value);
33             return value;
34 
35         } finally {
36             lock.unlock();
37         }
38     }
39 
40     public static void main(String[] args) throws InterruptedException {
41         final ReadAndWriteLock demo = new ReadAndWriteLock();
42         demo.setValue(0);
43         Runnable readRunnable = new Runnable() {
44             @Override
45             public void run() {
46                 try {
47                      //讀鎖
48                     demo.handleRead(readLock);
49                      //可重入鎖
50                     //demo.handleRead(lock);
51 
52                 } catch (InterruptedException e) {
53                     e.printStackTrace();
54                 }
55 
56             }
57         };
58 
59         Runnable writeRunnable = new Runnable() {
60             @Override
61             public void run() {
62                 try {
63                      //寫鎖
64                     demo.handleWrite(readLock, (int) (Math.random() * 1000));
65                     //可重入鎖
66                     //demo.handleWrite(lock, (int) (Math.random() * 1000));
67                 } catch (InterruptedException e) {
68                     e.printStackTrace();
69                 }
70 
71             }
72         };
73         ExecutorService exec = new ThreadPoolExecutor(0, 200,
74                 0, TimeUnit.SECONDS,
75                 new SynchronousQueue<Runnable>());
76         ;
77         long startTime = System.currentTimeMillis();
78 
79         for (int i = 0; i < 18; i++) {
80             exec.execute(readRunnable);
81         }
82 
83         for (int i = 0; i < 18; i++) {
84             exec.execute(writeRunnable);
85         }
86         exec.shutdown();
87         exec.awaitTermination(60, TimeUnit.MINUTES);
88         long endTime = System.currentTimeMillis(); //獲取結束時間
89         System.out.println("程序運行時間: " + (endTime - startTime) + "ms");
90 
91     }
92 }
View Code

           在這里讀線程完全並行,而寫會阻塞讀。程序執行時間如下

         

        將上述案例中的讀寫鎖改成可重入鎖,即將第行代碼注釋掉那么所有的讀和寫線程都必須相互等待,程序執行時間如下所示     

        

倒計時器:CountDownLatch

       CountDownLatch是java1.5版本之后util.concurrent提供的工具類。這里簡單介紹一下CountDownLatch,可以將其看成是一個計數器,await()方法可以阻塞至超時或者計數器減至0,其他線程當完成自己目標的時候可以減少1,利用這個機制我們可以將其用來做並發。 比如有一個任務A,它要等待其他4個任務執行完畢之后才能執行,此時就可以利用CountDownLatch來實現這種功能了。      

      CountDownLatch類只提供了一個構造器,該構造器接受一個整數作為參數,即當前這個計數器的計數個數 。

public CountDownLatch(int count) {  };  //參數count為計數值

     使用場景:比如對於馬拉松比賽,進行排名計算,參賽者的排名,肯定是跑完比賽之后,進行計算得出的,翻譯成Java識別的預發,就是N個線程執行操作,主線程等到N個子線程執行完畢之后,在繼續往下執行。

public class CountDownLatchTest {
    public static void main(String[] args){

        int threadCount = 10;

        final CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {

            new Thread(new Runnable() {

                @Override
                public void run() {
                    System.out.println("線程" + Thread.currentThread().getId() + "開始出發");

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println("線程" + Thread.currentThread().getId() + "已到達終點");

                    latch.countDown();
                }
            }).start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個線程已經執行完畢!開始計算排名");
    }
}
View Code

     結果如下 

線程12開始出發
線程14開始出發
線程15開始出發
線程17開始出發
線程13開始出發
線程16開始出發
線程18開始出發
線程19開始出發
線程20開始出發
線程21開始出發
線程16已到達終點
線程13已到達終點
線程19已到達終點
線程18已到達終點
線程17已到達終點
線程14已到達終點
線程15已到達終點
線程12已到達終點
線程21已到達終點
線程20已到達終點
10個線程已經執行完畢!開始計算排名
View Code

     CountDownLatch在並行化應用中也是比較常用。常用的並行化框架OpenMP中也是借鑒了這種思想。比如有這樣的一個需求,在你淘寶訂單的時候,這筆訂單可能還需要查,用戶信息,折扣信息,商家信息,商品信息等,用同步的方式(也就是串行的方式)流程如下。

   

         設想一下這5個查詢服務,平均每次消耗100ms,那么本次調用至少是500ms,我們這里假設,在這個這五個服務其實並沒有任何數據依賴,誰先獲取誰后獲取都可以,那么我們可以想辦法並行化這五個服務。

         

       這里可以使用CountDownLatch來實現這個效果。         

public class CountDownDemo {
    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 8;
    private static final long KEEP_ALIVE_TIME = 5L;
    private final static int QUEUE_SIZE = 1600;

    protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));

    public static void main(String[] args) throws InterruptedException {
        // 新建一個為5的計數器
        CountDownLatch countDownLatch = new CountDownLatch(5);
        OrderInfo orderInfo = new OrderInfo();
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Customer,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setCustomerInfo(new CustomerInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Discount,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setDiscountInfo(new DiscountInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Food,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setFoodListInfo(new FoodListInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務Tenant,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setTenantInfo(new TenantInfo());
            countDownLatch.countDown();
        });
        THREAD_POOL.execute(() -> {
            System.out.println("當前任務OtherInfo,線程名字為:" + Thread.currentThread().getName());
            orderInfo.setOtherInfo(new OtherInfo());
            countDownLatch.countDown();
        });
        countDownLatch.await(1, TimeUnit.SECONDS);
        System.out.println("主線程:" + Thread.currentThread().getName());
    }

}
View Code

       建立一個線程池(具體配置根據具體業務,具體機器配置),進行並發的執行我們的任務(生成用戶信息,菜品信息等),最后利用await方法阻塞等待結果成功返回。  

循環柵欄CyclicBarrier

       字面意思循環柵欄,柵欄就是一種障礙物。這里就是內存屏障。通過它可以實現讓一組線程等待至某個狀態之后再全部同時執行。叫做回環是因為當所有等待線程都被釋放以后,CyclicBarrier可以被重用。CyclicBarrier比CountDownLatch 功能更強大一些,CyclicBarrier可以接受一個參數作為barrierAction。所謂barrierAction就是當計算器一次計數完成后,系統會執行的動作。CyclicBarrier強調的是n個線程,大家相互等待,只要有一個沒完成,所有人都得等着。(這種思想在高性能計算最為常見,GPU計算中關於也有類似內存屏障的用法)。構造函數如下,其中parties表示計數總數,也就是參與的線程總數。

public CyclicBarrier(int parties, Runnable barrierAction) {
} 
public CyclicBarrier(int parties) {
}

     案例10個人去旅行,規定達到一個地點后才能繼續前行.代碼如下         

class CyclicBarrierWorker implements Runnable {
    private int id;
    private CyclicBarrier barrier;
    public CyclicBarrierWorker(int id, final CyclicBarrier barrier) {
        this.id = id;
        this.barrier = barrier;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(Math.abs(new Random().nextInt()%10000));
            System.out.println(id + " th people wait");
            barrier.await(); // 大家等待最后一個線程到達
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

public class TestCyclicBarrier {
    public static void main(String[] args) {
        int num = 10;
        CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
                @Override
            public void run() {
                System.out.println("go on together!");
            }
        });
        for (int i = 1; i <= num; i++) {
            new Thread(new CyclicBarrierWorker(i, barrier)).start();
        }
    }
}
View Code

 

       

從上面輸出結果可以看出,每個線程執行自己的操作之后,就在等待其他線程執行操作完畢。當所有線程線程執行操作完畢之后,所有線程就繼續進行后續的操作了。

      

參考資料

《Java高並發編程設計》

  https://www.cnblogs.com/-new/p/7256297.html

  https://www.cnblogs.com/dolphin0520/p/3923167.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM