[Java並發] AQS抽象隊列同步器源碼解析--獨占鎖獲取過程


[Java並發] AQS抽象隊列同步器源碼解析--獨占鎖釋放過程
要深入了解java並發知識,AbstractQueuedSynchronizer(AQS)是必須要拿出來深入學習的,AQS可以說是貫穿了整個JUC並發包,例如ReentrantLock,CountDownLatch,CyclicBarrier等並發類都涉及到了AQS。接下來就對AQS的實現原理進行分析。

在開始分析之前,勢必先將CLH同步隊列了解一下

CLH同步隊列

CLH自旋鎖: CLH(Craig, Landin, and Hagersten locks): 是一個自旋鎖,能確保無飢餓性,提供先來先服務的公平性。CLH自旋鎖是一種基於隱式鏈表(節點里面沒有next指針)的可擴展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。

AQS中的CLH同步隊列:AQS中CLH同步隊列是對CLH自旋鎖進行了優化,其主要從兩方面進行了改造:節點的結構與節點等待機制。

1.在結構上引入了頭結點和尾節點,他們分別指向隊列的頭和尾,嘗試獲取鎖、入隊列、釋放鎖等實現都與頭尾節點相關,並且每個節點都引入前驅節點和后后續節點的引用;

2.在等待機制上由原來的自旋改成阻塞喚醒。

源碼中CLH的簡單表示

*      +------+  prev +-----+       +-----+
* head |      | <---- |     | <---- |     |  tail
*      +------+       +-----+       +-----+

Node就是實現CLH同步隊列的數據結構,計算下就了解下該類的相關字段屬性

AQS中重要的內部類Node

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 獨占模式
    static final Node EXCLUSIVE = null;

    // 如果屬性waitStatus == Node.CANCELLED,則表明該節點已經被取消
    static final int CANCELLED =  1;
    // 如果屬性waitStatus == Node.SIGNAL,則表明后繼節點等待被喚醒
    static final int SIGNAL    = -1;
    // 如果屬性waitStatus == Node.CONDITION,則表明是Condition模式中的節點等待條件喚醒
    static final int CONDITION = -2; 
    // 如果屬性waitStatus == Node.PROPAGATE,在共享模式下,傳播式喚醒后繼節點
    static final int PROPAGATE = -3; 
    // 用於標記當前節點的狀態,取值為1,-1,-2,-3,分別對應以上4個取值
    volatile int waitStatus;
    // 前驅節點
    volatile Node prev;
    // 后繼節點
    volatile Node next;
    // 當前節點對應的線程,阻塞與喚醒的線程
    volatile Thread thread;
    // 使用Condtion時(共享模式下)的后繼節點,在獨占模式中不會使用
	Node nextWaiter;
    final boolean isShared() {
            return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

下面就開始着重對AQS中的重要方法進行分析說明

獲取鎖

1.acquire 開始獲取鎖

public final void acquire(int arg) {
    //如果tryAcquire返回true,即獲取到鎖就停止執行,否則繼續向下執行向同步隊列尾部添加節點
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire是用於獲取鎖的方法,在AQS中默認沒有實現具體邏輯,由子類自定義實現。

如果返回true則說明獲取到鎖,否則需要將當前線程封裝為Node節點添加到同步隊列尾部。

2.當前節點入隊列

將當前執行的線程封裝為Node節點並加入到隊尾

private Node addWaiter(Node mode) {// 首先嘗試快速添加到隊尾,失敗再正常執行添加到隊尾
    Node node = new Node(Thread.currentThread(), mode);
    // 快速方式嘗試直接添加到隊尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果快速添加到隊尾失敗則執行enq(node)添加到隊尾
    enq(node);
    return node;
}

enq方法循環遍歷添加到隊尾

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        	// 添加到隊列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter(Node mode)方法執行完后,接下來執行acquireQueued方法, 返回的是該線程是否需要中斷,該方法也是不停地循環獲取鎖,如果前節點是頭節點,則嘗試獲取鎖,獲取鎖成功則返回是否需要中斷標志,如果獲取鎖失敗,則判斷是否需要阻塞並阻塞線程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 標記是否需要被中斷
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驅節點是頭節點,並且獲取鎖成功,則返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判斷獲取鎖失敗后是否需要阻塞當前線程,如果阻塞線程后再判斷是否需要被中斷線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(p, node)方法判斷是否需要阻塞當前線程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果ws == Node.SIGNAL,則說明當前線程已經准備好被喚醒,因此現在可以被阻塞,之后等待被喚醒
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 如果ws > 0,說明當前節點已經被取消,因此循環剔除ws>0的前驅節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果ws<=0,則將標志位設置為Node.SIGNAL,當還不可被阻塞,需要的等待下次執行shouldParkAfterFailedAcquire判斷是否需要阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

如果shouldParkAfterFailedAcquire(p, node)方法返回true,說明需要阻塞當前線程,則執行parkAndCheckInterrupt方法阻塞線程,並返回阻塞過程中線程是否被中斷

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞線程,等待unpark()或interrupt()喚醒自己
    // 線程被喚醒后查看是否被中斷過。
    return Thread.interrupted();
}

那么重新回到獲取鎖的方法acquire方法,如果acquireQueued(final Node node, int arg)返回true,也即是阻塞過程中線程被中斷,則執行中斷線程操作selfInterrupt()

public final void acquire(int arg) {
    //如果tryAcquire返回true,即獲取到鎖就停止執行,否則繼續向下執行向同步隊列尾部添加節點,然后判斷是否被中斷過,是則執行中斷
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

中斷當前線程

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

小結

AQS獲取鎖的過程:

1.執行tryAcquire方法獲取鎖,如果獲取鎖成功則直接返回,否則執行步驟2

2.執行addWaiter方法將當前線程封裝位Node節點並添加到同步隊列尾部,執行步驟3

3.執行acquireQueued循環嘗試獲取鎖,,如果獲取鎖成功,則判斷返回中斷標志位,如果獲取鎖失敗則調用shouldParkAfterFailedAcquire方法判斷是否需要阻塞當前線程,如果需要阻塞線程則調用parkAndCheckInterrupt阻塞線程,並在被喚醒后再判斷再阻塞過程中是否被中斷過。

4.如果acquireQueued返回true,說明在阻塞過程中線程被中斷過,則執行selfInterrupt中斷線程

好了,以上就是AQS的鎖獲取過程,關於鎖釋放的分析會在后續繼續輸出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM