AbstractQueuedSynchronizer原理分析


 

  AbstractQueuedSynchronized 以下簡稱AQS,是用來構建鎖或者其他同步組件的基礎框架。

  在AQS中,為鎖的獲取和釋放提供了一些模板方法,而實現鎖的類(AQS的子類)需要實現這些模板方法中的同步方法。

  這些方法包括:

  ·tryAcquire():嘗試以獨占模式獲取鎖

  ·tryRelease():嘗試釋放獨占模式的鎖

  ·tryAcquireShared():嘗試以共享模式獲取鎖

  ·tryReleaseShared():嘗試釋放共享模式的鎖

  ·isHeldExclusiverly():返回是否以獨占模式獲有鎖

 

  在分析AQS的原理之前,我們先看看LockSupportLockCondition、AQS、ReentrantLok等等之間的關系和使用方式。

  關系圖

             

 

 

  使用方式有兩種:一種是不帶條件的普通的鎖,另一種是帶條件的鎖。

  不帶條件

     /*
         * ReentrantLock是Lock接口的實現類之一
         * 實現的是一種可重入的鎖
         */
        Lock lock  = new ReentrantLock();
        lock.lock();
        try {
            //同步處理
        }finally {
            lock.unlock();
        }

  帶條件的鎖

Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();//創建和該鎖關聯的條件鎖
    
    public void conditionWait() throws InterruptedException{
        lock.lock();
        try {
            condition.await();
        }finally {
            lock.unlock();
        }
    }
    public void ConditionSignal() throws InterruptedException{
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

  從使用方式可以看到,主要調用的方法就是:lock.lock()、lock.unlock()、lock.newCondition()、condition.await()、condition.signal() 

  

下面分別來看看這幾個方法

  lock.lock():由於Lock是一個接口,所以需要通過其子類來實例化,所以lock.lock()其實調用的是子類的lock()方法,在上面的例子中自然調用的就是ReentrantLock.lock()方法。

    public void lock() {
        sync.lock();
    }

   可以看到,ReentrantLock.lock()方法調用的是同步器內部類的lock()方法,而ReentrantLock的內部同步器類Sync又分為FairSync和NoFairSync。

  源碼如下(省略了一部分內容):

   abstract static class Sync extends AbstractQueuedSynchronizer {
        
        abstract void lock();
   }
    static final class NonfairSync extends Sync {
     
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

    static final class FairSync extends Sync {
     final void lock() {
            acquire(1);
        }
    }

  可以看到在非公平的獲取鎖的方法中,會首先嘗試直接去獲取鎖,而不會通過同步隊列去排隊獲取鎖。否則的話,就通過AQS同步器的acquire(1)去獲取鎖。

  ※RetrantLock在初始化的時候可以通過參數指定是公平的還是不公平的鎖。默認是非公平的鎖。

 

  lock.unlock():調用的是同步器中的release(1)方法,也就是AQS中的release(1)方法。

    public void unlock() {
        sync.release(1);
    }

 

  lock.newCondition():該方法調用的也是內部同步器類中的newCondition()方法。

    public Condition newCondition() {
        return sync.newCondition();
    }

  在ReentrantLock的內部同步器類中的newCondition()方法如下:

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

  可以看到,該方法直接返回了一個AQS中的內部類ConditonObject對象。所以,在每次生成一個條件鎖的時候,都會創建一個ConditionObject對象,

  而每個ConditionObject對象都會在內部維護一個條件等待隊列。

 

  ConditonObject實現分析點此參考

 

  conditon.await():通過調用LockSupport中的park()方法,將當前掛起。

  源碼如下:

       public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

 

  condition.signal():通過調用LockSupport的unpark()方法,來喚醒線程。

  源碼如下:

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
  final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

 

  了解完上面的概述之后,我們知道了,在不帶條件的鎖中,主要是通過調用AQS中的acquire(1)herelease(1)方法來獲取鎖和釋放鎖的。

下面就來具體看看AQS中的實現

  在AQS中通過一個雙向FIFO同步隊列來維護獲取對象鎖的線程,當獲取鎖失敗的時候,會用當前線程構建一個Node節點,加入到同步隊列中去。

  在AQS中維護了同步隊列的head和tail節點,和同步狀態。。

  Node節點中維護了當前線程的status和前驅節點、后繼節點、下一個等待節點(條件等待的時候用)。

  waitStatus包含如下狀態默認值為0:

    CANCELLED = 1 : 表示當前線程在等待的時候已經超時了或者被取消了

    SIGNAL = -1  :當前線程釋放了同步狀態或者被取消的時候會通知后繼節點,使后繼節點得以運行

    CONDITINO = -2:節點在等待隊列中,等待Condition,當其他線程在Condition上調用signal的時候,該線程會從等待隊列轉移到同步隊列中去,加入到同步狀態的獲取。

    PROPAGATE = -3:表示下一次共享式同步狀態會無條件的傳播下去

 

  AQS中同步器的結構如下:

  

  當有節點加入到同步隊列的時候,只需要對tail節點重新指向就可以了

 

   同步隊列是一個FIFO的隊列,在獲取鎖的時候總是首節點是獲取同步狀態的節點,首節點的線程在釋放同步狀態時,將會喚醒后繼節點,而后繼節點在獲取同步狀態成功時,會將自己設置為首節點。

 

 下面就來具體看看在獲取鎖的時候lock.lock()調用的同步器中的acquire(1)方法的具體實現

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 

  以獨占模式獲取鎖,並且不相應中斷。在AQS中還有以下acquire方法:

    acquireInterruptibly(int arg) :以獨占模式獲取鎖,並且響應中斷

    acquireShared(int arg):以共享模式獲取鎖,並且不響應中斷

    acquireSharedInterruptibly(int arg):以共享模式獲取鎖,並且響應中斷

 

  該方法首先會調用tryAcquire(arg)來嘗試獲取鎖。從源碼可以看到,在AQS中該方法只是單純的拋出一個UnsupportedOperationException異常,該方法需要實現AQS同步器的具體類中實現。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

 

  我們看看ReentrantLock中的具體實現。

  非公平鎖:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {  //如果當前的同步狀態為0,就嘗試直接設置同步狀態和設置獨占的線程為自己,來強制獲取鎖
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {  //如果當前同步狀態不為0,就判斷是不是自己獲取了鎖,這里是實現可重入的
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

 

  公平鎖:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {  
                if (!hasQueuedPredecessors() &&  //當前state為0 ,並且同步隊列中沒有前繼節點,就嘗試設置自己為獲得鎖的線程
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {  //實現可重入
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

   可以看到,不論是在公平鎖還是非公平鎖的tryAcquire中,當獲取到鎖的時候返回的都是true,否則返回false。

  所以,如果當前線程沒有獲取到鎖的時候,則會繼續執行后面的acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

  我們先看看addWaiter(Node.EXCLUSIVE)的實現。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 首先為當前線程以指定模式構建一個Node節點,然后嘗試快速入隊的方式加入到同步隊列中去
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);  //否則的話就調用enq(node)方法來加入到同步隊列中去
        return node;
    }

 

  再看看enq(node)的源碼實現

    private Node enq(final Node node) {
        for (;;) {  //通過一個無條件的循環,知道將構建一個空的head,然后將當前節點加入到空的head的后面,構成一個同步隊列
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 

  再看看acquireQueued(node,int)的源碼實現

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {  //判斷前一個節點是不是head節點,如果是的話,則會再次嘗試去獲取鎖
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&  //如果前一個節點不是head節點,則設置前一個非取消節點的狀態是signal,以確保在前一個線程釋放鎖的時候能喚醒當前線程
                    parkAndCheckInterrupt())  //掛起當前線程,並且返回當前線程是否被中斷過(會清空中斷狀態,只有在被喚醒的時候才能從park()方法返回)
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);  //如果被中斷過,則把該節點從同步隊列中刪除
        }
    }

 

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

 

   acquire(1)的總結

   ①首先會去調用子類實現的具體tryAcquire(1)方法來獲取鎖,根據子類不同,實現的也不同,例如ReentrantLock中的實現就分為公平鎖和非公平鎖。非公平鎖就是不會去同步隊列中排隊,

  而是直接去獲取鎖。如果獲取失敗的話,就會跟公平鎖一樣,進入FIFO的同步隊列排隊。

   ②加入同步隊列的時候,首先會判斷tail節點是否為空,如果不為空,則會嘗試快速入隊,如果為空的話,則會先創建一個空的head節點,然后在將當前線程的節點加入到同步隊列中去

   ③加入到同步隊列之后,會再次判斷前一個節點是不是head節點,如果是的話,則會再次嘗試去獲取鎖,如果獲取失敗的話,則會掛起當前線程。

   ④直到當前線程被喚醒的時候,會判斷當前線程是否被中斷過,如果被中斷過,則會從同步隊列中刪除當前線程節點。並且中斷當前線程。

 

  下面在看看lock.unlock()方法調用的同步器中的sync.release(1)方法的具體實現

 

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

  首先會調用tryRelease(arg)方法來釋放鎖。然后喚醒后面的線程。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

  tryRelease()同樣也是子類需要實現的方法。ReentrantLock中的實現如下:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

 

  

   private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 

  

  release(1)的總結

  ①首先會判斷當前線程是不是獨占的擁有鎖。如果是的話,則會去釋放鎖。如果當前線程都已經退出了獲取鎖(可重入的原因),則會設置當前線程的state為0,獨占線程為null

  ②在釋放鎖之后,會喚醒下一個等待中的線程。

 

  

  AQS中條件隊列的實現方式參考ConditonObject實現分析點此參考

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM