ReentrantLock和BlockingQueue
首先,看到這個標題,不要懷疑自己進錯文章,也不要懷疑筆者寫錯,哈哈。本章筆者會從BlockingQueue(阻塞隊列)的角度,看看juc包下的阻塞隊列是如何使用ReentrantLock。這個章節筆者會介紹部分阻塞隊列的源碼,但不會着墨過多,我們的重點依舊在ReentrantLock上。
BlockingQueue(阻塞隊列)是juc包下提供一種數據結構,相比普通的隊列,阻塞隊列可以讓我們在不需要關心線程安全的情況下往隊列中存取數據。此外,阻塞隊列還支持我們從一個隊列中獲取元素時,如果隊列為空則陷入阻塞,直到隊列有元素入隊為止;也支持我們向隊列中存儲元素時,如果隊列已滿則陷入阻塞,直到隊列有多余的位置可以容納待入隊的元素。
不論是入隊、出隊亦或是檢查隊列元素,阻塞隊列都提供了多種方法,這些方法都可以實現入隊、出隊和檢查,但每個方法都有自己的特性和適用場景:
- 入隊:
- add(E e):嘗試將指定元素e插入到隊列,如果沒有超過隊列的容量限制則返回true表示成功,否則拋出IllegalStateException異常表示隊列沒有多余的空間容納元素。
- offer(E e):嘗試將指定元素e插入到隊列,如果沒有超過隊列的容量限制則返回true表示成功,如果隊列沒有多余的空間容納元素則返回false。
- put(E e):嘗試將指定元素e插入到隊列,如果沒有超過隊列的容量則立即返回,否則陷入阻塞直到隊列有多余的空間容納元素。
- offer(E e, long timeout, TimeUnit unit):嘗試將指定元素e插入到隊列,如果沒有超過隊列的容量則立即返回;如果超時前隊列有多余的空間可容納元素則返回true,否則返回false。
- 出隊:
- take():返回並移除隊頭元素,如果隊列為空則陷入阻塞直到有元素入隊。
- poll():返回並移除隊頭元素,此方法不會陷入阻塞,如果隊列為空則返回null。
- poll(time, unit):返回並移除隊頭元素,隊列為空則陷入阻塞,如果在超時前都沒有元素入隊則返回null。
- remove():返回並移除隊頭元素,如果隊列為空則拋出NoSuchElementException異常。
- 檢查:
- element():返回隊頭元素,但不移除,如果隊列為空則拋出NoSuchElementException異常。
- peek():返回隊頭元素,但不移除,如果隊列為空則返回null。
public interface Queue<E> extends Collection<E> { //... element(); peek(); E poll(); //... } public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; boolean remove(Object o); //... }
當我們試圖向阻塞隊列中添加一個空元素(null),阻塞隊列會拋出NullPointerException異常。在阻塞隊列中null是一個敏感值,一般用於表示隊列無元素或者超時獲取元素失敗,如果允許往隊列中添加空元素,就無法分辨返回的null到底是獲取元素失敗,還是這個空元素本身就是隊列中的元素。
阻塞隊列可以設置容量,如果我們添加的元素超過隊列剩余可容納的數量,可能會陷入阻塞。阻塞隊列主要用於生產者-消費者隊列這樣的場景,此外阻塞隊列還實現了Collection接口。我們可以調用remove(Object o)從隊列中移除一個元素,例如從隊列中移除一條消息,但這個方法的實現效率通常不是很高,應謹慎使用。
阻塞隊列是線程安全的,其實現需要通過內部鎖或者其他形式的並發控制來保證原子性。但是阻塞隊列繼承自Collection接口的addAll、containsAll、retainAll 、removeAll並不保證原子性。例如:addAll(c)的實現是循環將元素通過add(e)方法入隊,那么在超過隊列容量的時候,就會拋出異常。
下面,讓我們思考下如何實現一個線程安全的阻塞隊列。可能很多人看到文章的標題,都會直接想到ReentrantLock,只要涉及到元素的存取,我們都可以在業務執行前和業務執行后加上lock()和unlock(),這樣就能保證線程的安全性。於是,我們阻塞隊列的實現就變得尤為簡單了:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyBlockingQueue<E> { private Lock lock = new ReentrantLock(); public void put(E e) { try { lock.lock(); //put method body... } finally { lock.unlock(); } } public E take() { try { lock.lock(); //take method body... } finally { lock.unlock(); } } }
但如果我們的實現真的這么簡單暴力,會出現一個問題,假設Thread-1線程執行put(e)方法並占用了鎖,但發現隊列已滿陷入阻塞。此時Thread-2需要從隊列中獲取元素,同樣要占有鎖后再從隊列中獲取元素。但先前Thread-1已經占有了鎖,Thread-2還能占有鎖嗎?或者換一個說法,在隊列是空的時候Thread-2要從隊列中獲取元素,此時會先占有鎖在陷入阻塞,Thread-1想往隊列中添加元素,它能獲取到鎖嗎?如果僅僅是使用ReentrantLock,那肯定是獲取不到的。
按照我們現有的實現方式,隊列滿時獲取元素的線程一定要優先在添加元素的線程,如果添加元素的線程優先於獲取元素的線程,會出現添加元素的線程占有了鎖,並等待隊列空出多余的位置容納元素,而獲取元素的線程等待前一個線程釋放鎖,兩個線程永遠無法結束;或者隊列為空的時候添加元素的線程一定要優先在獲取元素的線程,如果獲取元素的線程優先於添加元素的線程,會出現獲取元素占有了鎖,並等待有元素入隊,同時添加元素的線程等待前一個線程釋放鎖,兩個線程同樣無法結束。
但我們要知道,這種實現方式一定是不合理的,我們不能要求開發者在往阻塞隊列中存取元素的時候,要求哪個線程要優先哪個線程,甚至是開發者自己都很難做這個優先順序。那么像juc包下的阻塞隊列又是如何在保證線程安全的情況下,避免出現上面所說的死鎖呢?我們來看看實現較為簡單的ArrayBlockingQueue:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; //... public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } //... public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } //... }
可以看到,在ArrayBlockingQueue的實現中,也是非常簡單暴力的用ReentrantLock的lock()、unlock()來實現隊列的存取。那么我們用一段代碼來測試下,看看ArrayBlockingQueue在隊列滿時還有線程往內添加元素的時候,能否從隊列獲取元素。
putAndTake(BlockingQueue<Integer> queue, int n)接收一個阻塞隊列和一個數值n,會啟動n個線程往阻塞隊列添加元素,之后休眠1s,確認目前n個線程要嘛添加元素成功,要嘛隊列已滿處於阻塞狀態,最后循環n次從阻塞隊列中獲取元素。這里我們只要保證n的數值大於阻塞隊列的容量,就可以保證n個線程里會存在部分線程往阻塞隊列添加元素時被阻塞。在main方法中我們設定線程數n為5,阻塞線程的容量為5/2=2,我們分別創建兩個阻塞隊列ArrayBlockingQueue和LinkedBlockingQueue,看看當隊列滿時仍然有線程向阻塞隊列存放元素,獲取元素的線程能否正常從隊列中獲取元素。
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class BlockQueueTest { public static void putAndTake(BlockingQueue<Integer> queue, int n) { for (int i = 0; i < n; i++) {//<1> new Thread(() -> { try { int r = new Random().nextInt(20); queue.put(r); System.out.println("往隊列存放數值:" + r); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } try { Thread.sleep(1000);//<2> } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < n; i++) {//<3> try { System.out.println("從隊列取出數值:" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { int n = 5; int capacity = n / 2; BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity); System.out.println("_____ArrayBlockingQueue_____"); putAndTake(queue, n); System.out.println("_____LinkedBlockingQueue_____"); queue = new LinkedBlockingQueue<>(capacity); putAndTake(queue, n); } }
執行結果:
_____ArrayBlockingQueue_____ 往隊列存放數值:9 往隊列存放數值:16 往隊列存放數值:11 從隊列取出數值:9 從隊列取出數值:16 往隊列存放數值:10 往隊列存放數值:4 從隊列取出數值:11 從隊列取出數值:10 從隊列取出數值:4 _____LinkedBlockingQueue_____ 往隊列存放數值:4 往隊列存放數值:19 從隊列取出數值:4 往隊列存放數值:13 往隊列存放數值:17 從隊列取出數值:19 從隊列取出數值:13 往隊列存放數值:4 從隊列取出數值:17 從隊列取出數值:4
可以看到ArrayBlockingQueue和LinkedBlockingQueue並沒有我們先前假定的情況,但按照先前我們看到的ArrayBlockingQueue的take()和put(e)的實現,又會在方法的開始便占有鎖。那么ArrayBlockingQueue是怎么做到當隊列滿時,存放元素的線程占有鎖后陷入阻塞,又允許獲取元素的線程搶鎖,然后從隊列中獲取元素呢?
首先我們能肯定,只要是不同的線程在競爭同一個可重入互斥鎖,如果鎖被占用,一定要等到鎖被完全釋放成為無主狀態,別的線程才可以占有鎖。因此不論時隊列滿時有線程占有鎖並嘗試往隊列存放元素,又或者隊列為空時有線程占有鎖從隊列獲取元素,這兩種情況一定會在某一個時機釋放鎖,因為這兩個線程一定會被阻塞起來,直到有線程從已滿的隊列中獲取元素,或者有線程向空隊列存放元素,且這兩個線程不能影響別的線程從已滿的隊列獲取元素,或者向空隊列存放元素。
那么在下面的take()和put(e)兩個方法中會是哪一段代碼偷偷完成釋放鎖並阻塞當前線程這一操作呢?很有可能是<1>處和<2>處。這里我們看到ReentrantLock另外一種用法,我們可以用ReentrantLock生成一個Condition條件對象,相信會有人產生疑問,Condition對象在下面的代碼究竟起到什么樣的作用呢?
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; //... public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //... public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await();//<2> return dequeue(); } finally { lock.unlock(); } } //... public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await();//<1> enqueue(e); } finally { lock.unlock(); } } //... }
在介紹Condition之前,我們先來回顧下synchronized關鍵字,我們知道synchronized可以對一個對象加鎖以保證並發情況下線程以串行的方式訪問同步代碼塊,當線程執行完同步代碼塊里的代碼,就會釋放對象鎖。但是synchronized也支持線程在尚未執行完同步代碼塊中的代碼時,就調用object.wait()釋放對象鎖從而讓當前線程進入阻塞,當需要線程繼續往下執行時,調用object.notify()或者object.notifyAll()喚醒阻塞線程。
synchronized (object){ //... }
下面我們用一個例子來加深對synchronized的理解,Restaurant類的main方法中,我們先在<1>處生成一個廚師對象cook,然后在<2>處會循環生成6個線程,每個線程都作為一個顧客獲取cook的對象鎖,在通知廚師做菜后陷入阻塞。之后主線程休眠1s,確保6個線程都陷入阻塞后,調用<3>處的cook.finishOne()方法后間接調用cook.notify()隨機選擇一個線程喚醒,被喚醒的線程會退出object.wait()然后重新競爭object對象鎖,在占有對象鎖后線程繼續執行cook.wait()之后的代碼,取走菜並銷毀當前線程。之后主方法又休眠了1s,調用<4>處的cook.finishAll()方法后間接調用cook.notifyAll()喚醒所有等待cook對象鎖的線程,所有等待線程會退出cook.wait()開始競爭cook對象鎖,接着按照之前的邏輯,搶鎖成功的線程取走菜然后銷毀線程。
public class Restaurant { //廚師類 static class Cook { public synchronized void finishOne() { System.out.println("廚師完成一道菜"); this.notify(); } public synchronized void finishAll() { System.out.println("廚師完成所有菜"); this.notifyAll(); } } public static void main(String[] args) { final Cook cook = new Cook();//<1> for (int i = 1; i <= 6; i++) { final int no = i; new Thread(() -> {//<2> synchronized (cook) { try { System.out.println("顧客" + no + "號通知廚師做菜"); cook.wait(); System.out.println("顧客" + no + "號取菜"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } try { Thread.sleep(1000); cook.finishOne();//<3> Thread.sleep(1000); cook.finishAll();//<4> } catch (InterruptedException e) { e.printStackTrace(); } } }
執行結果:
顧客1號通知廚師做菜 顧客2號通知廚師做菜 顧客5號通知廚師做菜 顧客3號通知廚師做菜 顧客4號通知廚師做菜 顧客6號通知廚師做菜 廚師完成一道菜 顧客1號取菜 廚師完成所有菜 顧客2號取菜 顧客6號取菜 顧客4號取菜 顧客3號取菜 顧客5號取菜
那么會有人問,這里的synchronized和之前我們說的Condition又有什么關系呢?別着急,現在就來回答這個問題。如果我們把synchronized等同於ReentrantLock,那么Condition就相當於對象鎖,當我們占有互斥鎖后,我們可以調用Condition.await()釋放當前線程對鎖的占用,並讓當前線程陷入阻塞,當我們希望當前線程重新執行時,可以調用Condition.signal()或Condition.signalAll()喚醒陷入阻塞的線程,被喚醒的線程會重新開始搶鎖,搶到鎖的線程會繼續執行Condition.await()之后的代碼,沒有搶到鎖的線程則接着等待。
我們用新的一個Restaurant2類來模擬上面的通知廚師做菜->廚師通知取菜的例子,這里我們用ReentrantLock和Condition來實現。我們在<1>處生成一個ReentrantLock類型的廚師鎖cook,在<2>處根據廚師鎖生成一個條件condition對象。之后我們在main方法的<3>處循環啟動6個線程,每個線程代表一個顧客去占有廚師鎖通知廚師做菜然后陷入阻塞。之后主線程分次調用finishOne()和finishAll(),繼而調用condition.signal()和condition.signalAll()通知一個線程取菜和通知所有線程取菜。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Restaurant2 { private static final ReentrantLock cook = new ReentrantLock();//<1> private static final Condition condition = cook.newCondition();//<2> public static void finishOne() { try { cook.lock(); System.out.println("廚師完成一道菜"); condition.signal(); } finally { cook.unlock(); } } public static void finishAll() { try { cook.lock(); System.out.println("廚師完成所有菜"); condition.signalAll(); } finally { cook.unlock(); } } public static void main(String[] args) { for (int i = 1; i <= 6; i++) { final int no = i; new Thread(() -> {//<3> try { cook.lock(); System.out.println("顧客" + no + "通知廚師做菜"); condition.await(); System.out.println("顧客" + no + "號取菜"); } catch (InterruptedException e) { e.printStackTrace(); } finally { cook.unlock(); } }).start(); } try { Thread.sleep(1000); finishOne();//<4> Thread.sleep(1000); finishAll();//<5> } catch (InterruptedException e) { e.printStackTrace(); } } }
執行結果:
顧客3通知廚師做菜 顧客5通知廚師做菜 顧客2通知廚師做菜 顧客4通知廚師做菜 顧客1通知廚師做菜 顧客6通知廚師做菜 廚師完成一道菜 顧客3號取菜 廚師完成所有菜 顧客5號取菜 顧客2號取菜 顧客4號取菜 顧客1號取菜 顧客6號取菜
可以看到ReentrantLock也可以完成類似synchronized阻塞-通知的工作,那么ReentrantLock相比於synchronized的優勢又在哪里呢?是可擴展性,之前介紹ReentrantLock的時候就說過,ReentrantLock完成的工作雖然和synchronized相似,但比synchronized多了擴展性。如果在synchronized代碼塊里調用對象鎖的wait()方法,當前占有對象鎖的線程會釋放鎖並進入等待,JVM會幫我們維護這個對象鎖的等待線程集合,當我們調用object.notify(),JVM會幫我們選擇一個線程喚醒,我們只知道這個線程會退出object.wait()方法並開始搶鎖,但搶到鎖之后,我們並不清楚這個線程會從哪里開始執行。
比如下面的代碼,methodA()和methodB()都有對象鎖object的同步代碼塊,調用object.wait()都會陷入阻塞,但是當我們調用object.notify()時,我們無法指定要喚醒的線程是執行methodB()的線程而非執行methodA()的線程,JVM喚醒的線程,也可能是正在執行methodA()的線程。
public void methodA() { synchronized (object) { //... object.wait(); //... } } public void methodB() { synchronized (object) { //... object.wait(); //... } }
基於wait()、notify()、notifyAll()帶來的限制,juc包的作者Doug Lea大師推出了Condition對象,我們可以基於一個鎖生成多個Condition對象,每個Condition對象都類似synchronized的對象鎖,會維護自己的一個等待線程集合。當線程要執行某項工作,發現條件不滿足時可以調用Condition.await()方法釋放鎖並陷入阻塞,當線程在執行某項工作后發現條件滿足,也可以調用Condition.singal()通知需要此條件的線程。這樣講可能有些人還是不太理解,這里筆者接着以之前的阻塞隊列為例,看看如何借助ReentrantLock和Condition在阻塞隊列滿的情況下,存儲線程在占有鎖后釋放鎖,直到收到隊列有多余空間的消息為止,同理我們也可以看看,在隊列為空時,獲取線程在占有鎖后釋放鎖,直到收到隊列不為空的消息為止。
在初始化MyBlockingQueue的時候,會在<1>處生成一個可重入互斥鎖lock,同時會分別在<2>、<3>處生成隊列未滿(notFull)和隊列非空(notEmpty)兩個條件對象。當有存儲線程要存儲元素,調用put(e)方法並占有了鎖,如果發現隊列已滿,會先調用<4>處的未滿條件對象(notFull)釋放鎖並陷入阻塞,之后獲取線程要獲取元素,調用take()方法發現隊列非空,於是在取走一個元素后多出空余的空間,調用<7>處的未滿條件對象(notFull)通知其陷入等待的存儲線程,現在隊列中有空余的空間可以容納元素。同理,當有獲取線程要從隊列獲取元素,調用take()方法並占有了鎖,如果發現隊列是空的情況下會調用<6>處非空條件對象(notEmpty)釋放鎖並陷入阻塞,之后存儲線程向隊列存儲元素,先調用put(e)占有了鎖,發現隊列未滿,於是將元素存儲進隊列,然后調用<5>處的非空條件對象通知等待元素的線程可以從隊列中獲取元素了。
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyBlockingQueue<E> { final Lock lock = new ReentrantLock();//<1> final Condition notFull = lock.newCondition();//<2> final Condition notEmpty = lock.newCondition();//<3> final Object[] items; int putptr, takeptr, count; public MyBlockingQueue(int n) { items = new Object[n]; } public void put(E x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await();//<4> items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal();//<5> } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await();//<6> E x = (E) items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal();//<7> return x; } finally { lock.unlock(); } } }