一、前言
前段時間結束了jdk1.8集合框架的源碼閱讀,在過年的這段時間里,一直在准備JUC(java.util.concurrent)的源碼閱讀。平時接觸的並發場景開發並不很多,但是有網絡的地方,就存在並發,所以想找幾本書閱讀深入一下,看到網上推薦較多的兩本書《Java並發編程實戰》和《Java多線程編程核心技術》。看了兩書的優缺點后,筆者選擇了先看后者,據說代碼例子較多,書到手后,看完后的印象就是對並發的關鍵字、幾個常見類的api進行了介紹,內容挺早以前,講的也是不是很深,對Java SE5新加的類介紹很少,只能說對於剛接觸並發編程的人來說,還是值得一看的。
二、java.util.concurrent.locks圖概覽
在JUC的包里面,有一個包專門用於存放鎖相關的類,筆者將其中的大部分內容整理進了下面UML中:

具體關系大家可以去看看UML的關系圖,順便介紹個生成UML的工具:PlantUml,UML界的markdown,真的挺好用。
圖中要提的是:圓圈里面有個+的關系,代表內部類,筆者為了圖片看的更簡潔,把ReentrantLock、Semaphore等類中的內部類Sync合到了一起,其實它們是一個類,只不過都叫這個名字。
從圖中我們可以看到,關系較為緊密的是AbstractQueuedSynchronizer抽象類,而它則直接依賴了LockSupport這個類,筆者將在后面先分析這個類的源碼。
三、基礎接口的源碼解析
3.1 Lock接口
在JDK1.5以后,添加了Lock接口,它用於實現與Synchronized關鍵字相同的鎖操作,來實現多個線程控制對共享資源的訪問。但是能提供更加靈活的結構,可能具有完全不同的屬性,並且可能支持多個相關的Condition對象。基本用法如下:
Lock l = ...;
l.lock();
try {
// 訪問被鎖保護的資源
} finally {
l.unlock();
}
下面我們來簡單看一下它下面的具體內容:
public interface Lock {
// 獲得鎖資源
void lock();
// 嘗試獲得鎖,如果當前線程被調用了interrupted則中斷,並拋出異常,否則就獲得鎖
void lockInterruptibly() throws InterruptedException;
// 判斷能否獲得鎖,如果能獲得,則獲得鎖,並返回true(此時已經獲得了鎖)
boolean tryLock();
// 保持給定的等待時間,如果期間能拿到鎖,則獲得鎖,同樣如果期間被中斷,則拋異常
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 釋放鎖
void unlock();
// 返回與此Lock對象綁定Condition實例
Condition newCondition();
}
其中,tryLock只會嘗試一次,如果返回false,則走false的流程,不會一直讓線程一直等待。
3.2 Condition接口
Condition與Lock要結合使用,使用Condition可以用來實現wait()和notify()/notifyAll()類似的等待/通知模式。與Object對象里不同的是,Condition更加靈活,可以在一個Lock對象里創建多個Condition實例,有選擇的進行線程通知,在線程調度上更加靈活。使用Condition注釋上的例子:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
// 當count等於數組的大小時,當前線程等待,直到notFull通知,再進行生產
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
// 當count為0,進入等待,直到notEmpty通知,進行消費。
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
可以通過多個線程來調用put和take方法,來模擬生產者和消費者。
我們來換成常規的wait/notify的實現方式:
class BoundedBuffer {
private final Object lock;
public BoundedBuffer(Object lock) {
this.lock = lock;
}
public void put(Object x) {
try {
synchronized (items) {
while (count == items.length) {
items.wait();
}
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// items.notify();
items.notifyAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Object take() {
try {
synchronized (items) {
while (count == 0) {
items.wait();
}
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// items.notify();
items.notifyAll();
return x;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
如果將items.notifyAll()換成items.notify(),在多生產者和多消費者模式情況下,可能出現take喚醒了take的情況,導致生產者在等待消費者消費,而消費者等待生產者生產,最終導致程序無限等待,而用notifyAll(),則喚醒所有的生產者和消費者,不像Condition可以選擇性的通知。下面我們來看一下它的源碼:
public interface Condition {
// 讓當前線程等待,直到被通知或者被中斷
void await() throws InterruptedException;
// 與前者的區別是,當等待過程中被中斷時,仍會繼續等待,直到被喚醒,才會設置中斷狀態
void awaitUninterruptibly();
// 讓當前線程等待,直到它被告知或中斷,或指定的等待時間已經過。
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 與上面的類似,讓當前線程等待,不過時間單位是納秒
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 讓當前線程等待到確切的指定時間,而不是時長
boolean awaitUntil(Date deadline) throws InterruptedException;
// 喚醒一個等待當前condition的線程,有多個則隨機選一個
void signal();
// 喚醒所有等待當前condition的線程
void signalAll();
}
3.3 ReadWriteLock接口
讀寫鎖與一般的互斥鎖不同,它分為讀鎖和寫鎖,在同一時間里,可以有多個線程獲取讀鎖來進行共享資源的訪問。如果此時有線程獲取了寫鎖,那么讀鎖的線程將等待,直到寫鎖釋放掉,才能進行共享資源訪問。簡單來說就是讀鎖與寫鎖互斥。
讀寫鎖比互斥鎖允許對於共享數據更大程度的並發。每次只能有一個寫線程,但是同時可以有多個線程並發地讀數據。ReadWriteLock適用於讀多寫少的並發情況。
public interface ReadWriteLock {
// 返回寫鎖
Lock writeLock();
// 返回讀鎖
Lock readLock();
}
再看一下源碼里提供的示例:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
// 獲得寫鎖
rwl.readLock().lock();
// 緩存無效,則重寫數據
if (!cacheValid) {
// 在獲得寫鎖之前,必須先釋放讀鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 重寫檢查一次,因為其他線程可能在這段時間里獲得了寫鎖,並且修改了狀態
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 在釋放寫鎖之前,通過獲取讀鎖來降級。
rwl.readLock().lock();
} finally {
// 釋放寫鎖
rwl.writeLock().unlock();
}
}
// cacheValid,直接獲取數據,並釋放讀鎖
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
ReentrantReadWriteLock中,讀鎖可以獲取寫鎖,而返過來,寫鎖不能獲得讀鎖,所以在上面代碼中,要先釋放寫鎖,再獲取讀鎖,具體的源碼分析后面再細說。
四、總結
開了個新坑,邊看邊學。最后謝謝各位園友觀看,如果有描述不對的地方歡迎指正,與大家共同進步!
