Java中AQS基本實現原理


一、AQS概述

AQS全名AbstractQueuedSynchronizer,意為抽象隊列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些並發工具類都是基於它來實現的。AQS維護了一個volatile的state和一個CLH(FIFO)雙向隊列。

二、分析

state

state是一個由volatile修飾的int變量,它的訪問方式有三種:

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)
    /**
     * 由volatile修飾的state
     */
    private volatile int state;

    /**
     * 基於內存可見性的讀
     */
    protected final int getState() {
        return state;
    }

    /**
     * 基於內存可見性的寫
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 使用CAS+volatile,基於原子性與可見性的對state進行設值
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // 使用Unsafe類,調用JNI方法
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

資源獲取主要有兩種形式:

  • 獨占式(EXCLUSIVE)

僅有一個線程能在同一時刻獲取到資源並處理,如ReentrantLock的實現。

  • 共享式(SHARED)

多個線程可以同時獲取到資源並處理,如Semaphore/CountDownLatch等。

AQS中大部分邏輯已經被實現,集成類只需要重寫state的獲取(acquire)與釋放(release)方法,因為在AQS中,這些方法默認定義的實現方式都是拋出不支持操作異常,所以按需實現即可。

其中需要繼承類重寫的方法有:

  • tryAcquire(int arg)

此方法是獨占式的獲取資源方法,成功則返回true,失敗返回false。

  • tryRelease(int arg)

此方法是獨占式的釋放資源方法,成功則返回true,失敗返回false。

  • tryAcquireShared(int arg)

此方法是共享式的獲取資源方法,返回負數表示失敗,0表示獲取成功,但是沒有可用資源,正數表示獲取成功,且有可用資源。

  • tryReleaseShared(int arg)

此方法是共享式的釋放資源方法,如果允許喚醒后續等待線程則返回true,不允許則返回false。

  • isHeldExclusively()

判斷當前線程是否正在獨享資源,是則返回true,否則返回false。

CLH(FIFO)隊列

AQS中是通過內部類Node來維護一個CLH隊列的。源碼如下:

 static final class Node {
        /** 標記共享式訪問 */
        static final Node SHARED = new Node();
        /** 標記獨占式訪問 */
        static final Node EXCLUSIVE = null;

        /** 字段waitStatus的值,表示當前節點已取消等待 */
        static final int CANCELLED =  1;
        /**字段waitStatus的值,表示當前節點取消或釋放資源后,通知下一個節點 */
        static final int SIGNAL    = -1;
        /** 表示正在等待觸發條件 */
        static final int CONDITION = -2;
        /**
         * 表示下一個共享獲取應無條件傳播
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     表示當前節點取消或釋放資源后,通知下一個節點
         *   CANCELLED:  表示當前節點已取消等待
         *   CONDITION:  表示正在等待觸發條件
         *   PROPAGATE:  表示下一個共享獲取應無條件傳播
         */
        volatile int waitStatus;

        /**
         * 前節點
         */
        volatile Node prev;

        /**
         * 下一個節點
         */
        volatile Node next;

        /**
         * 節點對應線程
         */
        volatile Thread thread;

        /**
         * 下一個等待的節點
         */
        Node nextWaiter;

        /**
         * 是否是共享式訪問
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前節點
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 共享式訪問的構造函數
        }

        Node(Thread thread, Node mode) {     // 用於被添加等待者使用
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 用於Condition使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

獨占模式-獲取資源

使用AQS中的acquire(int arg)方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

該方法分為4個部分:

  • tryAcquire()

需要自己實現的方法,如果獲取到資源使用權,則返回true,反之fasle。如果獲取到資源,返回true,!true為false,根據&&的短路性,則不會執行后續方法,直接跳過程序。如果未獲取到資源,返回false,!false為true,則進入后續方法。

  • addWaiter()

如果未獲取到資源使用權,則首先會調用此方法。上源碼:

    private Node addWaiter(Node mode) {
        // 封裝當前線程和獨占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 獲取尾部節點
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS設置尾部節點
            if (compareAndSetTail(pred, node)) {
                // 將為節點的下一節點指向當前node
                pred.next = node;
                return node;
            }
        }
        // 如果尾結點為空或者設置尾結點失敗
        enq(node);
        return node;
    }
 private Node enq(final Node node) {
        // 如果CAS設置未成功則死循環
        for (;;) {
            // 獲得尾結點
            Node t = tail;
            // 如果尾節點為空,說明CLH隊列為空,需要初始化
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 設置當前節點的前驅節點
                node.prev = t;
                // CAS設置當前節點為尾結點
                if (compareAndSetTail(t, node)) {
                    // 設置后驅節點
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • acquiredQueued()
    final boolean acquireQueued(final Node node, int arg) {
        // 標識資源獲取是否失敗
        boolean failed = true;
        try {
            // 標識線程是否中斷
            boolean interrupted = false;
            for (;;) {
                // 獲得當前節點的前驅節點
                final Node p = node.predecessor();
                // 如果前驅節點為頭結點,說明快到當前節點了,嘗試獲取資源
                if (p == head && tryAcquire(arg)) {
                    // 獲取資源成功
                    // 設置當前節點為頭結點
                    setHead(node);
                    // 取消前驅節點(以前的頭部)的后節點,方便GC回收
                    p.next = null; // help GC
                    // 標識未失敗
                    failed = false;
                    // 返回中斷標志
                    return interrupted;
                }
                // 如果當前節點的前驅節點不是頭結點或獲取資源失敗
                // 需要用shouldParkAfterFailedAcquire函數判斷是否需要阻塞該節點持有的線程
                // 如果需要阻塞,則執行parkAndCheckInterrupt方法,並設置被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果最終獲取資源失敗
            if (failed)
                // 當前節點取消獲取資源
                cancelAcquire(node);
        }
    }
  • selfInterrupt()

中斷當前線程

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

獨占模式-釋放資源

release()  釋放資源並喚醒后繼線程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 獲取頭結點
            Node h = head;
            // 頭結點不為空且等待狀態值不為0
            if (h != null && h.waitStatus != 0)
                // 喚醒后續等待線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果等待狀態值小於0
        if (ws < 0)
            // 使用CAS將waitStatus設置為0
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 如果當前節點沒有后繼節點或者后繼節點放棄競爭資源
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從隊列尾部循環直到當前節點,找到最近的且等待狀態值小於0的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果找到的后繼節點不為空,則喚醒其持有的線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM