並發編程之:AQS源碼解析


大家好,我是小黑,一個在互聯網苟且偷生的農民工。

在Java並發編程中,經常會用到鎖,除了Synchronized這個JDK關鍵字以外,還有Lock接口下面的各種鎖實現,如重入鎖ReentrantLock,還有讀寫鎖ReadWriteLock等,他們在實現鎖的過程中都是依賴與AQS來完成核心的加解鎖邏輯的。那么AQS具體是什么呢?

提供一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(信號量,事件等)。 該類被設計為大多數類型的同步器的有用依據,這些同步器依賴於單個原子int值來表示狀態。 子類必須定義改變此狀態的受保護方法,以及根據該對象被獲取或釋放來定義該狀態的含義。 給定這些,這個類中的其他方法執行所有排隊和阻塞機制。 子類可以保持其他狀態字段,但只以原子方式更新int使用方法操縱值getState() , setState(int)和compareAndSetState(int, int)被跟蹤相對於同步。

上述內容來自JDK官方文檔。

簡單來說,AQS是一個先進先出(FIFO)的等待隊列,主要用在一些線程同步場景,需要通過一個int類型的值來表示同步狀態。提供了排隊和阻塞機制。

類圖結構

image-20210904200925904

從類圖可以看出,在ReentrantLock中定義了AQS的子類Sync,可以通過Sync實現對於可重入鎖的加鎖,解鎖。

AQS通過int類型的狀態state來表示同步狀態。

AQS中主要提供的方法:

acquire(int) 獨占方式獲取鎖

acquireShared(int) 共享方式獲取鎖

release(int) 獨占方式釋放鎖

releaseShared(int) 共享方式釋放鎖

獨占鎖和共享鎖

關於獨占鎖和共享鎖先給大家普及一下這個概念。

獨占鎖指該鎖只能同時被一個線程持有;

共享鎖指該鎖可以被多個線程同時持有。

舉個生活中的例子,比如我們使用打車軟件打車,獨占鎖就好比我們打快車或者專車,一輛車只能讓一個客戶打到,不能兩個客戶同時打到一輛車;共享鎖就好比打拼車,可以有多個客戶一起打到同一輛車。

AQS內部結構

我們簡單通過一張圖先來了解下AQS的內部結構。其實就是有一個隊列,這個隊列的頭結點head代表當前正在持有鎖的線程,后續的其他節點代表當前正在等待的線程。

image-20210904201020029


接下來我們通過源碼來看看AQS的加鎖和解鎖過程。先來看看獨占鎖是如何進行加解鎖的。

獨占鎖加鎖過程

ReentrantLock lock = new ReentrantLock();
lock.lock();
public void lock() {
    // 調用sync的lock方法
    sync.lock();
}

可以看到在ReentrantLock的lock方法中,直接調用了sync這個AQS子類的lock方法。

final void lock() {
    // 獲取鎖
    acquire(1);
}
public final void acquire(int arg) {
    // 1.先嘗試獲取,如果獲取成功,則直接返回,代表加鎖成功
    if (!tryAcquire(arg) &&
        // 2.如果獲取失敗,則調用addWaiter在等待隊列中增加一個節點
        // 3. 調用acquireQueued告訴前一個節點,在解鎖之后喚醒自己,然后線程進入等待狀態
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果在等待過程中被中斷,則當前線程中斷
        selfInterrupt();
}

在獲取鎖時,基本可以分為3步:

  1. 嘗試獲取,如果成功則返回,如果失敗,執行下一步;
  2. 將當前線程放入等待隊列尾部;
  3. 標記前面等待的線程執行完之后喚醒當前線程。
/**
 * 嘗試獲取鎖(公平鎖實現)
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
	// 獲取state,初始值為0,每次加鎖成功會+1,解鎖成功-1
    int c = getState();
    // 當前沒有線程占用
    if (c == 0) { 
        // 判斷是否有其他線程排隊在本線程之前
        if (!hasQueuedPredecessors() &&
            // 如果沒有,通過CAS進行加鎖
            compareAndSetState(0, acquires)) {
            // 將當前線程設置為AQS的獨占線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果當前線程是正在獨占的線程(已持有鎖,重入)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;  
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // state+1
        setState(nextc);
        return true;
    }
    return false;
}
private Node addWaiter(Node mode) {
    // 創建一個當前線程的Node節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 如果等待隊列的尾節點!=null
    if (pred != null) {
        // 將本線程對應節點的前置節點設置為原來的尾節點
        node.prev = pred;
        // 通過CAS將本線程節點設置為尾節點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //尾節點為空,或者在CAS時失敗,則通過enq方法重新加入到尾部。(本方法內部采用自旋)
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 尾節點為空,代表等待隊列還沒有被初始化過
        if (t == null) { 
            // 創建一個空的Node對象,通過CAS賦值給Head節點,如果失敗,則重新自旋一次,如果成功,將Head節點賦值給尾節點
            if (compareAndSetHead(new Node()))
                tail = head; 
        } else {
            // 尾節點不為空的情況,說明等待隊列已經被初始化過,將當前節點的前置節點指向尾節點
            node.prev = t;
            // 將當前節點CAS賦值給尾節點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
final boolean acquireQueued(final Node node, int arg) {
    // 標識是否加鎖失敗
    boolean failed = true;
    try {
        // 是否被中斷
        boolean interrupted = false;
        for (;;) {
            // 取出來當前節點的前一個節點
            final Node p = node.predecessor();
            // 如果前一個節點是head節點,那么自己就是老二,這個時候再嘗試獲取一次鎖
            if (p == head && tryAcquire(arg)) {
                // 如果獲取成功,把當前節點設置為head節點
                setHead(node);
                p.next = null; // help GC
                failed = false; // 標識加鎖成功
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 檢查並更新前置節點p的狀態,如果node節點應該阻塞就返回true
            // 如果返回false,則自旋一次。
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 當前線程阻塞,在阻塞被喚醒時,判斷是否被中斷
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) // 如果加鎖成功,則取消獲取鎖
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // ws == -1
        /*
		 * 這個節點已經設置了請求釋放的狀態,所以它可以在這里安全park.
         */
        return true;
    if (ws > 0) {
        /*
         * 前置節點被取消了,跳過前置節點重試
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 將前置節點的狀態設置為請求釋放
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

在整個加鎖過程可以通過下圖更清晰的理解。

image-20210904201038010

獨占鎖解鎖過程

public void unlock() {
    sync.release(1);
}

同樣解鎖時也是直接調用AQS子類sync的release方法。

public final boolean release(int arg) {
    // 嘗試解鎖
    if (tryRelease(arg)) {
        Node h = head;
        // 解鎖成功,如果head!=null並且head.ws不等0,代表有其他線程排隊
        if (h != null && h.waitStatus != 0)
            // 喚醒后續等待的節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

解鎖過程如下:

  1. 先嘗試解鎖,解鎖失敗則直接返回false。(理論上不會解鎖失敗,因為正在執行解鎖的線程一定是持有鎖的線程)
  2. 解鎖成功之后,如果有head節點並且狀態不是0,代表有線程被阻塞等待,則喚醒下一個等待的線程。
protected final boolean tryRelease(int releases) {
    // state - 1
    int c = getState() - releases;
    // 如果當前線程不是獨占AQS的線程,但是這時候又來解鎖,這種情況肯定是非法的。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // 如果狀態歸零,代表鎖釋放了,將獨占線程設置為null
        free = true;
        setExclusiveOwnerThread(null);
    }
	// 將減1之后的狀態設置為state
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    /*
     * 如果節點的ws小於0,將ws設置為0
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 從等待隊列的尾部往前找,直到第二個節點,ws<=0的節點。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果存在符合條件的節點,unpark喚醒這個節點的線程。
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享鎖加鎖過程

為了實現共享鎖,AQS中專門有一套和排他鎖不同的實現,我們來看一下源碼具體是怎么做的。

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    // tryAcquireShared 嘗試獲取共享鎖許可,如果返回負數標識獲取失敗
    // 返回0表示成功,但是已經沒有多余的許可可用,后續不能再成功,返回正數表示后續請求也可以成功
    if (tryAcquireShared(arg) < 0)
       //  申請失敗,則加入到共享等待隊列
        doAcquireShared(arg);
}

tryAcquireShared嘗試獲取共享許可,本方法需要在子類中進行實現。不同的實現類實現方式不一樣。

下面的代碼是ReentrentReadWriteLock中的實現。

 protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 當前有獨占線程正在持有許可,並且獨占線程不是當前線程,則返回失敗(-1)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // 沒有獨占線程,或者獨占線程是當前線程。
    // 獲取已使用讀鎖的個數
    int r = sharedCount(c);
  	// 判斷當前讀鎖是否應該阻塞 
    if (!readerShouldBlock() &&
        // 已使用讀鎖小於最大數量
        r < MAX_COUNT &&
        // CAS設置state,每次加SHARED_UNIT標識共享鎖+1
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) { // 標識第一次加讀鎖
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            // 重入加讀鎖
            firstReaderHoldCount++;
        } else {
            // 並發加讀鎖,記錄當前線程的讀的次數,HoldCounter中是一個ThreadLocal。
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 否則自旋嘗試獲取共享鎖
    return fullTryAcquireShared(current);
}

本方法可以總結為三步:

  1. 如果有寫線程獨占,則失敗,返回-1
  2. 沒有寫線程或者當前線程就是寫線程重入,則判斷是否讀線程阻塞,如果不用阻塞則CAS將已使用讀鎖個數+1
  3. 如果第2步失敗,失敗原因可能是讀線程應該阻塞,或者讀鎖達到上限,或者CAS失敗,則調用fullTryAcquireShared方法。
private void doAcquireShared(int arg) {
    // 加入同步等待隊列,指定是SHARED類型
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 取到當前節點的前一個節點
            final Node p = node.predecessor();
            // 如果前一個節點是頭節點,則當前節點是第二個節點。
            if (p == head) {
                // 因為是FIFO隊列,所以當前節點這時可以再嘗試獲取一次。
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取成功,把當前節點設置為頭節點。並且判斷是否需要喚醒后面的等待節點。
                    // 如果條件允許,就會喚醒后面的節點
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果前置節點不是頭結點,說明當前節點線程需要阻塞等待,並告知前一個節點喚醒
            // 檢查並更新前置節點p的狀態,如果node節點應該阻塞就返回true
            // 當前線程被喚醒之后,會從parkAndCheckInterrupt()執行
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            cancelAcquire(node);
    }
}

共享鎖釋放過程

public void unlock() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //tryReleaseShared()嘗試釋放許可,這個方法在AQS中默認拋出一個異常,需要在子類中實現
    if (tryReleaseShared(arg)) {
        // 喚醒線程,設置傳播狀態 WS
        doReleaseShared();
        return true;
    }
    return false;
}

AQS是很多並發場景下同步控制的基石,其中的實現相對要復雜很多,還需要多看多琢磨才能完全理解。本文也是和大家做一個初探,給大家展示了核心的代碼邏輯,希望能有所幫助。


好的,本期內容就到這里,我們下期見;關注公眾號【小黑說Java】更多干貨。
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM