
目前已經更新完《Java並發編程》,《Docker教程》和《JVM性能優化》,歡迎關注【后端精進之路】,輕松閱讀全部文章。
![]()
Java並發編程:
- Java並發編程系列-(1) 並發編程基礎
- Java並發編程系列-(2) 線程的並發工具類
- Java並發編程系列-(3) 原子操作與CAS
- Java並發編程系列-(4) 顯式鎖與AQS
- Java並發編程系列-(5) Java並發容器
- Java並發編程系列-(6) Java線程池
- Java並發編程系列-(7) Java線程安全
- Java並發編程系列-(8) JMM和底層實現原理
- Java並發編程系列-(9) JDK 8/9/10中的並發
Docker教程:
JVM性能優化:
4 顯示鎖和AQS
4.1 Lock接口
核心方法
Java在java.util.concurrent.locks包中提供了一系列的顯示鎖類,其中最基礎的就是Lock接口,該接口提供了幾個常見的鎖相關的操作。
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
下面分別進行介紹:
- void lock();
獲取鎖。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在獲得鎖之前,該線程將一直處於休眠狀態。
- void lockInterruptibly();
如果當前線程未被中斷,則獲取鎖。如果鎖可用,則獲取鎖,並立即返回。與lock()接口唯一的區別是可以被中斷。
- boolean tryLock();
試圖獲取鎖,若鎖可用,則獲取鎖,並立即返回值true。若鎖不可用,則此方法將立即返回值false。
- boolean tryLock(long time, TimeUnit unit) throws
與上個方法不同的就是給定了超時時間,若鎖在給定的等待時間內空閑,並且當前線程未被中斷,則獲取鎖。
- Condition newCondition();
返回綁定到此 Lock 實例的新 Condition 實例。
使用模板
通常使用顯示鎖Lock時,會采用下面的操作流程:
lock.lock();
try {
//...需要保證線程安全的代碼。
} finally {
lock.unlock();
}
Lock的lock()方法保證了只有一個線程能夠執有此鎖。對於任何一個lock()方法,都需要一個unlock()方法與之對應,通常情況下為了保證unlock()方法總是能夠執行,unlock()方法被置於finally中。
Lock VS synchronized
Synchronized是Java的關鍵字,當它用來修飾一個方法或一個代碼塊時,能夠保證在同一時刻最多只有一個線程執行該代碼。因為當調用Synchronized修飾的代碼時,並不需要顯示的加鎖和解鎖的過程,代碼簡潔,一般稱之為隱式鎖。
Lock是一個接口,提供了無條件的、可輪詢的、定時的、可中斷的鎖獲取操作,所有的加鎖和解鎖操作方法都是顯示的,因而稱為顯示鎖。
4.2 ReentrantLock
可重入鎖ReentrantLock是對Lock接口的一種實現,支持當一個線程獲取鎖以后,可以再次得到該對象鎖。
ReentrantLock在初始化時,需要設定該鎖的公平性:
- 如果在時間上,先對鎖進行獲取的請求,一定先被滿足,這個鎖就是公平的,不滿足,就是非公平的
- 非公平的效率一般來講更高
ReentrantLock的特性如下:
1. 可重入
synchronized和ReentrantLock均有可重入性,即一個線程請求得到一個對象鎖后再次請求此對象鎖,可以再次得到該對象鎖。
在使用synchronized時,當一個線程已經進入到synchronized方法/塊中時,可以進入到本類的其他synchronized方法/塊中。
2. 可中斷
在lockInterruptibly()鎖定的同時,還可以響應中斷通知。一旦接收到中斷通知,就會拋出InterruptedException異常。
這點與synchronized不同,在synchronized加鎖的代碼中,無法獲取中斷通知。
3. 可設置超時
ReentrantLock.tryLock()方法用於嘗試鎖定。參數為等待時間。該方法返回boolean值。若鎖定成功,則返回true。鎖定失敗,則返回false。tryLock方法在超時不能獲得鎖時,就返回false,不會永久等待構成死鎖。
4. 公平鎖
ReentrantLock內部利用AQS的線程隊列,可以實現公平鎖,但是性能相比非公平鎖會差一點。
在構造方法中,ReentrantLock(boolean fair),fair默認為false,當設置為true時,及表示當前構造的鎖是公平鎖。
當需要可定時的、可輪詢的與可中斷的鎖獲取操作,公平隊列,或者非塊結構的鎖,建議使用ReentrantLock。否則,請使用synchronized。在Java 1.6之后,ReentrantLock和synchronized性能相差不大,所以一般情況下,使用synchronized就足夠了,只有當有特定需求時,可以使用可重入鎖。
4.3 Lock與Condition實現消息傳遞
利用Lock和Condition可以實現消息的等待和通知,這里我們利用ReentrantLock來進行舉例。
注意在使用condition時,需要首先lock.newCondition來獲取Condition對象,如果有多個條件,需要針對不同的條件來獲取condition。
發送信號,調用condition.signal()方法;等待,調用condition.await()方法。
注意與notify與wait的區別,后者Object的方法,一般用在一個對象上進行等待,等待的線程和某個特定的對象綁定。當需要notify所有線程時,為了保證我們的消息被所有線程接收到,通常使用notifyAll發送消息。但是使用condition對象,await和signal操作都是在condition對象是進行的,所以使用signal通知時,不會存在等待其他消息的線程阻止消息傳遞,所以通常使用signal而不是signalAll。
public class ExpressCond {
public final static String CITY = "ShangHai";
private int km;/*快遞運輸里程數*/
private String site;/*快遞到達地點*/
private Lock lock = new ReentrantLock();
private Condition keCond = lock.newCondition();
private Condition siteCond = lock.newCondition();
public ExpressCond() {
}
public ExpressCond(int km, String site) {
this.km = km;
this.site = site;
}
/* 變化公里數,然后通知處於wait狀態並需要處理公里數的線程進行業務處理*/
public void changeKm(){
lock.lock();
try {
this.km = 101;
keCond.signal();
}finally {
lock.unlock();
}
}
/* 變化地點,然后通知處於wait狀態並需要處理地點的線程進行業務處理*/
public void changeSite(){
lock.lock();
try {
this.site = "BeiJing";
siteCond.signal();
}finally {
lock.unlock();
}
}
/*當快遞的里程數大於100時更新數據庫*/
public void waitKm(){
lock.lock();
try {
while(this.km<=100) {
try {
keCond.await();
System.out.println("check km thread["+Thread.currentThread().getId()
+"] is be notifed.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
System.out.println("the Km is "+this.km+",I will change db");
}
/*當快遞到達目的地時通知用戶*/
public void waitSite(){
lock.lock();
try {
while(CITY.equals(this.site)) {
try {
siteCond.await();
System.out.println("check site thread["+Thread.currentThread().getId()
+"] is be notifed.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
System.out.println("the site is "+this.site+",I will call user");
}
}
下面是測試函數,將會喚醒一個等待km變化的線程。
public class TestCond {
private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);
/*檢查里程數變化的線程,不滿足條件,線程一直等待*/
private static class CheckKm extends Thread{
@Override
public void run() {
express.waitKm();
}
}
/*檢查地點變化的線程,不滿足條件,線程一直等待*/
private static class CheckSite extends Thread{
@Override
public void run() {
express.waitSite();
}
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<3;i++){
new CheckSite().start();
}
for(int i=0;i<3;i++){
new CheckKm().start();
}
Thread.sleep(1000);
express.changeKm();//快遞里程變化
}
}
4.4 ReadWriteLock 和 ReentrantReadWriteLock
ReadWriteLock接口提供了單獨的讀鎖和寫鎖,
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReentrantReadWriteLock類是ReadWriteLock接口的一個實現,它與ReentrantLock類一樣提供了公平競爭與不公平競爭兩種機制,默認也是使用非公平競爭機制。
ReentrantReadWriteLock的可以被多個讀者訪問和一個寫者訪問,提供了讀寫分離功能:
- 讀-讀不互斥:讀讀之間不阻塞。
- 讀-寫互斥:讀阻塞寫,寫也會阻塞讀。
- 寫-寫互斥:寫寫阻塞。
ReentrantReadWriteLock在讀多寫少的場景下,具有很強的性能優勢。
WriteLock VS ReadLock
1.重入方面其內部的WriteLock可以獲取ReadLock,但是反過來ReadLock無法獲得WriteLock。
2.WriteLock可以降級為ReadLock,順序是:先獲得WriteLock再獲得ReadLock,然后釋放WriteLock,這時候線程將保持Readlock的持有。反過來ReadLock想要升級為WriteLock則不可能。
4.不管是ReadLock還是WriteLock都支持Interrupt,語義與ReentrantLock一致。
5.WriteLock支持Condition並且與ReentrantLock語義一致,而ReadLock則不能使用Condition,否則拋出UnsupportedOperationException異常。
ReentrantLock VS ReentrantReadWriteLock
-
ReentrantLock是排他鎖,使用非公平競爭機制時,搶占的機會相對還是比較少的,只有當新請求恰逢鎖釋放時才有機會搶占,所以發生線程飢餓的現象幾乎很少。
-
ReentrantReadWriteLock是共享鎖,或者說讀讀共享,並且經常使用於讀多寫少的場景,即請求讀操作的線程多而頻繁而請求寫操作的線程極少且間隔長,在這種場景下,使用非公平競爭機制極有可能造成寫線程飢餓。比如,R1線程此時持有讀鎖且在進行讀取操作,W1線程請求寫鎖所以需要排隊等候,在R1釋放鎖之前,如果R2,R3,...,Rn 不斷的到來請求讀鎖,因為讀讀共享,所以他們不用等待馬上可以獲得鎖,如此下去W1永遠無法獲得寫鎖,一直處於飢餓狀態。
參考鏈接:
4.5 LockSupport
LockSupport是一個方便的線程阻塞工具,它可以在線程的任何位置讓線程阻塞。與Thread.suspend()方法相比,它彌補了由於resume()方法導致線程無法繼續執行的情況。和Object.wait()方法相比,它不需要先獲得某個對象的鎖,也不會拋出InterruptedException異常。
LockSupport主要有兩個方法,
- LockSupport.park()
park()方法會阻塞當前線程(線程進入Waiting狀態),除非它獲取了"許可證"。
- LockSupport.unpark(Thread t)
unpark(Thread t)方法會給線程t頒發一個"許可證"。
LockSupport使用了類似信號量的機制,它為每一個線程准備了一個許可,如果許可可用,park()方法會立刻返回,並且消費這個許可(也就是將許可變為不可用);如果許可不可用,就會阻塞,而unpack()方法則使得一個許可變為可用(但是和信號量不同的是,許可不可累加,永遠只能擁有不超過一個許可)。
4.6 AQS
AQS:AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),是JUC並發包中的核心基礎組件。JUC並發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。
AQS解決了實現同步器時涉及到的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。
AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
AQS使用了模板方法設計模式,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。
AQS模板方法
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了如下三個方法來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。
-
getState():返回同步狀態的當前值;
-
setState(int newState):設置當前同步狀態;
-
compareAndSetState(int expect, int update):使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性;
獨占式獲取:
tryAcquire(int arg):獨占式獲取同步狀態,獲取同步狀態成功后,其他線程需要等待該線程釋放同步狀態才能獲取同步狀態
tryAcquireNanos(int arg,long nanos):超時獲取同步狀態,如果當前線程在nanos時間內沒有獲取到同步狀態,那么將會返回false,已經獲取則返回true;
tryRelease(int arg):獨占式釋放同步狀態;
acquire(int arg):獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用可重寫的tryAcquire(int arg)方法;
acquireInterruptibly(int arg):與acquire(int arg)相同,但是該方法響應中斷,當前線程為獲取到同步狀態而進入到同步隊列中,如果當前線程被中斷,則該方法會拋出InterruptedException異常並返回;
isHeldExclusively():當前同步器是否在獨占式模式下被線程占用,一般該方法表示是否被當前線程所獨占;
共享式獲取:
tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於0則表示獲取成功,否則獲取失敗;
tryReleaseShared(int arg):共享式釋放同步狀態;
acquireShared(int arg):共享式獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨占式的主要區別是在同一時刻可以有多個線程獲取到同步狀態;
acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷;
tryAcquireSharedNanos(int arg, long nanosTimeout):共享式獲取同步狀態,增加超時限制;
獨占式釋放鎖:
release(int arg):獨占式釋放同步狀態,該方法會在釋放同步狀態之后,將同步隊列中第一個節點包含的線程喚醒;
共享式釋放鎖:
releaseShared(int arg):共享式釋放同步狀態;
當在實現自己的lock類時,需要子類覆蓋如下方法,
獨占式獲取 tryAcquire
獨占式釋放 tryRelease
共享式獲取 tryAcquireShared
共享式釋放 tryReleaseShared
這個同步器是否處於獨占模式 isHeldExclusively
CLH同步隊列
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態的管理,當前線程如果獲取同步狀態失敗時,AQS則會將當前線程以及等待狀態等信息打包成一個節點(Node),並將其加入到CLH同步隊列,同時會阻塞當前線程。當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。

在CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、后繼節點(next),
- CANCELLED,值為1 。場景:當該線程等待超時或者被中斷,需要從同步隊列中取消等待,則該線程被置1,即被取消(這里該線程在取消之前是等待狀態)。節點進入了取消狀態則不再變化;
- SIGNAL,值為-1。場景:后繼的節點處於等待狀態,當前節點的線程如果釋放了同步狀態或者被取消(當前節點狀態置為-1),將會通知后繼節點,使后繼節點的線程得以運行;
- CONDITION,值為-2。場景:節點處於等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()方法后,該節點從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中;
- PROPAGATE,值為-3。場景:表示下一次的共享狀態會被無條件的傳播下去;
- INITIAL,值為0,初始狀態。
其定義如下:
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 獨占 */
static final Node EXCLUSIVE = null;
/**
* 因為超時或者中斷,節點會被設置為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
*/
static final int CANCELLED = 1;
/**
* 后繼節點的線程處於等待狀態,而當前節點的線程如果釋放了同步狀態或者被取消,將會通知后繼節點,使后繼節點的線程得以運行
*/
static final int SIGNAL = -1;
/**
* 節點在等待隊列中,節點線程等待在Condition上,當其他線程對Condition調用了signal()后,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步狀態獲取將會無條件地傳播下去
*/
static final int PROPAGATE = -3;
/** 等待狀態 */
volatile int waitStatus;
/** 前驅節點 */
volatile Node prev;
/** 后繼節點 */
volatile Node next;
/** 獲取同步狀態的線程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
對於CLH同步隊列,一般有如下幾種操作:
1. 節點加入到同步隊列
隊列的主要變化是tail指向新節點、新節點的prev指向當前最后的節點,當前最后一個節點的next指向當前節點。
整個流程圖如下:

具體實現可以查看addWaiter(Node node)方法:
private Node addWaiter(Node mode) {
//新建Node
Node node = new Node(Thread.currentThread(), mode);
//快速嘗試添加尾節點
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS設置尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//多次嘗試
enq(node);
return node;
}
addWaiter(Node node)先通過快速嘗試設置尾節點,如果失敗,則調用enq(Node node)方法設置尾節點
private Node enq(final Node node) {
//多次嘗試,直到成功為止
for (;;) {
Node t = tail;
//tail不存在,設置為首節點
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//設置為尾節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
兩個方法都是通過一個CAS方法compareAndSetTail(Node expect, Node update)來設置尾節點,該方法可以確保節點是線程安全添加的。在enq(Node node)方法中,AQS通過“死循環”的方式來保證節點可以正確添加,只有成功添加后,當前線程才會從該方法返回,否則會一直執行下去。
2. 首節點移出同步隊列
首節點的線程釋放同步狀態后,將會喚醒它的后繼節點(next),而后繼節點將會在獲取同步狀態成功時將自己設置為首節點,這個過程非常簡單,head執行該節點並斷開原首節點的next和當前節點的prev即可,注意在這個過程是不需要使用CAS來保證的,因為只有一個線程能夠成功獲取到同步狀態。
流程圖如下:

獨占式同步狀態獲取與釋放
AQS提供了acquire(int arg)方法來進行獨占式同步狀態獲取,實現如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中相關函數的定義為:
- tryAcquire:去嘗試獲取鎖,獲取成功則設置鎖狀態並返回true,否則返回false。該方法自定義同步組件自己實現,該方法必須要保證線程安全的獲取同步狀態。
- addWaiter:如果tryAcquire返回FALSE(獲取同步狀態失敗),則調用該方法將當前線程加入到CLH同步隊列尾部。
- acquireQueued:當前線程會根據公平性原則來進行阻塞等待(自旋),直到獲取鎖為止;並且返回當前線程在等待過程中有沒有中斷過。
- selfInterrupt:產生一個中斷。
acquireQueued方法為一個自旋的過程,當前線程(Node)進入同步隊列后,就會進入一個自旋的過程,當條件滿足,獲取到同步狀態后,就可以從這個自旋過程中退出,否則會一直執行下去。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中斷標志
boolean interrupted = false;
/*
* 自旋過程,其實就是一個死循環而已
*/
for (;;) {
//當前線程的前驅節點
final Node p = node.predecessor();
//當前線程的前驅節點是頭結點,且同步狀態成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//獲取失敗,線程等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
當前線程會一直嘗試獲取同步狀態,當然前提是只有其前驅節點為頭結點才能夠嘗試獲取同步狀態,主要是為了保持FIFO同步隊列原則。頭節點釋放同步狀態后,將會喚醒其后繼節點,后繼節點被喚醒后需要檢查自己是否為頭節點。
在上面的流程中,當獲取失敗時,需要判斷是否阻塞當前線程,
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
在獲取同步狀態失敗后,線程並不是立馬進行阻塞,需要檢查該線程的狀態,檢查狀態的方法為 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,該方法主要靠前驅節點判斷當前線程是否應該被阻塞,代碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點
int ws = pred.waitStatus;
//狀態為signal,表示當前線程處於等待狀態,直接放回true
if (ws == Node.SIGNAL)
return true;
//前驅節點狀態 > 0 ,則為Cancelled,表明該節點已經超時或者被中斷了,需要從同步隊列中取消
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
//前驅節點狀態為Condition、propagate
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這段代碼主要檢查當前線程是否需要被阻塞,具體規則如下:
-
如果當前線程的前驅節點狀態為SIGNAL,則表明當前線程需要被阻塞,直接返回true,當前線程阻塞
-
如果當前線程的前驅節點狀態為CANCELLED(ws > 0),則表明該線程的前驅節點已經等待超時或者被中斷了,則需要從CLH隊列中將該前驅節點刪除掉,直到回溯到前驅節點狀態 <= 0 ,返回false
-
如果前驅節點非SIGNAL,非CANCELLED,則通過CAS的方式將其前驅節點設置為SIGNAL,返回false
如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,則調用parkAndCheckInterrupt()方法阻塞當前線程:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt() 方法主要是把當前線程掛起,從而阻塞住線程的調用棧,同時返回當前線程的中斷狀態。其內部則是調用LockSupport工具類的park()方法來阻塞該方法。
當線程釋放同步狀態后,則需要喚醒該線程的后繼節點:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒后繼節點
unparkSuccessor(h);
return true;
}
return false;
}
調用unparkSuccessor(Node node)喚醒后繼節點:
private void unparkSuccessor(Node node) {
//當前節點狀態
int ws = node.waitStatus;
//當前狀態 < 0 則設置為 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//當前節點的后繼節點
Node s = node.next;
//后繼節點為null或者其狀態 > 0 (超時或者被中斷了)
if (s == null || s.waitStatus > 0) {
s = null;
//從tail節點來找可用節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//喚醒后繼節點
if (s != null)
LockSupport.unpark(s.thread);
}
可能會存在當前線程的后繼節點為null,超時、被中斷的情況,如果遇到這種情況了,則需要跳過該節點,但是為何是從tail尾節點開始,而不是從node.next開始呢?原因在於node.next仍然可能會存在null或者取消了,所以采用tail回溯辦法找第一個可用的線程。最后調用LockSupport的unpark(Thread thread)方法喚醒該線程。
以上就是整個獨占式獲取和釋放的過程,流程圖如下:

獨占式獲取響應中斷
AQS提供了acquire(int arg)方法以供獨占式獲取同步狀態,但是該方法對中斷不響應,對線程進行中斷操作后,該線程會依然位於CLH同步隊列中等待着獲取同步狀態。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態時,如果當前線程被中斷了,會立刻響應中斷拋出異常InterruptedException。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
首先校驗該線程是否已經中斷了,如果是則拋出InterruptedException,否則執行tryAcquire(int arg)方法獲取同步狀態,如果獲取成功,則直接返回,否則執行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。
1.方法聲明拋出InterruptedException異常。
2.在中斷方法處不再是使用interrupted標志,而是直接拋出InterruptedException異常。
獨占式超時獲取
AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前線程沒有在指定時間內獲取同步狀態,則會返回false,否則返回true。
共享式同步狀態獲取與釋放
AQS提供acquireShared(int arg)方法共享式獲取同步狀態:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//獲取失敗,自旋獲取同步狀態
doAcquireShared(arg);
}
從上面程序可以看出,方法首先是調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,如果獲取失敗則調用doAcquireShared(int arg)自旋方式獲取同步狀態,共享式獲取同步狀態的標志是返回 >= 0 的值表示獲取成功。
private void doAcquireShared(int arg) {
/共享式節點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//前驅節點
final Node p = node.predecessor();
//如果其前驅節點,獲取同步狀態
if (p == head) {
//嘗試獲取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared(int arg)方法嘗試獲取同步狀態,返回值為int,當其 >= 0 時,表示能夠獲取到同步狀態,這個時候就可以從自旋過程中退出。
默認AQS沒有提供tryAcquireShared的實現,需要子類自己實現該方法,
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
注意到獨占式獲取鎖不同的是,如果tryAcquireShared的返回值大於0,會進行setHeadAndPropagate的操作,下面是該方法的實現,可以看到當某個節點被設置為head之后,如果它的后繼節點是SHARED狀態的,那么將繼續通過doReleaseShared方法嘗試往后喚醒節點,實現了共享狀態的向后傳播。doReleaseShared后面會仔細分析。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
獲取同步狀態后,完成相應的任務之后,需要調用release(int arg)方法釋放同步狀態,方法如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在doReleaseShared中,如果頭節點的狀態為SIGNAL,則通過CAS將頭節點的狀態設置為0,並且喚醒后續阻塞的線程;接着再通過CAS設置節點的狀態為Node.PROPAGATE。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
關於doReleaseShared的幾點分析:
- 調用該方法的線程可能有很多:在共享鎖中,持有共享鎖的線程可以有多個,這些線程都可以調用releaseShared方法釋放鎖;而這些線程想要獲得共享鎖,則它們必然曾經成為過頭節點,或者就是現在的頭節點。因此,如果是在releaseShared方法中調用的doReleaseShared,可能此時調用方法的線程已經不是頭節點所代表的線程了,頭節點可能已經被易主好幾次了。
- 調用該方法的目的:無論是在acquireShared中調用,還是在releaseShared方法中調用,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。這一點看上去和獨占鎖似乎一樣,但是它們的一個重要的差別是——在共享鎖中,當頭節點發生變化時,是會回到循環中再立即喚醒head節點的下一個節點的。也就是說,在當前節點完成喚醒后繼節點的任務之后將要退出時,如果發現被喚醒后繼節點已經成為了新的頭節點,則會立即觸發喚醒head節點的下一個節點的操作,如此周而復始。
- 只有在當前head沒有易主時,才會退出,否則繼續循環。因為當前可能有多個線程在隊列中,比如A -> B -> C -> D, 如果A喚醒B,則B成為新的頭節點,接着B會調用doReleaseShared去喚醒C,此時A線程中的head變成了C,因此也加入到了喚醒D的隊伍中,此時可能出現A、B、C同時喚醒D的情況,提高了系統效率。當隊列中的所有線程都喚醒之后,此時程序退出。
參考:
Condition實現
在之前的例子中,使用Condition和Lock實現了消息的等待和通知,這節介紹Condiition在AQS中的實現。
JDK的Object對象提供了wait/notify的機制,也能實現消息的等待與通知,Condition與之的差別主要體現在以下幾點:
- 調用wait方法的線程首先必須是已經進入了同步代碼塊,即已經獲取了監視器鎖;與之類似,調用await方法的線程首先必須獲得lock鎖。
- 調用wait方法的線程會釋放已經獲得的監視器鎖,進入當前監視器鎖的等待隊列(wait set)中;與之類似,調用await方法的線程會釋放已經獲得的lock鎖,進入到當前Condtion對應的條件隊列中。
- 調用監視器鎖的notify方法會喚醒等待在該監視器鎖上的線程,這些線程將開始參與鎖競爭,並在獲得鎖后,從wait方法處恢復執行;與之類似,調用Condtion的signal方法會喚醒對應的條件隊列中的線程,這些線程將開始參與鎖競爭,並在獲得鎖后,從await方法處開始恢復執行。
在AQS的Condition實現中,和獨占鎖的爭奪類似的是,每創建一個Condtion對象就會對應一個Condtion隊列,每一個調用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中,就像這樣:

同樣的,在Condition中也會用到之前介紹的同步隊列,當等待隊列中的節點獲得信號通知時,會將等待隊列的節點移到同步隊列。
以下是await時節點的變化,

以下是signal信號發出時節點的變化,

Condition的整個await/signal流程如下:
1、Condition提供了await()方法將當前線程阻塞,並提供signal()方法支持另外一個線程將已經阻塞的線程喚醒。
2、Condition需要結合Lock使用
3、線程調用await()方法前必須獲取鎖,調用await()方法時,將線程構造成節點加入等待隊列,同時釋放鎖,並掛起當前線程
4、其他線程調用signal()方法前也必須獲取鎖,當執行signal()方法時將等待隊列的節點移入到同步隊列,當線程退出臨界區釋放鎖的時候,喚醒同步隊列的首個節點

下面結合源代碼進行分析:
await實現
調用await阻塞當前線程
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//將當前線程封裝成Node加入到等待隊列尾部
Node node = addConditionWaiter();
//釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷當前節點是否已經在同步隊列中,如果是則退出循環,如果不是就阻塞當前線程
//其他線程如果發出了signal信號之后,會把等待隊列的線程移入同步隊列,此時就會退出循環,進入下面的重新獲取鎖的acquireQueued
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//其他發出signal信號的線程釋放鎖之后,該線程被喚醒並重新競爭鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//線程加入等待隊列尾部
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {//清除cancell態的節點
unlinkCancelledWaiters();
t = lastWaiter;//t指向最后一個狀態正確的節點
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)//列表為空,初始化為第一個節點
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal/signalAll實現
將等待隊列的節點移入同步隊列(signalAll只是循環執行signal而已)
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;//得到firstWaiter
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//將節點從等待隊列移入同步隊列
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;//cas節點狀態錯誤,說明已經cancell了,直接返回false
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);//加入同步隊列
int ws = p.waitStatus;
//設置前置節點狀態為signal,可重入鎖那篇文章分析過,為了喚醒線程而設置
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//特殊情況下喚醒線程並重新同步,一般情況下這里不會執行
return true;
}
參考:
ReentrantReadWriteLock實現
ReentrantReadWriteLock在內部也是利用了AQS進行鎖的競爭與釋放,同時也實現了ReadWriteLock接口。
為了同時保存讀鎖和寫鎖的狀態,在內部用一個int保存讀和寫的狀態。讀狀態從高16位讀出,寫狀態從低16位讀出,在保證讀寫鎖互斥的前提下,直接利用了AQS現有的數據結構。
static final int SHARED_SHIFT = 16;
//實際是65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//最大值 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 同樣是65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 獲取讀的狀態 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 獲取寫鎖的獲取狀態 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

寫鎖為獨占式的,因此讀鎖的獲取和釋放和AQS原生的實現一致。
讀鎖是共享式的,獲取讀鎖的狀態,並且加1.
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false; //寫鎖被其他線程獲取了,直接返回false
int r = sharedCount(c); //獲取讀鎖的狀態
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { //嘗試獲取讀鎖
if (r == 0) { //說明第一個獲取到了讀鎖
firstReader = current; //標記下當前線程是第一個獲取的
firstReaderHoldCount = 1; //重入次數
} else if (firstReader == current) {
firstReaderHoldCount++; //次數+1
} else {
//cachedHoldCounter 為緩存最后一個獲取鎖的線程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); //緩存最后一個獲取鎖的線程
else if (rh.count == 0)// 當前線程獲取到了鎖,但是重入次數為0,那么把當前線程存入進去
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
讀鎖的釋放:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)//如果是首次獲取讀鎖,那么第一次獲取讀鎖釋放后就為空了
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) { //表示全部釋放完畢
readHolds.remove(); //釋放完畢,那么久把保存的記錄次數remove掉
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// nextc 是 state 高 16 位減 1 后的值
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) //CAS設置狀態
return nextc == 0; //這個判斷如果高 16 位減 1 后的值==0,那么就是讀狀態和寫狀態都釋放了
}
}
鎖降級
鎖降級算是獲取讀鎖的特例,如在A線程已經獲取寫鎖的情況下,再調取讀鎖加鎖函數則可以直接獲取讀鎖,但此時其他線程仍然無法獲取讀鎖或寫鎖,在A線程釋放寫鎖后,如果有節點等待則會喚醒后續節點,后續節點可見的狀態為目前有A線程獲取了讀鎖。
AQS實戰-實現三元共享鎖
下面的例子里,利用AQS實現了三元共享鎖,也就是當前鎖只能被三個線程獲取。
public class TripleLock implements Lock {
//為3表示允許兩個線程同時獲得鎖
private final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
final ConditionObject newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
測試程序中,主線程每隔一秒鍾打印換行,工作線程直接打印當前的線程名,從結果可以看到,每一個時刻只有三個工作線程在同時運行。
public class testTripleLock {
public void test() {
final Lock lock = new TripleLock();
class Worker extends Thread {
public void run() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName());
SleepTools.second(2);
} finally {
lock.unlock();
}
SleepTools.second(2);
}
}
// 啟動10個子線程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}
// 主線程每隔1秒換行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
System.out.println();
}
}
public static void main(String[] args) {
testTripleLock testMyLock = new testTripleLock();
testMyLock.test();
}
}
本文由『后端精進之路』原創,首發於博客 http://teckee.github.io/ , 轉載請注明出處
搜索『后端精進之路』關注公眾號,立刻獲取最新文章和價值2000元的BATJ精品面試課程。

