AQS使用及原理


1、AQS是AbstractQueuedSynchronizer的簡稱。提供用於實現阻塞鎖和同步器框架(信號量、事件等),依靠先入先出(FIFO)等待隊列。AQS為一系列同步器依賴於一個單獨的原子變量(state)的同步器提供了一個非常有用的基礎。AQS對於state的操作都是基於CAS操作,保證了state的原子性和可見性。

state使用

AQS 提供了三種操作state的方法

  • getState()
  • setState(int)
  • compareAndSetState(int, int)

具體源碼如下

protected final int getState() {
    return state;
}
    
protected final void setState(int newState) {
state = newState;
}
    
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS資源

AQS定義了兩種獲取資源的方式,獨占(exclusive)和共享(Shared)兩種方式,使用AQS值需要子類實現了以下方法的一部分或全部,如果沒有實現而調用會報UnsupportedOperationException異常

  • tryAcquire(int) 嘗試獲取獨占資源
  • tryRelease(int) 嘗試釋放獨占資源
  • tryAcquireShared(int) 嘗試獲取共享資源
  • tryReleaseShared(int) 嘗試釋放共享資源
  • isHeldExclusively() 該線程是否正在獨占資源。只有用到condition才需要去實現它。
AQS的其他重要方法
  • acquire(int arg) 在獨占模式中獲得,忽略中斷。
  1. 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
  2. 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記為獨占模式;
  3. acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  4. 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 如果沒有拿到鎖
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //提供中斷當前線程的方法
        selfInterrupt();
}

/**將當前線程加入到等待隊列的隊尾,並返回代表當前線程的節點
 * Creates and enqueues node for current thread and given mode.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//EXCLUSIVE(獨占)和SHARED(共享)
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {// 直接放到隊尾。
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);// 按正常邏輯入隊列
    return node;
}


/**將node加入隊尾
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) { // CAS + 循環 = 自旋
        Node t = tail;
        if (t == null) { // Must initialize 隊列為空
            if (compareAndSetHead(new Node()))// 創建一個空的標志結點作為head結點
                tail = head; // tail 和 head都是同一個節點
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { // 放入tail尾部
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 線程阻塞等待獲取鎖
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 當前線程釋放中斷的標志位
        for (;;) {// 循環搶鎖
            final Node p = node.predecessor(); // 獲取前一個節點
            if (p == head && tryAcquire(arg)) { // 如果前一個節點是head,就嘗試搶鎖,並且嘗試搶鎖成功
                setHead(node); // 更換head,即當前Node出隊列
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&// 檢查狀態,是否需要掛起線程
                parkAndCheckInterrupt())   // 掛起
                interrupted = true;     // 如果出現中斷,則修改標記
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • release(int) 釋放獨占模式。
  1. 一般來說,獨占都是可以釋放成功,但是得判斷該線程釋放完全釋放
public final boolean release(int arg) {
    if (tryRelease(arg)) {//嘗試釋放
        Node h = head; // 從頭開始找
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 喚醒下一個線程
        return true;
    }
    return false;
}

/** 喚醒等待者
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 正在釋放鎖的線程節點狀態
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 修改當前節點狀態

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next; // 找下一個節點
    if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,繼續尋找合適的下一個節點
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 如果找到了合適的節點,就喚醒它
        LockSupport.unpark(s.thread);
}
  • acquireShared(int) 在共享模式中獲取,忽略中斷。
  1. 只要沒有獨占資源,該方法都應該成功,需要注意喚醒線程時需要判斷隊列中的下個線程是否時共享資源,如果時,也要同時喚醒。
/** 線程獲取共享資源的入口
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 判斷量夠不夠
        doAcquireShared(arg); // 沒拿到資源,需要等待
}

/**等待..
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 以共享模式加入隊列尾部
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 自旋
            final Node p = node.predecessor(); // 前置節點
            if (p == head) { // 如果前置為head
                int r = tryAcquireShared(arg); // 嘗試獲取資源,返回資源剩余的數量
                if (r >= 0) { // 拿到資源
                    setHeadAndPropagate(node, r); // 修改head節點
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  • releaseShared(int)釋放共享資源
  1. 釋放共享資源都能成功
/** 線程釋放共享資源
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//嘗試
        doReleaseShared();//執行
        return true;
    }
    return false;
}

/** 共享模式下 - 喚醒當前head節點的后續節點
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 判定是否還有后續節點
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // 如果狀態為SIGNAL,代表需要通知后續節點
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 修改狀態為0,通知一次
                    continue;            // loop to recheck cases 修改失敗,代表已經通知,繼續處理
                unparkSuccessor(h); // 喚醒
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 通知過后,修改節點狀態為PROPAGATE
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed 知道其他的節點,把這個head擠下來,它才跳出循環
            break;
    }
}

參考:https://www.jianshu.com/p/da9d051dcc3d


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM