【Java並發】詳解 AbstractQueuedSynchronizer


前言

隊列同步器 AbstractQueuedSynchronizer(以下簡稱 AQS),是用來構建鎖或者其他同步組件的基礎框架。它使用一個 int 成員變量來表示同步狀態,通過 CAS 操作對同步狀態進行修改,確保狀態的改變是安全的。通過內置的 FIFO (First In First Out)隊列來完成資源獲取線程的排隊工作。更多關於 Java 多線程的文章可以轉到 這里

AQS 和 synchronized

在介紹 AQS 的使用之前,需要首先說明一點,AQS 同步和 synchronized 關鍵字同步(以下簡稱 synchronized 同步)是采用的兩種不同的機制。首先看下 synchronized 同步,synchronized 關鍵字經過編譯之后,會在同步塊的前后分別形成 monitorenter 和 monitorexit 這兩個字節碼指令,這兩個字節碼需要關聯到一個監視對象,當線程執行 monitorenter 指令時,需要首先獲得獲得監視對象的鎖,這里監視對象鎖就是進入同步塊的憑證,只有獲得了憑證才可以進入同步塊,當線程離開同步塊時,會執行 monitorexit 指令,釋放對象鎖。

在 AQS 同步中,使用一個 int 類型的變量 state 來表示當前同步塊的狀態。以獨占式同步(一次只能有一個線程進入同步塊)為例,state 的有效值有兩個 0 和 1,其中 0 表示當前同步塊中沒有線程,1 表示同步塊中已經有線程在執行。當線程要進入同步塊時,需要首先判斷 state 的值是否為 0,假設為 0,會嘗試將 state 修改為 1,只有修改成功了之后,線程才可以進入同步塊。注意上面提到的兩個條件:

  • state 為 0,證明當前同步塊中沒有線程在執行,所以當前線程可以嘗試獲得進入同步塊的憑證,而這里的憑證就是是否成功將 state 修改為 1(在 synchronized 同步中,我們說的憑證是對象鎖,但是對象鎖的最終實現是否和這種方式類似,沒有找到相關的資料)
  • 成功將 state 修改為 1,通過使用 CAS 操作,我們可以確保即便有多個線程同時修改 state,也只有一個線程會修改成功。關於 CAS 的具體解釋會在后面提到。

當線程離開同步塊時,會修改 state 的值,將其設為 0,並喚醒等待的線程。所以在 AQS 同步中,我們說線程獲得了鎖,實際上是指線程成功修改了狀態變量 state,而線程釋放了鎖,是指線程將狀態變量置為了可修改的狀態(在獨占式同步中就是置為了 0),讓其他線程可以再次嘗試修改狀態變量。在下面的表述中,我們說線程獲得和釋放了鎖,就是上述含義, 這與 synchronized 同步中說的獲得和釋放鎖的含義不同,需要區別理解。

基本使用

本節摘自 Java 並發編程的藝術

AQS 的設計是基於模板方法的,使用者需要繼承 AQS 並重寫指定的方法。在后續的流程中,AQS 提供的模板方法會調用重寫的方法。一般來說,我們需要重寫的方法主要有下面 5 個:

方法名稱 描述
protected boolean tryAcquire(int) 獨占式獲取鎖,實現該方法需要查詢當前狀態並判斷同步狀態是否和預期值相同,然后使用 CAS 操作設置同步狀態
protected boolean tryRelease(int) 獨占式釋放鎖,實際也是修改同步變量
protected int tryAcquireShared(int) 共享式獲取鎖,返回大於等於 0 的值,表示獲取鎖成功,反之獲取失敗
protected boolean tryReleaseShared(int) 共享式釋放鎖
protected boolean isHeldExclusively() 判斷調用該方法的線程是否持有互斥鎖

在自定義的同步組件中,我們一般會調用 AQS 提供的模板方法。AQS 提供的模板方法基本上分為 3 類: 獨占式獲取與釋放鎖、共享式獲取與釋放鎖以及查詢同步隊列中的等待線程情況。下面是相關的模板方法:

方法名稱 描述
void acquire(int) 獨占式獲取鎖,如果當前線程成功獲取鎖,那么方法就返回,否則會將當前線程放入同步隊列等待。該方法會調用重寫的 tryAcquire(int arg) 方法判斷是否可以獲得鎖
void acquireInterruptibly(int) 和 acquire(int) 相同,但是該方法響應中斷,當線程在同步隊列中等待時,如果線程被中斷,會拋出 InterruptedException 異常並返回。
boolean tryAcquireNanos(int, long) 在 acquireInterruptibly(int) 基礎上添加了超時控制,同時支持中斷和超時,當在指定時間內沒有獲得鎖時,會返回 false,獲取到了返回 true
void acquireShared(int) 共享式獲得鎖,如果成功獲得鎖就返回,否則將當前線程放入同步隊列等待,與獨占式獲取鎖的不同是,同一時刻可以有多個線程獲得共享鎖,該方法調用 tryAcquireShared(int)
acquireSharedInterruptibly(int) 與 acquireShared(int) 相同,該方法響應中斷
tryAcquireSharedNanos(int, long) 在 acquireSharedInterruptibly(int) 基礎上添加了超時控制
boolean release(int) 獨占式釋放鎖,該方法會在釋放鎖后,將同步隊列中第一個等待節點喚醒
boolean releaseShared(int) 共享式釋放鎖
Collection getQueuedThreads() 獲得同步隊列中等待的線程集合

自定義組件通過使用同步器提供的模板方法來實現自己的同步語義。下面我們通過兩個示例,看下如何借助於 AQS 來實現鎖的同步語義。我們首先實現一個獨占鎖(排它鎖),獨占鎖就是說在某個時刻內,只能有一個線程持有獨占鎖,只有持有鎖的線程釋放了獨占鎖,其他線程才可以獲取獨占鎖。下面是具體實現:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by Jikai Zhang on 2017/4/6.
 * <p>
 * 自定義獨占鎖
 */
public class Mutex implements Lock {

    // 通過繼承 AQS,自定義同步器
    private static class Sync extends AbstractQueuedSynchronizer {

        // 當前線程是否被獨占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;

        }

        // 嘗試獲得鎖
        @Override
        protected boolean tryAcquire(int arg) {
            // 只有當 state 的值為 0,並且線程成功將 state 值修改為 1 之后,線程才可以獲得獨占鎖
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            // state 為 0 說明當前同步塊中沒有鎖了,無需釋放
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            // 將獨占的線程設為 null
            setExclusiveOwnerThread(null);
            // 將狀態變量的值設為 0,以便其他線程可以成功修改狀態變量從而獲得鎖
            setState(0);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 將操作代理到 Sync 上
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public static void withoutMutex() throws InterruptedException {
        System.out.println("Without mutex: ");
        int threadCount = 2;
        final Thread threads[] = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100000; j++) {
                        if (j % 20000 == 0) {
                            System.out.println("Thread-" + index + ": j =" + j);
                        }
                    }
                }
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].join();
        }
    }

    public static void withMutex() {
        System.out.println("With mutex: ");
        final Mutex mutex = new Mutex();
        int threadCount = 2;
        final Thread threads[] = new Thread[threadCount];
        for (int i = 0; i < threads.length; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {

                @Override
                public void run() {

                    mutex.lock();
                    try {
                        for (int j = 0; j < 100000; j++) {
                            if (j % 20000 == 0) {
                                System.out.println("Thread-" + index + ": j =" + j);
                            }
                        }
                    } finally {
                        mutex.unlock();
                    }
                }
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        withoutMutex();
        System.out.println();
        withMutex();

    }
}

程序的運行結果如下面所示。我們看到使用了 Mutex 之后,線程 0 和線程 1 不會再交替執行,而是當一個線程執行完,另外一個線程再執行。

Without mutex:
Thread-0: j =0
Thread-1: j =0
Thread-0: j =20000
Thread-1: j =20000
Thread-0: j =40000
Thread-1: j =40000
Thread-0: j =60000
Thread-1: j =60000
Thread-1: j =80000
Thread-0: j =80000

With mutex:
Thread-0: j =0
Thread-0: j =20000
Thread-0: j =40000
Thread-0: j =60000
Thread-0: j =80000
Thread-1: j =0
Thread-1: j =20000
Thread-1: j =40000
Thread-1: j =60000
Thread-1: j =80000

下面在看一個共享鎖的示例。在該示例中,我們定義兩個共享資源,即同一時間內允許兩個線程同時執行。我們將同步變量的初始狀態 state 設為 2,當一個線程獲取了共享鎖之后,將 state 減 1,線程釋放了共享鎖后,將 state 加 1。狀態的合法范圍是 0、1 和 2,其中 0 表示已經資源已經用光了,此時線程再要獲得共享鎖就需要進入同步序列等待。下面是具體實現:

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by Jikai Zhang on 2017/4/9.
 * <p>
 * 自定義共享鎖
 */
public class TwinsLock implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer {

        public Sync(int resourceCount) {
            if (resourceCount <= 0) {
                throw new IllegalArgumentException("resourceCount must be larger than zero.");
            }
            // 設置可以共享的資源總數
            setState(resourceCount);
        }


        @Override
        protected int tryAcquireShared(int reduceCount) {
            // 使用嘗試獲得資源,如果成功修改了狀態變量(獲得了資源)
            // 或者資源的總量小於 0(沒有資源了),則返回。
            for (; ; ) {
                int lastCount = getState();
                int newCount = lastCount - reduceCount;
                if (newCount < 0 || compareAndSetState(lastCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            // 釋放共享資源,因為可能有多個線程同時執行,所以需要使用 CAS 操作來修改資源總數。
            for (; ; ) {
                int lastCount = getState();
                int newCount = lastCount + returnCount;
                if (compareAndSetState(lastCount, newCount)) {
                    return true;
                }
            }
        }
    }

    // 定義兩個共享資源,說明同一時間內可以有兩個線程同時運行
    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }

    public static void main(String[] args) {
        final Lock lock = new TwinsLock();
        int threadCounts = 10;
        Thread threads[] = new Thread[threadCounts];
        for (int i = 0; i < threadCounts; i++) {
            final int index = i;
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 5; i++) {
                        lock.lock();
                        try {
                            TimeUnit.SECONDS.sleep(1);
                            System.out.println(Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                        }

                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        for (int i = 0; i < threadCounts; i++) {
            threads[i].start();
        }
    }
}

運行程序,我們會發現程序每次都會同時打印兩條語句,如下面的形式,證明同時有兩個線程在執行。

Thread-0
Thread-1
Thread-3
Thread-2
Thread-8
Thread-4
Thread-3
Thread-6

CAS 操作

CAS(Compare and Swap),比較並交換,通過利用底層硬件平台的特性,實現原子性操作。CAS 操作涉及到3個操作數,內存值 V,舊的期望值 A,需要修改的新值 B。當且僅當預期值 A 和 內存值 V 相同時,才將內存值 V 修改為 B,否則什么都不做。CAS 操作類似於執行了下面流程

if(oldValue == memory[valueAddress]) {
    memory[valueAddress] = newValue;
}

在上面的流程中,其實涉及到了兩個操作,比較以及替換,為了確保程序正確,需要確保這兩個操作的原子性(也就是說確保這兩個操作同時進行,中間不會有其他線程干擾)。現在的 CPU 中,提供了相關的底層 CAS 指令,即 CPU 底層指令確保了比較和交換兩個操作作為一個原子操作進行(其實在這一點上還是有排他鎖的. 只是比起用synchronized, 這里的排他時間要短的多.),Java 中的 CAS 函數是借助於底層的 CAS 指令來實現的。更多關於 CPU 底層實現的原理可以參考 這篇文章。我們來看下 Java 中對於 CAS 函數的定義:

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

/**
 * Atomically update Java variable to x if it is currently
 * holding expected.
 * @return true if successful
 */
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

上面三個函數定義在 sun.misc.Unsafe 類中,使用該類可以進行一些底層的操作,例如直接操作原生內存,更多關於 Unsafe 類的文章可以參考 這篇。以 compareAndSwapInt 為例,我們看下如何使用 CAS 函數:

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
 * Created by Jikai Zhang on 2017/4/8.
 */
public class CASIntTest {
    private volatile int count = 0;

    private static final Unsafe unsafe = getUnsafe();
    private static final long offset;

    // 獲得 count 屬性在 CASIntTest 中的偏移量(內存地址偏移)
    static {
        try {
            offset = unsafe.objectFieldOffset(CASIntTest.class.getDeclaredField("count"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }
    // 通過反射的方式獲得 Unsafe 類
    public static Unsafe getUnsafe() {
        Unsafe unsafe = null;
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
        return unsafe;
    }

    public void increment() {
        int previous = count;
        unsafe.compareAndSwapInt(this, offset, previous, previous + 1);
    }

    public static void main(String[] args) {
        CASIntTest casIntTest = new CASIntTest();
        casIntTest.increment();
        System.out.println(casIntTest.count);
    }
}

在 CASIntTest 類中,我們定義一個 count 變量,其中 increment 方法是將 count 的值加 1。下面是 increase 方法的代碼:

int previous = count;
unsafe.compareAndSwapInt(this, offset, previous, previous + 1);

在沒有線程競爭的條件下,該代碼執行的結果是將 count 變量的值加 1(多個線程競爭可能會有線程執行失敗),但是在 compareAndSwapInt 函數中,我們並沒有傳入 count 變量,那么函數是如何修改的 count 變量值?其實我們往 compareAndSwapInt 函數中傳入了 count 變量在堆內存中的地址,函數直接修改了 count 變量所在內存區域。count 屬性在堆內存中的地址是由 CASIntTest 實例的起始內存地址和 count 屬性相對於起始內存的偏移量決定的。其中對象屬性在對象中的偏移量通過 objectFieldOffset 函數獲得,函數原型如下所示。該函數接受一個 Filed 類型的參數,返回該 Filed 屬性在對象中的偏移量。

/**
 * Report the location of a given static field, in conjunction with {@link
 * #staticFieldBase}.
 * Do not expect to perform any sort of arithmetic on this offset;
 * it is just a cookie which is passed to the unsafe heap memory accessors.
 *
 * Any given field will always have the same offset, and no two distinct
 * fields of the same class will ever have the same offset.
 *
 * As of 1.4.1, offsets for fields are represented as long values,
 * although the Sun JVM does not use the most significant 32 bits.
 * It is hard to imagine a JVM technology which needs more than
 * a few bits to encode an offset within a non-array object,
 * However, for consistency with other methods in this class,
 * this method reports its result as a long value.
 */
public native long objectFieldOffset(Field f);

下面我們再看一下 compareAndSwapInt 的函數原型。我們知道 CAS 操作需要知道 3 個信息:內存中的值,期望的舊值以及要修改的新值。通過前面的分析,我們知道通過 o 和 offset 我們可以確定屬性在內存中的地址,也就是知道了屬性在內存中的值。expected 對應期望的舊址,而 x 就是要修改的新值。

public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

compareAndSwapInt 函數首先比較一下 expected 是否和內存中的值相同,如果不同證明其他線程修改了屬性值,那么就不會執行更新操作,但是程序如果就此返回了,似乎不太符合我們的期望,我們是希望程序可以執行更新操作的,如果其他線程先進行了更新,那么就在更新后的值的基礎上進行修改,所以我們一般使用循環配合 CAS 函數,使程序在更新操作完成之后再返回,如下所示:

long before = counter;
while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
    before = counter;
}

下面是使用 CAS 函數實現計數器的一個實例:

import sun.misc.Unsafe;

import java.lang.reflect.Field;

/**
 * Created by Jikai Zhang on 2017/4/8.
 */
public class CASCounter {

    // 通過反射的方式獲得 Unsafe 類
    public static Unsafe getUnsafe() {
        Unsafe unsafe = null;
        try {
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
        return unsafe;
    }

    private volatile long counter = 0;
    private static final long offset;
    private static final Unsafe unsafe = getUnsafe();

    static {
        try {
            offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

    public void increment() {
        long before = counter;
        while (!unsafe.compareAndSwapLong(this, offset, before, before + 1)) {
            before = counter;
        }
    }

    public long getCounter() {
        return counter;
    }

    private static long intCounter = 0;

    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        Thread threads[] = new Thread[threadCount];
        final CASCounter casCounter = new CASCounter();

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {

                    for (int i = 0; i < 10000; i++) {
                        casCounter.increment();
                        intCounter++;
                    }
                }
            });
            threads[i].start();
        }

        for(int i = 0; i < threadCount; i++) {
            threads[i].join();
        }
        System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
    }
}

在 AQS 中,對原始的 CAS 函數封裝了一下,省去了獲得變量地址的步驟,如下面的形式:

private static final long waitStatusOffset;

static {
    try {
        waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
    } catch (Exception ex) {
        throw new Error(ex);
    }
}

private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}

同步隊列

AQS 依賴內部的同步隊列(一個 FIFO的雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把隊列中第一個等待節點線程喚醒(下圖中的 Node1),使其再次嘗試獲取同步狀態。同步隊列的結構如下所示:

圖片來自 http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer

Head 節點本身不保存等待線程的信息,它通過 next 變量指向第一個保存線程等待信息的節點(Node1)。當線程被喚醒之后,會刪除 Head 節點,而喚醒線程所在的節點會設置為 Head 節點(Node1 被喚醒之后,Node1會被置為 Head 節點)。下面我們看下 JDK 中同步隊列的實現。

Node 類

首先看在節點所對應的 Node 類:

static final class Node {

    /**
     * 標志是獨占式模式還是共享模式
     */
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /**
     * 線程等待狀態的有效值
     */
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /**
     * 線程狀態,合法值為上面 4 個值中的一個
     */
    volatile int waitStatus;

    /**
     * 當前節點的前置節點
     */
    volatile Node prev;

    /**
     * 當前節點的后置節點
     */
    volatile Node next;

    /**
     * 當前節點所關聯的線程
     */
    volatile Thread thread;

    /**
     * 指向下一個在某個條件上等待的節點,或者指向 SHARE 節點,表明當前處於共享模式
     */
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

在 Node 類中定義了四種等待狀態:

  • CANCELED: 1,因為等待超時 (timeout)或者中斷(interrupt),節點會被置為取消狀態。處於取消狀態的節點不會再去競爭鎖,也就是說不會再被阻塞。節點會一直保持取消狀態,而不會轉換為其他狀態。處於 CANCELED 的節點會被移出隊列,被 GC 回收。
  • SIGNAL: -1,表明當前的后繼結點正在或者將要被阻塞(通過使用 LockSupport.pack 方法),因此當前的節點被釋放(release)或者被取消時(cancel)時,要喚醒它的后繼結點(通過 LockSupport.unpark 方法)。
  • CONDITION: -2,表明當前節點在條件隊列中,因為等待某個條件而被阻塞。
  • PROPAGATE: -3,在共享模式下,可以認為資源有多個,因此當前線程被喚醒之后,可能還有剩余的資源可以喚醒其他線程。該狀態用來表明后續節點會傳播喚醒的操作。需要注意的是只有頭節點才可以設置為該狀態(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
  • 0:新創建的節點會處於這種狀態

獨占鎖的獲取和釋放

我們首先看下獨占鎖的獲取和釋放過程

獨占鎖獲取

下面是獲取獨占鎖的流程圖:

我們通過 acquire 方法來獲取獨占鎖,下面是方法定義

public final void acquire(int arg) {
    // 首先嘗試獲取鎖,如果獲取失敗,會先調用 addWaiter 方法創建節點並追加到隊列尾部
    // 然后調用 acquireQueued 阻塞或者循環嘗試獲取鎖
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 在 acquireQueued 中,如果線程是因為中斷而退出的阻塞狀態會返回 true
        // 這里的 selfInterrupt 主要是為了恢復線程的中斷狀態
        selfInterrupt();
    }
}

acquire 會首先調用 tryAcquire 方法來獲得鎖,該方法需要我們來實現,這個在前面已經提過了。如果沒有獲取鎖,會調用 addWaiter 方法創建一個和當前線程關聯的節點追加到同步隊列的尾部,我們調用 addWaiter 時傳入的是 Node.EXCLUSIVE,表明當前是獨占模式。下面是 addWaiter 的具體實現

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // tail 指向同步隊列的尾節點
    Node pred = tail;
    // Try the fast path of enq; backup to full enq on failure
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

addWaiter 方法會首先調用 if 方法,來判斷能否成功將節點添加到隊列尾部,如果添加失敗,再調用 enq 方法(使用循環不斷重試)進行添加,下面是 enq 方法的實現:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 同步隊列采用的懶初始化(lazily initialized)的方式,
        // 初始時 head 和 tail 都會被設置為 null,當一次被訪問時
        // 才會創建 head 對象,並把尾指針指向 head。
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter 僅僅是將節點加到了同步隊列的末尾,並沒有阻塞線程,線程阻塞的操作是在 acquireQueued 方法中完成的,下面是 acquireQueued 的實現:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果當前節點的前繼節點是 head,就使用自旋(循環)的方式不斷請求鎖
            if (p == head && tryAcquire(arg)) {
                // 成功獲得鎖,將當前節點置為 head 節點,同時刪除原 head 節點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }

            // shouldParkAfterFailedAcquire 檢查是否可以掛起線程,
            // 如果可以掛起進程,會調用 parkAndCheckInterrupt 掛起線程,
            // 如果 parkAndCheckInterrupt 返回 true,表明當前線程是因為中斷而退出掛起狀態的,
            // 所以要將 interrupted 設為 true,表明當前線程被中斷過
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued 會首先檢查當前節點的前繼節點是否為 head,如果為 head,將使用自旋的方式不斷的請求鎖,如果不是 head,則調用 shouldParkAfterFailedAcquire 查看是否應該掛起當前節點關聯的線程,下面是 shouldParkAfterFailedAcquire 的實現:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 當前節點的前繼節點的等待狀態
    int ws = pred.waitStatus;
    // 如果前繼節點的等待狀態為 SIGNAL 我們就可以將當前節點對應的線程掛起
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // ws 大於 0,表明當前線程的前繼節點處於 CANCELED 的狀態,
        // 所以我們需要從當前節點開始往前查找,直到找到第一個不為
        // CAECELED  狀態的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire 會檢查前繼節點的等待狀態,如果前繼節點狀態為 SIGNAL,則可以將當前節點關聯的線程掛起,如果不是 SIGNAL,會做一些其他的操作,在當前循環中不會掛起線程。如果確定了可以掛起線程,就調用 parkAndCheckInterrupt 方法對線程進行阻塞:

private final boolean parkAndCheckInterrupt() {
    // 掛起當前線程
    LockSupport.park(this);
    // 可以通過調用 interrupt 方法使線程退出 park 狀態,
    // 為了使線程在后面的循環中還可以響應中斷,會重置線程的中斷狀態。
    // 這里使用 interrupted 會先返回線程當前的中斷狀態,然后將中斷狀態重置為 false,
    // 線程的中斷狀態會返回給上層調用函數,在線程獲得鎖后,
    // 如果發現線程曾被中斷過,會將中斷狀態重新設為 true
    return Thread.interrupted();
}

獨占鎖釋放

下面是釋放獨占鎖的流程:

通過 release 方法,我們可以釋放互斥鎖。下面是 release 方法的實現:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // waitStatus 為 0,證明是初始化的空隊列或者后繼結點已經被喚醒了
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

在獨占模式下釋放鎖時,是沒有其他線程競爭的,所以處理會簡單一些。首先嘗試釋放鎖,如果失敗就直接返回(失敗不是因為多線程競爭,而是線程本身就不擁有鎖)。如果成功的話,會檢查 h 的狀態,然后調用 unparkSuccessor 方法來喚醒后續線程。下面是 unparkSuccessor 的實現:

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    // 將 head 節點的狀態置為 0,表明當前節點的后續節點已經被喚醒了,
    // 不需要再次喚醒,修改 ws 狀態主要作用於 release 的判斷
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

在 unparkSuccessor 方法中,如果發現頭節點的后繼結點為 null 或者處於 CANCELED 狀態,會從尾部往前找(在節點存在的前提下,這樣一定能找到)離頭節點最近的需要喚醒的節點,然后喚醒該節點。

共享鎖獲取和釋放

獨占鎖的流程和原理比較容易理解,因為只有一個鎖,但是共享鎖的處理就相對復雜一些了。在獨占鎖中,只有在釋放鎖之后,才能喚醒等待的線程,而在共享模式中,獲取鎖和釋放鎖之后,都有可能喚醒等待的線程。如果想要理清共享鎖的工作過程,必須將共享鎖的獲取和釋放結合起來看。這里我們先看一下共享鎖的釋放過程,只有明白了釋放過程做了哪些工作,才能更好的理解獲取鎖的過程。

共享鎖釋放

下面是釋放共享鎖的流程:

通過 releaseShared 方法會釋放共享鎖,下面是具體的實現:

public final boolean releaseShared(int releases) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

releases 是要釋放的共享資源數量,其中 tryReleaseShared 的方法由我們自己重寫,該方法的主要功能就是修改共享資源的數量(state + releases),因為可能會有多個線程同時釋放資源,所以實現的時候,一般采用循環加 CAS 操作的方式,如下面的形式:

protected boolean tryReleaseShared(int releases) {
    // 釋放共享資源,因為可能有多個線程同時執行,所以需要使用 CAS 操作來修改資源總數。
    for (;;) {
        int lastCount = getState();
        int newCount = lastCount + releases;
        if (compareAndSetState(lastCount, newCount)) {
            return true;
        }
    }
}

當共享資源數量修改了之后,會調用 doReleaseShared 方法,該方法主要喚醒同步隊列中的第一個等待節點(head.next),下面是具體實現:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        // head = null 說明沒有初始化,head = tail 說明同步隊列中沒有等待節點
        if (h != null && h != tail) {
            // 查看當前節點的等待狀態
            int ws = h.waitStatus;
            // 我們在前面說過,SIGNAL說明有后續節點需要喚醒
            if (ws == Node.SIGNAL) {

                /*
                 * 將當前節點的值設為 0,表明已經喚醒了后繼節點
                 * 可能會有多個線程同時執行到這一步,所以使用 CAS 保證只有一個線程能修改成功,
                 * 從而執行 unparkSuccessor,其他的線程會執行 continue 操作
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                /*
                 * ws 等於 0,說明無需喚醒后繼結點(后續節點已經被喚醒或者當前節點沒有被阻塞的后繼結點),
                 * 也就是這一次的調用其實並沒有執行喚醒后繼結點的操作。就類似於我只需要一張優惠券,
                 * 但是我的兩個朋友,他們分別給我了一張,因此我就剩余了一張。然后我就將這張剩余的優惠券
                 * 送(傳播)給其他人使用,因此這里將節點置為可傳播的狀態(PROPAGATE)
                 */
                continue; // loop on failed CAS
            }
        }
        if (h == head) // loop if head changed
            break;
    }
}

從上面的實現中,doReleaseShared 的主要作用是用來喚醒阻塞的節點並且一次只喚醒一個,讓該節點關聯的線程去重新競爭鎖,它既不修改同步隊列,也不修改共享資源。

當多個線程同時釋放資源時,可以確保兩件事:

  1. 共享資源的數量能正確的累加
  2. 至少有一個線程被喚醒,其實只要確保有一個線程被喚醒就可以了,即便喚醒了多個線程,在同一時刻,也只能有一個線程能得到競爭鎖的資格,在下面我們會看到。

所以釋放鎖做的主要工作還是修改共享資源的數量。而有了多個共享資源后,如何確保同步隊列中的多個節點可以獲取鎖,是由獲取鎖的邏輯完成的。下面看下共享鎖的獲取。

共享鎖的獲取

下面是獲取共享鎖的流程

通過 acquireShared 方法,我們可以申請共享鎖,下面是具體的實現:

public final void acquireShared(int arg) {
    // 如果返回結果小於 0,證明沒有獲取到共享資源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

如果沒有獲取到共享資源,就會執行 doAcquireShared 方法,下面是該方法的具體實現:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

從上面的代碼中可以看到,只有前置節點為 head 的節點才有可能去競爭鎖,這點和獨占模式的處理是一樣的,所以即便喚醒了多個線程,也只有一個線程能進入競爭鎖的邏輯,其余線程會再次進入 park 狀態,當線程獲取到共享鎖之后,會執行 setHeadAndPropagate 方法,下面是具體的實現:

private void setHeadAndPropagate(Node node, long propagate) {
    // 備份一下頭節點
    Node h = head; // Record old head for check below
    /*
     * 移除頭節點,並將當前節點置為頭節點
     * 當執行完這一步之后,其實隊列的頭節點已經發生改變,
     * 其他被喚醒的線程就有機會去獲取鎖,從而並發的執行該方法,
     * 所以上面備份頭節點,以便下面的代碼可以正確運行
     */
    setHead(node);

    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */

     /*
      * 判斷是否需要喚醒后繼結點,propagate > 0 說明共享資源有剩余,
      * h.waitStatus < 0,表明當前節點狀態可能為 SIGNAL,CONDITION,PROPAGATE
      */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 只有 s 不處於獨占模式時,才去喚醒后繼結點
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

判斷后繼結點是否需要喚醒的條件是十分寬松的,也就是一定包含必要的喚醒,但是也有可能會包含不必要的喚醒。從前面我們可以知道 doReleaseShared 函數的主要作用是喚醒后繼結點,它既不修改共享資源,也不修改同步隊列,所以即便有不必要的喚醒也是不影響程序正確性的。如果沒有共享資源,節點會再次進入等待狀態。

到了這里,脈絡就比較清晰了,當一個節點獲取到共享鎖之后,它除了將自身設為 head 節點之外,還會判斷一下是否滿足喚醒后繼結點的條件,如果滿足,就喚醒后繼結點,后繼結點獲取到鎖之后,會重復這個過程,直到判斷條件不成立。就類似於考試時從第一排往最后一排傳卷子,第一排先留下一份,然后將剩余的傳給后一排,后一排會重復這個過程。如果傳到某一排卷子沒了,那么位於這排的人就要等待,直到老師又給了他新的卷子。

中斷

在獲取鎖時還可以設置響應中斷,獨占鎖和共享鎖的處理邏輯類似,這里我們以獨占鎖為例。使用 acquireInterruptibly 方法,在獲取獨占鎖時可以響應中斷,下面是具體的實現:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                // 這里會拋出異常
                throw new InterruptedException();
            }
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

從上面的代碼中我們可以看出,acquireInterruptibly 和 acquire 的邏輯類似,只是在下面的代碼處有所不同:當線程因為中斷而退出阻塞狀態時,會直接拋出 InterruptedException 異常。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
    // 這里會拋出異常
    throw new InterruptedException();
}

我們知道,不管是拋出異常還是方法返回,程序都會執行 finally 代碼,而 failed 肯定為 true,所以拋出異常之后會執行 cancelAcquire 方法,cancelAcquire 方法主要將節點從同步隊列中移除。下面是具體的實現:

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 跳過前面的已經取消的節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // 保存下 pred 的后繼結點,以便 CAS 操作使用
    // 因為可能存在已經取消的節點,所以 pred.next 不一等於 node
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 將節點狀態設為 CANCELED
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

從上面的代碼可以看出,節點的刪除分為三種情況:

  • 刪除節點為尾節點,直接將該節點的第一個有效前置節點置為尾節點
  • 刪除節點的前置節點為頭節點,則對該節點執行 unparkSuccessor 操作
  • 刪除節點為中間節點,結果如下圖所示。下圖中(1)表示同步隊列的初始狀態,假設刪除 node2, node1 是正常節點(非 CANCELED),(2)就是刪除 node2 后同步隊列的狀態,此時 node1 節點的后繼已經變為 node3,也就是說當 node1 變為 head 之后,會直接喚醒 node3。當另外的一個節點中斷之后再次執行 cancelAcquire,在執行下面的代碼時,會使同步隊列的狀態由(2)變為(3),此時 node2 已經沒有外界指針了,可以被回收了。如果一直沒有另外一個節點中斷,也就是同步隊列一直處於(2)狀態,那么需要等 node3 被回收之后,node2 才可以被回收。
Node pred = node.prev;
while (pred.waitStatus > 0)
    node.prev = pred = pred.prev;

超時

超時是在中斷的基礎上加了一層時間的判斷,這里我們還是以獨占鎖為例。 tryAcquireNanos 支持獲取鎖的超時處理,下面是具體實現:

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

當獲取鎖失敗之后,會執行 doAcquireNanos 方法,下面是具體實現:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0 L)
        return false;

    // 線程最晚結束時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 判斷是否超時,如果超時就返回
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0 L)
                return false;

            // 這里如果設定了一個閾值,如果超時的時間比閾值小,就認為
            // 當前線程沒必要阻塞,再執行幾次 for 循環估計就超時了
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);

            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

當線程超時返回時,還是會執行 cancelAcquire 方法,cancelAcquire 的邏輯已經在前面說過了,這里不再贅述。

參考文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM