前言
隊列同步器 AbstractQueuedSynchronizer(以下簡稱 AQS),是用來構建鎖或者其他同步組件的基礎框架。它使用一個 int 成員變量來表示同步狀態,通過 CAS 操作對同步狀態進行修改,確保狀態的改變是安全的。通過內置的 FIFO (First In First Out)隊列來完成資源獲取線程的排隊工作。更多關於 Java 多線程的文章可以轉到 這里
AQS 和 synchronized
在介紹 AQS 的使用之前,需要首先說明一點,AQS 同步和 synchronized 關鍵字同步(以下簡稱 synchronized 同步)是采用的兩種不同的機制。首先看下 synchronized 同步,synchronized 關鍵字經過編譯之后,會在同步塊的前后分別形成 monitorenter 和 monitorexit 這兩個字節碼指令,這兩個字節碼需要關聯到一個監視對象,當線程執行 monitorenter 指令時,需要首先獲得獲得監視對象的鎖,這里監視對象鎖就是進入同步塊的憑證,只有獲得了憑證才可以進入同步塊,當線程離開同步塊時,會執行 monitorexit 指令,釋放對象鎖。
在 AQS 同步中,使用一個 int 類型的變量 state 來表示當前同步塊的狀態。以獨占式同步(一次只能有一個線程進入同步塊)為例,state 的有效值有兩個 0 和 1,其中 0 表示當前同步塊中沒有線程,1 表示同步塊中已經有線程在執行。當線程要進入同步塊時,需要首先判斷 state 的值是否為 0,假設為 0,會嘗試將 state 修改為 1,只有修改成功了之后,線程才可以進入同步塊。注意上面提到的兩個條件:
- state 為 0,證明當前同步塊中沒有線程在執行,所以當前線程可以嘗試獲得進入同步塊的憑證,而這里的憑證就是是否成功將 state 修改為 1(在 synchronized 同步中,我們說的憑證是對象鎖,但是對象鎖的最終實現是否和這種方式類似,沒有找到相關的資料)
- 成功將 state 修改為 1,通過使用 CAS 操作,我們可以確保即便有多個線程同時修改 state,也只有一個線程會修改成功。關於 CAS 的具體解釋會在后面提到。
當線程離開同步塊時,會修改 state 的值,將其設為 0,並喚醒等待的線程。所以在 AQS 同步中,我們說線程獲得了鎖,實際上是指線程成功修改了狀態變量 state,而線程釋放了鎖,是指線程將狀態變量置為了可修改的狀態(在獨占式同步中就是置為了 0),讓其他線程可以再次嘗試修改狀態變量。在下面的表述中,我們說線程獲得和釋放了鎖,就是上述含義, 這與 synchronized 同步中說的獲得和釋放鎖的含義不同,需要區別理解。
基本使用
本節摘自 Java 並發編程的藝術
AQS 的設計是基於模板方法的,使用者需要繼承 AQS 並重寫指定的方法。在后續的流程中,AQS 提供的模板方法會調用重寫的方法。一般來說,我們需要重寫的方法主要有下面 5 個:
方法名稱 | 描述 |
---|---|
protected boolean tryAcquire(int) | 獨占式獲取鎖,實現該方法需要查詢當前狀態並判斷同步狀態是否和預期值相同,然后使用 CAS 操作設置同步狀態 |
protected boolean tryRelease(int) | 獨占式釋放鎖,實際也是修改同步變量 |
protected int tryAcquireShared(int) | 共享式獲取鎖,返回大於等於 0 的值,表示獲取鎖成功,反之獲取失敗 |
protected boolean tryReleaseShared(int) | 共享式釋放鎖 |
protected boolean isHeldExclusively() | 判斷調用該方法的線程是否持有互斥鎖 |
在自定義的同步組件中,我們一般會調用 AQS 提供的模板方法。AQS 提供的模板方法基本上分為 3 類: 獨占式獲取與釋放鎖、共享式獲取與釋放鎖以及查詢同步隊列中的等待線程情況。下面是相關的模板方法:
方法名稱 | 描述 |
---|---|
void acquire(int) | 獨占式獲取鎖,如果當前線程成功獲取鎖,那么方法就返回,否則會將當前線程放入同步隊列等待。該方法會調用重寫的 tryAcquire(int arg) 方法判斷是否可以獲得鎖 |
void acquireInterruptibly(int) | 和 acquire(int) 相同,但是該方法響應中斷,當線程在同步隊列中等待時,如果線程被中斷,會拋出 InterruptedException 異常並返回。 |
boolean tryAcquireNanos(int, long) | 在 acquireInterruptibly(int) 基礎上添加了超時控制,同時支持中斷和超時,當在指定時間內沒有獲得鎖時,會返回 false,獲取到了返回 true |
void acquireShared(int) | 共享式獲得鎖,如果成功獲得鎖就返回,否則將當前線程放入同步隊列等待,與獨占式獲取鎖的不同是,同一時刻可以有多個線程獲得共享鎖,該方法調用 tryAcquireShared(int) |
acquireSharedInterruptibly(int) | 與 acquireShared(int) 相同,該方法響應中斷 |
tryAcquireSharedNanos(int, long) | 在 acquireSharedInterruptibly(int) 基礎上添加了超時控制 |
boolean release(int) | 獨占式釋放鎖,該方法會在釋放鎖后,將同步隊列中第一個等待節點喚醒 |
boolean releaseShared(int) | 共享式釋放鎖 |
Collection
|
獲得同步隊列中等待的線程集合 |
自定義組件通過使用同步器提供的模板方法來實現自己的同步語義。下面我們通過兩個示例,看下如何借助於 AQS 來實現鎖的同步語義。我們首先實現一個獨占鎖(排它鎖),獨占鎖就是說在某個時刻內,只能有一個線程持有獨占鎖,只有持有鎖的線程釋放了獨占鎖,其他線程才可以獲取獨占鎖。下面是具體實現:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Created by Jikai Zhang on 2017/4/6.
* <p>
* 自定義獨占鎖
*/
public class Mutex implements Lock {
// 通過繼承 AQS,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 當前線程是否被獨占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 嘗試獲得鎖
@Override
protected boolean tryAcquire(int arg) {
// 只有當 state 的值為 0,並且線程成功將 state 值修改為 1 之后,線程才可以獲得獨占鎖
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
// state 為 0 說明當前同步塊中沒有鎖了,無需釋放
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
// 將獨占的線程設為 null
setExclusiveOwnerThread(null);
// 將狀態變量的值設為 0,以便其他線程可以成功修改狀態變量從而獲得鎖
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
// 將操作代理到 Sync 上
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public static void withoutMutex() throws InterruptedException {
System.out.println("Without mutex: ");
int threadCount = 2;
final Thread threads[] = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100000; j++) {
if (j % 20000 == 0) {
System.out.println("Thread-" + index + ": j =" + j);
}
}
}
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
threads[i].join();
}
}
public static void withMutex() {
System.out.println("With mutex: ");
final Mutex mutex = new Mutex();
int threadCount = 2;
final Thread threads[] = new Thread[threadCount];
for (int i = 0; i < threads.length; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
mutex.lock();
try {
for (int j = 0; j < 100000; j++) {
if (j % 20000 == 0) {
System.out.println("Thread-" + index + ": j =" + j);
}
}
} finally {
mutex.unlock();
}
}
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
}
public static void main(String[] args) throws InterruptedException {
withoutMutex();
System.out.println();
withMutex();
}
}
程序的運行結果如下面所示。我們看到使用了 Mutex 之后,線程 0 和線程 1 不會再交替執行,而是當一個線程執行完,另外一個線程再執行。
Without mutex:
Thread-0: j =0
Thread-1: j =0
Thread-0: j =20000
Thread-1: j =20000
Thread-0: j =40000
Thread-1: j =40000
Thread-0: j =60000
Thread-1: j =60000
Thread-1: j =80000
Thread-0: j =80000
With mutex:
Thread-0: j =0
Thread-0: j =20000
Thread-0: j =40000
Thread-0: j =60000
Thread-0: j =80000
Thread-1: j =0
Thread-1: j =20000
Thread-1: j =40000
Thread-1: j =60000
Thread-1: j =80000
下面在看一個共享鎖的示例。在該示例中,我們定義兩個共享資源,即同一時間內允許兩個線程同時執行。我們將同步變量的初始狀態 state 設為 2,當一個線程獲取了共享鎖之后,將 state 減 1,線程釋放了共享鎖后,將 state 加 1。狀態的合法范圍是 0、1 和 2,其中 0 表示已經資源已經用光了,此時線程再要獲得共享鎖就需要進入同步序列等待。下面是具體實現:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Created by Jikai Zhang on 2017/4/9.
* <p>
* 自定義共享鎖
*/
public class TwinsLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int resourceCount) {
if (resourceCount <= 0) {
throw new IllegalArgumentException("resourceCount must be larger than zero.");
}
// 設置可以共享的資源總數
setState(resourceCount);
}
@Override
protected int tryAcquireShared(int reduceCount) {
// 使用嘗試獲得資源,如果成功修改了狀態變量(獲得了資源)
// 或者資源的總量小於 0(沒有資源了),則返回。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount - reduceCount;
if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
// 釋放共享資源,因為可能有多個線程同時執行,所以需要使用 CAS 操作來修改資源總數。
for (; ; ) {
int lastCount = getState();
int newCount = lastCount + returnCount;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}
}
// 定義兩個共享資源,說明同一時間內可以有兩個線程同時運行
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public static void main(String[] args) {
final Lock lock = new TwinsLock();
int threadCounts = 10;
Thread threads[] = new Thread[threadCounts];
for (int i = 0; i < threadCounts; i++) {
final int index = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
lock.lock();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
for (int i = 0; i < threadCounts; i++) {
threads[i].start();
}
}
}
運行程序,我們會發現程序每次都會同時打印兩條語句,如下面的形式,證明同時有兩個線程在執行。
Thread-0
Thread-1
Thread-3
Thread-2
Thread-8
Thread-4
Thread-3
Thread-6
CAS 操作
CAS(Compare and Swap),比較並交換,通過利用底層硬件平台的特性,實現原子性操作。CAS 操作涉及到3個操作數,內存值 V,舊的期望值 A,需要修改的新值 B。當且僅當預期值 A 和 內存值 V 相同時,才將內存值 V 修改為 B,否則什么都不做。CAS 操作類似於執行了下面流程
if(oldValue == memory[valueAddress]) {
memory[valueAddress] = newValue;
}
在上面的流程中,其實涉及到了兩個操作,比較以及替換,為了確保程序正確,需要確保這兩個操作的原子性(也就是說確保這兩個操作同時進行,中間不會有其他線程干擾)。現在的 CPU 中,提供了相關的底層 CAS 指令,即 CPU 底層指令確保了比較和交換兩個操作作為一個原子操作進行(其實在這一點上還是有排他鎖的. 只是比起用synchronized, 這里的排他時間要短的多.),Java 中的 CAS 函數是借助於底層的 CAS 指令來實現的。更多關於 CPU 底層實現的原理可以參考 這篇文章。我們來看下 Java 中對於 CAS 函數的定義:
/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
/**
* Atomically update Java variable to x if it is currently
* holding expected.
* @return true if successful
*/
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
上面三個函數定義在 sun.misc.Unsafe 類中,使用該類可以進行一些底層的操作,例如直接操作原生內存,更多關於 Unsafe 類的文章可以參考 這篇。以 compareAndSwapInt 為例,我們看下如何使用 CAS 函數:
import sun.misc.Unsafe;
import java.lang.reflect.Field;
/**
* Created by Jikai Zhang on 2017/4/8.
*/
public class CASIntTest {
private volatile int count = 0;
private static final Unsafe unsafe = getUnsafe();
private static final long offset;
// 獲得 count 屬性在 CASIntTest 中的偏移量(內存地址偏移)
static {
try {
offset = unsafe.objectFieldOffset(CASIntTest.class.getDeclaredField("count"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
// 通過反射的方式獲得 Unsafe 類
public static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return unsafe;
}
public void increment() {
int previous = count;
unsafe.compareAndSwapInt(this, offset, previous, previous + 1);
}
public static void main(String[] args) {
CASIntTest casIntTest = new CASIntTest();
casIntTest.increment();
System.out.println(casIntTest.count);
}
}
在 CASIntTest 類中,我們定義一個 count 變量,其中 increment 方法是將 count 的值加 1。下面是 increase 方法的代碼:
int previous = count;
unsafe.compareAndSwapInt(this, offset, previous, previous + 1);
在沒有線程競爭的條件下,該代碼執行的結果是將 count 變量的值加 1(多個線程競爭可能會有線程執行失敗),但是在 compareAndSwapInt 函數中,我們並沒有傳入 count 變量,那么函數是如何修改的 count 變量值?其實我們往 compareAndSwapInt 函數中傳入了 count 變量在堆內存中的地址,函數直接修改了 count 變量所在內存區域。count 屬性在堆內存中的地址是由 CASIntTest 實例的起始內存地址和 count 屬性相對於起始內存的偏移量決定的。其中對象屬性在對象中的偏移量通過 objectFieldOffset
函數獲得,函數原型如下所示。該函數接受一個 Filed 類型的參數,返回該 Filed 屬性在對象中的偏移量。
/**
* Report the location of a given static field, in conjunction with {@link
* #staticFieldBase}.
* Do not expect to perform any sort of arithmetic on this offset;
* it is just a cookie which is passed to the unsafe heap memory accessors.
*
* Any given field will always have the same offset, and no two distinct
* fields of the same class will ever have the same offset.
*
* As of 1.4.1, offsets for fields are represented as long values,
* although the Sun JVM does not use the most significant 32 bits.
* It is hard to imagine a JVM technology which needs more than
* a few bits to encode an offset within a non-array object,
* However, for consistency with other methods in this class,
* this method reports its result as a long value.
*/
public native long objectFieldOffset(Field f);
下面我們再看一下 compareAndSwapInt 的函數原型。我們知道 CAS 操作需要知道 3 個信息:內存中的值,期望的舊值以及要修改的新值。通過前面的分析,我們知道通過 o 和 offset 我們可以確定屬性在內存中的地址,也就是知道了屬性在內存中的值。expected 對應期望的舊址,而 x 就是要修改的新值。
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
compareAndSwapInt 函數首先比較一下 expected 是否和內存中的值相同,如果不同證明其他線程修改了屬性值,那么就不會執行更新操作,但是程序如果就此返回了,似乎不太符合我們的期望,我們是希望程序可以執行更新操作的,如果其他線程先進行了更新,那么就在更新后的值的基礎上進行修改,所以我們一般使用循環配合 CAS 函數,使程序在更新操作完成之后再返回,如下所示:
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
before = counter;
}
下面是使用 CAS 函數實現計數器的一個實例:
import sun.misc.Unsafe;
import java.lang.reflect.Field;
/**
* Created by Jikai Zhang on 2017/4/8.
*/
public class CASCounter {
// 通過反射的方式獲得 Unsafe 類
public static Unsafe getUnsafe() {
Unsafe unsafe = null;
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
return unsafe;
}
private volatile long counter = 0;
private static final long offset;
private static final Unsafe unsafe = getUnsafe();
static {
try {
offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public void increment() {
long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
before = counter;
}
}
public long getCounter() {
return counter;
}
private static long intCounter = 0;
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
Thread threads[] = new Thread[threadCount];
final CASCounter casCounter = new CASCounter();
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
casCounter.increment();
intCounter++;
}
}
});
threads[i].start();
}
for(int i = 0; i < threadCount; i++) {
threads[i].join();
}
System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
}
}
在 AQS 中,對原始的 CAS 函數封裝了一下,省去了獲得變量地址的步驟,如下面的形式:
private static final long waitStatusOffset;
static {
try {
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
} catch (Exception ex) {
throw new Error(ex);
}
}
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
同步隊列
AQS 依賴內部的同步隊列(一個 FIFO的雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把隊列中第一個等待節點線程喚醒(下圖中的 Node1),使其再次嘗試獲取同步狀態。同步隊列的結構如下所示:
圖片來自 http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
Head 節點本身不保存等待線程的信息,它通過 next 變量指向第一個保存線程等待信息的節點(Node1)。當線程被喚醒之后,會刪除 Head 節點,而喚醒線程所在的節點會設置為 Head 節點(Node1 被喚醒之后,Node1會被置為 Head 節點)。下面我們看下 JDK 中同步隊列的實現。
Node 類
首先看在節點所對應的 Node 類:
static final class Node {
/**
* 標志是獨占式模式還是共享模式
*/
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/**
* 線程等待狀態的有效值
*/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* 線程狀態,合法值為上面 4 個值中的一個
*/
volatile int waitStatus;
/**
* 當前節點的前置節點
*/
volatile Node prev;
/**
* 當前節點的后置節點
*/
volatile Node next;
/**
* 當前節點所關聯的線程
*/
volatile Thread thread;
/**
* 指向下一個在某個條件上等待的節點,或者指向 SHARE 節點,表明當前處於共享模式
*/
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
在 Node 類中定義了四種等待狀態:
- CANCELED: 1,因為等待超時 (timeout)或者中斷(interrupt),節點會被置為取消狀態。處於取消狀態的節點不會再去競爭鎖,也就是說不會再被阻塞。節點會一直保持取消狀態,而不會轉換為其他狀態。處於 CANCELED 的節點會被移出隊列,被 GC 回收。
- SIGNAL: -1,表明當前的后繼結點正在或者將要被阻塞(通過使用 LockSupport.pack 方法),因此當前的節點被釋放(release)或者被取消時(cancel)時,要喚醒它的后繼結點(通過 LockSupport.unpark 方法)。
- CONDITION: -2,表明當前節點在條件隊列中,因為等待某個條件而被阻塞。
- PROPAGATE: -3,在共享模式下,可以認為資源有多個,因此當前線程被喚醒之后,可能還有剩余的資源可以喚醒其他線程。該狀態用來表明后續節點會傳播喚醒的操作。需要注意的是只有頭節點才可以設置為該狀態(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
- 0:新創建的節點會處於這種狀態
獨占鎖的獲取和釋放
我們首先看下獨占鎖的獲取和釋放過程
獨占鎖獲取
下面是獲取獨占鎖的流程圖:
我們通過 acquire 方法來獲取獨占鎖,下面是方法定義
public final void acquire(int arg) {
// 首先嘗試獲取鎖,如果獲取失敗,會先調用 addWaiter 方法創建節點並追加到隊列尾部
// 然后調用 acquireQueued 阻塞或者循環嘗試獲取鎖
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
// 在 acquireQueued 中,如果線程是因為中斷而退出的阻塞狀態會返回 true
// 這里的 selfInterrupt 主要是為了恢復線程的中斷狀態
selfInterrupt();
}
}
acquire 會首先調用 tryAcquire 方法來獲得鎖,該方法需要我們來實現,這個在前面已經提過了。如果沒有獲取鎖,會調用 addWaiter 方法創建一個和當前線程關聯的節點追加到同步隊列的尾部,我們調用 addWaiter 時傳入的是 Node.EXCLUSIVE,表明當前是獨占模式。下面是 addWaiter 的具體實現
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// tail 指向同步隊列的尾節點
Node pred = tail;
// Try the fast path of enq; backup to full enq on failure
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter 方法會首先調用 if 方法,來判斷能否成功將節點添加到隊列尾部,如果添加失敗,再調用 enq 方法(使用循環不斷重試)進行添加,下面是 enq 方法的實現:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 同步隊列采用的懶初始化(lazily initialized)的方式,
// 初始時 head 和 tail 都會被設置為 null,當一次被訪問時
// 才會創建 head 對象,並把尾指針指向 head。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter 僅僅是將節點加到了同步隊列的末尾,並沒有阻塞線程,線程阻塞的操作是在 acquireQueued 方法中完成的,下面是 acquireQueued 的實現:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果當前節點的前繼節點是 head,就使用自旋(循環)的方式不斷請求鎖
if (p == head && tryAcquire(arg)) {
// 成功獲得鎖,將當前節點置為 head 節點,同時刪除原 head 節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire 檢查是否可以掛起線程,
// 如果可以掛起進程,會調用 parkAndCheckInterrupt 掛起線程,
// 如果 parkAndCheckInterrupt 返回 true,表明當前線程是因為中斷而退出掛起狀態的,
// 所以要將 interrupted 設為 true,表明當前線程被中斷過
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued 會首先檢查當前節點的前繼節點是否為 head,如果為 head,將使用自旋的方式不斷的請求鎖,如果不是 head,則調用 shouldParkAfterFailedAcquire 查看是否應該掛起當前節點關聯的線程,下面是 shouldParkAfterFailedAcquire 的實現:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 當前節點的前繼節點的等待狀態
int ws = pred.waitStatus;
// 如果前繼節點的等待狀態為 SIGNAL 我們就可以將當前節點對應的線程掛起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// ws 大於 0,表明當前線程的前繼節點處於 CANCELED 的狀態,
// 所以我們需要從當前節點開始往前查找,直到找到第一個不為
// CAECELED 狀態的節點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire 會檢查前繼節點的等待狀態,如果前繼節點狀態為 SIGNAL,則可以將當前節點關聯的線程掛起,如果不是 SIGNAL,會做一些其他的操作,在當前循環中不會掛起線程。如果確定了可以掛起線程,就調用 parkAndCheckInterrupt 方法對線程進行阻塞:
private final boolean parkAndCheckInterrupt() {
// 掛起當前線程
LockSupport.park(this);
// 可以通過調用 interrupt 方法使線程退出 park 狀態,
// 為了使線程在后面的循環中還可以響應中斷,會重置線程的中斷狀態。
// 這里使用 interrupted 會先返回線程當前的中斷狀態,然后將中斷狀態重置為 false,
// 線程的中斷狀態會返回給上層調用函數,在線程獲得鎖后,
// 如果發現線程曾被中斷過,會將中斷狀態重新設為 true
return Thread.interrupted();
}
獨占鎖釋放
下面是釋放獨占鎖的流程:
通過 release 方法,我們可以釋放互斥鎖。下面是 release 方法的實現:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// waitStatus 為 0,證明是初始化的空隊列或者后繼結點已經被喚醒了
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在獨占模式下釋放鎖時,是沒有其他線程競爭的,所以處理會簡單一些。首先嘗試釋放鎖,如果失敗就直接返回(失敗不是因為多線程競爭,而是線程本身就不擁有鎖)。如果成功的話,會檢查 h 的狀態,然后調用 unparkSuccessor 方法來喚醒后續線程。下面是 unparkSuccessor 的實現:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 將 head 節點的狀態置為 0,表明當前節點的后續節點已經被喚醒了,
// 不需要再次喚醒,修改 ws 狀態主要作用於 release 的判斷
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在 unparkSuccessor 方法中,如果發現頭節點的后繼結點為 null 或者處於 CANCELED 狀態,會從尾部往前找(在節點存在的前提下,這樣一定能找到)離頭節點最近的需要喚醒的節點,然后喚醒該節點。
共享鎖獲取和釋放
獨占鎖的流程和原理比較容易理解,因為只有一個鎖,但是共享鎖的處理就相對復雜一些了。在獨占鎖中,只有在釋放鎖之后,才能喚醒等待的線程,而在共享模式中,獲取鎖和釋放鎖之后,都有可能喚醒等待的線程。如果想要理清共享鎖的工作過程,必須將共享鎖的獲取和釋放結合起來看。這里我們先看一下共享鎖的釋放過程,只有明白了釋放過程做了哪些工作,才能更好的理解獲取鎖的過程。
共享鎖釋放
下面是釋放共享鎖的流程:
通過 releaseShared 方法會釋放共享鎖,下面是具體的實現:
public final boolean releaseShared(int releases) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releases 是要釋放的共享資源數量,其中 tryReleaseShared 的方法由我們自己重寫,該方法的主要功能就是修改共享資源的數量(state + releases),因為可能會有多個線程同時釋放資源,所以實現的時候,一般采用循環加 CAS 操作的方式,如下面的形式:
protected boolean tryReleaseShared(int releases) {
// 釋放共享資源,因為可能有多個線程同時執行,所以需要使用 CAS 操作來修改資源總數。
for (;;) {
int lastCount = getState();
int newCount = lastCount + releases;
if (compareAndSetState(lastCount, newCount)) {
return true;
}
}
}
當共享資源數量修改了之后,會調用 doReleaseShared 方法,該方法主要喚醒同步隊列中的第一個等待節點(head.next),下面是具體實現:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
// head = null 說明沒有初始化,head = tail 說明同步隊列中沒有等待節點
if (h != null && h != tail) {
// 查看當前節點的等待狀態
int ws = h.waitStatus;
// 我們在前面說過,SIGNAL說明有后續節點需要喚醒
if (ws == Node.SIGNAL) {
/*
* 將當前節點的值設為 0,表明已經喚醒了后繼節點
* 可能會有多個線程同時執行到這一步,所以使用 CAS 保證只有一個線程能修改成功,
* 從而執行 unparkSuccessor,其他的線程會執行 continue 操作
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
/*
* ws 等於 0,說明無需喚醒后繼結點(后續節點已經被喚醒或者當前節點沒有被阻塞的后繼結點),
* 也就是這一次的調用其實並沒有執行喚醒后繼結點的操作。就類似於我只需要一張優惠券,
* 但是我的兩個朋友,他們分別給我了一張,因此我就剩余了一張。然后我就將這張剩余的優惠券
* 送(傳播)給其他人使用,因此這里將節點置為可傳播的狀態(PROPAGATE)
*/
continue; // loop on failed CAS
}
}
if (h == head) // loop if head changed
break;
}
}
從上面的實現中,doReleaseShared 的主要作用是用來喚醒阻塞的節點並且一次只喚醒一個,讓該節點關聯的線程去重新競爭鎖,它既不修改同步隊列,也不修改共享資源。
當多個線程同時釋放資源時,可以確保兩件事:
- 共享資源的數量能正確的累加
- 至少有一個線程被喚醒,其實只要確保有一個線程被喚醒就可以了,即便喚醒了多個線程,在同一時刻,也只能有一個線程能得到競爭鎖的資格,在下面我們會看到。
所以釋放鎖做的主要工作還是修改共享資源的數量。而有了多個共享資源后,如何確保同步隊列中的多個節點可以獲取鎖,是由獲取鎖的邏輯完成的。下面看下共享鎖的獲取。
共享鎖的獲取
下面是獲取共享鎖的流程
通過 acquireShared 方法,我們可以申請共享鎖,下面是具體的實現:
public final void acquireShared(int arg) {
// 如果返回結果小於 0,證明沒有獲取到共享資源
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
如果沒有獲取到共享資源,就會執行 doAcquireShared 方法,下面是該方法的具體實現:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
從上面的代碼中可以看到,只有前置節點為 head 的節點才有可能去競爭鎖,這點和獨占模式的處理是一樣的,所以即便喚醒了多個線程,也只有一個線程能進入競爭鎖的邏輯,其余線程會再次進入 park 狀態,當線程獲取到共享鎖之后,會執行 setHeadAndPropagate 方法,下面是具體的實現:
private void setHeadAndPropagate(Node node, long propagate) {
// 備份一下頭節點
Node h = head; // Record old head for check below
/*
* 移除頭節點,並將當前節點置為頭節點
* 當執行完這一步之后,其實隊列的頭節點已經發生改變,
* 其他被喚醒的線程就有機會去獲取鎖,從而並發的執行該方法,
* 所以上面備份頭節點,以便下面的代碼可以正確運行
*/
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
/*
* 判斷是否需要喚醒后繼結點,propagate > 0 說明共享資源有剩余,
* h.waitStatus < 0,表明當前節點狀態可能為 SIGNAL,CONDITION,PROPAGATE
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 只有 s 不處於獨占模式時,才去喚醒后繼結點
if (s == null || s.isShared())
doReleaseShared();
}
}
判斷后繼結點是否需要喚醒的條件是十分寬松的,也就是一定包含必要的喚醒,但是也有可能會包含不必要的喚醒。從前面我們可以知道 doReleaseShared 函數的主要作用是喚醒后繼結點,它既不修改共享資源,也不修改同步隊列,所以即便有不必要的喚醒也是不影響程序正確性的。如果沒有共享資源,節點會再次進入等待狀態。
到了這里,脈絡就比較清晰了,當一個節點獲取到共享鎖之后,它除了將自身設為 head 節點之外,還會判斷一下是否滿足喚醒后繼結點的條件,如果滿足,就喚醒后繼結點,后繼結點獲取到鎖之后,會重復這個過程,直到判斷條件不成立。就類似於考試時從第一排往最后一排傳卷子,第一排先留下一份,然后將剩余的傳給后一排,后一排會重復這個過程。如果傳到某一排卷子沒了,那么位於這排的人就要等待,直到老師又給了他新的卷子。
中斷
在獲取鎖時還可以設置響應中斷,獨占鎖和共享鎖的處理邏輯類似,這里我們以獨占鎖為例。使用 acquireInterruptibly 方法,在獲取獨占鎖時可以響應中斷,下面是具體的實現:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
// 這里會拋出異常
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
從上面的代碼中我們可以看出,acquireInterruptibly 和 acquire 的邏輯類似,只是在下面的代碼處有所不同:當線程因為中斷而退出阻塞狀態時,會直接拋出 InterruptedException 異常。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
// 這里會拋出異常
throw new InterruptedException();
}
我們知道,不管是拋出異常還是方法返回,程序都會執行 finally 代碼,而 failed 肯定為 true,所以拋出異常之后會執行 cancelAcquire 方法,cancelAcquire 方法主要將節點從同步隊列中移除。下面是具體的實現:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳過前面的已經取消的節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 保存下 pred 的后繼結點,以便 CAS 操作使用
// 因為可能存在已經取消的節點,所以 pred.next 不一等於 node
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 將節點狀態設為 CANCELED
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
從上面的代碼可以看出,節點的刪除分為三種情況:
- 刪除節點為尾節點,直接將該節點的第一個有效前置節點置為尾節點
- 刪除節點的前置節點為頭節點,則對該節點執行 unparkSuccessor 操作
- 刪除節點為中間節點,結果如下圖所示。下圖中(1)表示同步隊列的初始狀態,假設刪除 node2, node1 是正常節點(非 CANCELED),(2)就是刪除 node2 后同步隊列的狀態,此時 node1 節點的后繼已經變為 node3,也就是說當 node1 變為 head 之后,會直接喚醒 node3。當另外的一個節點中斷之后再次執行 cancelAcquire,在執行下面的代碼時,會使同步隊列的狀態由(2)變為(3),此時 node2 已經沒有外界指針了,可以被回收了。如果一直沒有另外一個節點中斷,也就是同步隊列一直處於(2)狀態,那么需要等 node3 被回收之后,node2 才可以被回收。
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
超時
超時是在中斷的基礎上加了一層時間的判斷,這里我們還是以獨占鎖為例。 tryAcquireNanos 支持獲取鎖的超時處理,下面是具體實現:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
當獲取鎖失敗之后,會執行 doAcquireNanos 方法,下面是具體實現:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0 L)
return false;
// 線程最晚結束時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判斷是否超時,如果超時就返回
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0 L)
return false;
// 這里如果設定了一個閾值,如果超時的時間比閾值小,就認為
// 當前線程沒必要阻塞,再執行幾次 for 循環估計就超時了
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
當線程超時返回時,還是會執行 cancelAcquire 方法,cancelAcquire 的邏輯已經在前面說過了,這里不再贅述。