本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接:http://item.jd.com/12299018.html

在66節,我們介紹了利用synchronized實現鎖,我們提到了synchronized的一些局限性,本節,我們探討Java並發包中的顯式鎖,它可以解決synchronized的限制。
Java並發包中的顯式鎖接口和類位於包java.util.concurrent.locks下,主要接口和類有:
- 鎖接口Lock,主要實現類是ReentrantLock
- 讀寫鎖接口ReadWriteLock,主要實現類是ReentrantReadWriteLock
本節主要介紹接口Lock和實現類ReentrantLock,關於讀寫鎖,我們后續章節介紹。
接口Lock
顯式鎖接口Lock的定義為:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
我們解釋一下:
- lock()/unlock():就是普通的獲取鎖和釋放鎖方法,lock()會阻塞直到成功。
- lockInterruptibly():與lock()的不同是,它可以響應中斷,如果被其他線程中斷了,拋出InterruptedException。
- tryLock():只是嘗試獲取鎖,立即返回,不阻塞,如果獲取成功,返回true,否則返回false。
- tryLock(long time, TimeUnit unit) :先嘗試獲取鎖,如果能成功則立即返回true,否則阻塞等待,但等待的最長時間為指定的參數,在等待的同時響應中斷,如果發生了中斷,拋出InterruptedException,如果在等待的時間內獲得了鎖,返回true,否則返回false。
- newCondition:新建一個條件,一個Lock可以關聯多個條件,關於條件,我們留待下節介紹。
可以看出,相比synchronized,顯式鎖支持以非阻塞方式獲取鎖、可以響應中斷、可以限時,這使得它靈活的多。
可重入鎖ReentrantLock
基本用法
Lock接口的主要實現類是ReentrantLock,它的基本用法lock/unlock實現了與synchronized一樣的語義,包括:
- 可重入,一個線程在持有一個鎖的前提下,可以繼續獲得該鎖
- 可以解決競態條件問題
- 可以保證內存可見性
ReentrantLock有兩個構造方法:
public ReentrantLock() public ReentrantLock(boolean fair)
參數fair表示是否保證公平,不指定的情況下,默認為false,表示不保證公平。所謂公平是指,等待時間最長的線程優先獲得鎖。保證公平會影響性能,一般也不需要,所以默認不保證,synchronized鎖也是不保證公平的,待會我們還會再分析實現細節。
使用顯式鎖,一定要記得調用unlock,一般而言,應該將lock之后的代碼包裝到try語句內,在finally語句內釋放鎖,比如,使用ReentrantLock實現Counter,代碼可以為:
public class Counter { private final Lock lock = new ReentrantLock(); private volatile int count; public void incr() { lock.lock(); try { count++; } finally { lock.unlock(); } } public int getCount() { return count; } }
使用tryLock避免死鎖
使用tryLock(),可以避免死鎖。在持有一個鎖,獲取另一個鎖,獲取不到的時候,可以釋放已持有的鎖,給其他線程機會獲取鎖,然后再重試獲取所有鎖。
我們來看個例子,銀行賬戶之間轉賬,用類Account表示賬戶,代碼如下:
public class Account { private Lock lock = new ReentrantLock(); private volatile double money; public Account(double initialMoney) { this.money = initialMoney; } public void add(double money) { lock.lock(); try { this.money += money; } finally { lock.unlock(); } } public void reduce(double money) { lock.lock(); try { this.money -= money; } finally { lock.unlock(); } } public double getMoney() { return money; } void lock() { lock.lock(); } void unlock() { lock.unlock(); } boolean tryLock() { return lock.tryLock(); } }
Account里的money表示當前余額,add/reduce用於修改余額。在賬戶之間轉賬,需要兩個賬戶都鎖定,如果不使用tryLock,直接使用lock,代碼看上去可以這樣:
public class AccountMgr { public static class NoEnoughMoneyException extends Exception {} public static void transfer(Account from, Account to, double money) throws NoEnoughMoneyException { from.lock(); try { to.lock(); try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException(); } } finally { to.unlock(); } } finally { from.unlock(); } } }
但這么寫是有問題的,如果兩個賬戶同時給對方轉賬,都先獲取了第一個鎖,則會發生死鎖。我們寫段代碼來模擬這個過程:
public static void simulateDeadLock() { final int accountNum = 10; final Account[] accounts = new Account[accountNum]; final Random rnd = new Random(); for (int i = 0; i < accountNum; i++) { accounts[i] = new Account(rnd.nextInt(10000)); } int threadNum = 100; Thread[] threads = new Thread[threadNum]; for (int i = 0; i < threadNum; i++) { threads[i] = new Thread() { public void run() { int loopNum = 100; for (int k = 0; k < loopNum; k++) { int i = rnd.nextInt(accountNum); int j = rnd.nextInt(accountNum); int money = rnd.nextInt(10); if (i != j) { try { transfer(accounts[i], accounts[j], money); } catch (NoEnoughMoneyException e) { } } } } }; threads[i].start(); } }
以上代碼創建了10個賬戶,100個線程,每個線程執行100次循環,在每次循環中,隨機挑選兩個賬戶進行轉賬。在我的電腦上,每次執行該段代碼,都會發生死鎖。讀者可以更改這些數值進行試驗。
我們使用tryLock來進行修改,先定義一個tryTransfer方法:
public static boolean tryTransfer(Account from, Account to, double money) throws NoEnoughMoneyException { if (from.tryLock()) { try { if (to.tryLock()) { try { if (from.getMoney() >= money) { from.reduce(money); to.add(money); } else { throw new NoEnoughMoneyException(); } return true; } finally { to.unlock(); } } } finally { from.unlock(); } } return false; }
如果兩個鎖都能夠獲得,且轉賬成功,則返回true,否則返回false,不管怎樣,結束都會釋放所有鎖。transfer方法可以循環調用該方法以避免死鎖,代碼可以為:
public static void transfer(Account from, Account to, double money) throws NoEnoughMoneyException { boolean success = false; do { success = tryTransfer(from, to, money); if (!success) { Thread.yield(); } } while (!success); }
獲取鎖信息
除了實現Lock接口中的方法,ReentrantLock還有一些其他方法,通過它們,可以獲取關於鎖的一些信息,這些信息可以用於監控和調試目的,比如:
//鎖是否被持有,只要有線程持有就返回true,不一定是當前線程持有 public boolean isLocked() //鎖是否被當前線程持有 public boolean isHeldByCurrentThread() //鎖被當前線程持有的數量,0表示不被當前線程持有 public int getHoldCount() //鎖等待策略是否公平 public final boolean isFair() //是否有線程在等待該鎖 public final boolean hasQueuedThreads() //指定的線程thread是否在等待該鎖 public final boolean hasQueuedThread(Thread thread) //在等待該鎖的線程個數 public final int getQueueLength()
實現原理
ReentrantLock的用法是比較簡單的,它是怎么實現的呢?在最底層,它依賴於上節介紹的CAS方法,另外,它依賴於類LockSupport中的一些方法。
LockSupport
類LockSupport也位於包java.util.concurrent.locks下,它的基本方法有:
public static void park() public static void parkNanos(long nanos) public static void parkUntil(long deadline) public static void unpark(Thread thread)
park使得當前線程放棄CPU,進入等待狀態(WAITING),操作系統不再對它進行調度,什么時候再調度呢?有其他線程對它調用了unpark,unpark需要指定一個線程,unpark會使之恢復可運行狀態。我們看個例子:
public static void main(String[] args) throws InterruptedException { Thread t = new Thread (){ public void run(){ LockSupport.park(); System.out.println("exit"); } }; t.start(); Thread.sleep(1000); LockSupport.unpark(t); }
線程t啟動后調用park,會放棄CPU,主線程睡眠1秒鍾后,調用unpark,線程t恢復運行,輸出exit。
park不同於Thread.yield(),yield只是告訴操作系統可以先讓其他線程運行,但自己依然是可運行狀態,而park會放棄調度資格,使線程進入WAITING狀態。
需要說明的是,park是響應中斷的,當有中斷發生時,park會返回,線程的中斷狀態會被設置。另外,還需要說明一下,park可能會無緣無故的返回,程序應該重新檢查park等待的條件是否滿足。
park有兩個變體:
- parkNanos:可以指定等待的最長時間,參數是相對於當前時間的納秒數。
- parkUntil:可以指定最長等到什么時候,參數是絕對時間,是相對於紀元時的毫秒數。
當等待超時的時候,它們也會返回。
這些park方法還有一些變體,可以指定一個對象,表示是由於該對象進行等待的,以便於調試,通常傳遞的值是this,這些方法有:
public static void park(Object blocker) public static void parkNanos(Object blocker, long nanos) public static void parkUntil(Object blocker, long deadline)
LockSupport有一個方法,可以返回一個線程的blocker對象:
public static Object getBlocker(Thread t)
這些park/unpark方法是怎么實現的呢?與CAS方法一樣,它們也調用了Unsafe類中的對應方法,Unsafe類最終調用了操作系統的API,從程序員的角度,我們可以認為LockSupport中的這些方法就是基本操作。
AQS (AbstractQueuedSynchronizer)
利用CAS和LockSupport提供的基本方法,就可以用來實現ReentrantLock了。但Java中還有很多其他並發工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它們的實現有很多類似的地方,為了復用代碼,Java提供了一個抽象類AbstractQueuedSynchronizer,我們簡稱為AQS,它簡化了並發工具的實現。AQS的整體實現比較復雜,我們主要以ReentrantLock的使用為例進行簡要介紹。
AQS封裝了一個狀態,給子類提供了查詢和設置狀態的方法:
private volatile int state; protected final int getState() protected final void setState(int newState) protected final boolean compareAndSetState(int expect, int update)
用於實現鎖時,AQS可以保存鎖的當前持有線程,提供了方法進行查詢和設置:
private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread t) protected final Thread getExclusiveOwnerThread()
AQS內部維護了一個等待隊列,借助CAS方法實現了無阻塞算法進行更新。
下面,我們以ReentrantLock的使用為例簡要介紹下AQS的原理。
ReentrantLock
ReentrantLock內部使用AQS,有三個內部類:
abstract static class Sync extends AbstractQueuedSynchronizer static final class NonfairSync extends Sync static final class FairSync extends Sync
Sync是抽象類,NonfairSync是fair為false時使用的類,FairSync是fire為true時使用的類。ReentrantLock內部有一個Sync成員:
private final Sync sync;
在構造方法中sync被賦值,比如:
public ReentrantLock() { sync = new NonfairSync(); }
我們來看ReentrantLock中的基本方法lock/unlock的實現,先看lock方法,代碼為:
public void lock() { sync.lock(); }
NonfairSync的lock代碼為:
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
ReentrantLock使用state表示是否被鎖和持有數量,如果當前未被鎖定,則立即獲得鎖,否則調用acquire(1)獲得鎖,acquire是AQS中的方法,代碼為:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它調用tryAcquire獲取鎖,tryAcquire必須被子類重寫,NonfairSync的實現為:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire是sync中實現的,代碼為:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
這段代碼應該容易理解,如果未被鎖定,則使用CAS進行鎖定,否則,如果已被當前線程鎖定,則增加鎖定次數。
如果tryAcquire返回false,則AQS會調用:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
其中,addWaiter會新建一個節點Node,代表當前線程,然后加入到內部的等待隊列中,限於篇幅,具體代碼就不列出來了。放入等待隊列后,調用acquireQueued嘗試獲得鎖,代碼為:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
主體是一個死循環,在每次循環中,首先檢查當前節點是不是第一個等待的節點,如果是且能獲得到鎖,則將當前節點從等待隊列中移除並返回,否則最終調用LockSupport.park放棄CPU,進入等待,被喚醒后,檢查是否發生了中斷,記錄中斷標志,在最終方法返回時返回中斷標志。如果發生過中斷,acquire方法最終會調用selfInterrupt方法設置中斷標志位,其代碼為:
private static void selfInterrupt() { Thread.currentThread().interrupt(); }
以上就是lock方法的基本過程,能獲得鎖就立即獲得,否則加入等待隊列,被喚醒后檢查自己是否是第一個等待的線程,如果是且能獲得鎖,則返回,否則繼續等待,這個過程中如果發生了中斷,lock會記錄中斷標志位,但不會提前返回或拋出異常。
ReentrantLock的unlock方法的代碼為:
public void unlock() { sync.release(1); }
release是AQS中定義的方法,代碼為:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease方法會修改狀態釋放鎖,unparkSuccessor會調用LockSupport.unpark將第一個等待的線程喚醒,具體代碼就不列舉了。
FairSync和NonfairSync的主要區別是,在獲取鎖時,即在tryAcquire方法中,如果當前未被鎖定,即c==0,FairSync多個一個檢查,如下:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ...
這個檢查是指,只有不存在其他等待時間更長的線程,它才會嘗試獲取鎖。
這樣保證公平不是很好嗎?為什么默認不保證公平呢?保證公平整體性能比較低,低的原因不是這個檢查慢,而是會讓活躍線程得不到鎖,進入等待狀態,引起上下文切換,降低了整體的效率,通常情況下,誰先運行關系不大,而且長時間運行,從統計角度而言,雖然不保證公平,也基本是公平的。
需要說明是,即使fair參數為true,ReentrantLock中不帶參數的tryLock方法也是不保證公平的,它不會檢查是否有其他等待時間更長的線程,其代碼為:
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
ReentrantLock對比synchronized
相比synchronized,ReentrantLock可以實現與synchronized相同的語義,但還支持以非阻塞方式獲取鎖、可以響應中斷、可以限時等,更為靈活。
不過,synchronized的使用更為簡單,寫的代碼更少,也更不容易出錯。
synchronized代表一種聲明式編程,程序員更多的是表達一種同步聲明,由Java系統負責具體實現,程序員不知道其實現細節,顯式鎖代表一種命令式編程,程序員實現所有細節。
聲明式編程的好處除了簡單,還在於性能,在較新版本的JVM上,ReentrantLock和synchronized的性能是接近的,但Java編譯器和虛擬機可以不斷優化synchronized的實現,比如,自動分析synchronized的使用,對於沒有鎖競爭的場景,自動省略對鎖獲取/釋放的調用。
簡單總結,能用synchronized就用synchronized,不滿足要求,再考慮ReentrantLock。
小結
本節主要介紹了顯式鎖ReentrantLock,介紹了其用法和實現原理,在用法方面,我們重點介紹了使用tryLock避免死鎖,在原理上,ReentrantLock使用CAS、LockSupport和AQS,最后,我們比較了ReentrantLock和synchronized,建議優先使用synchronized。
下一節,我們來看顯式條件。
(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic)
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。

