AQS之獨占鎖實現原理


 

一:AQS概念

  AQS是java.util.concurrent包的一個同步器,它實現了鎖的基本抽象功能,支持獨占鎖與共享鎖兩張方式,

獨占鎖:同一時刻只允許一個線程方法加鎖資源,例如:ReentrantLock 

共享鎖:同一時刻允許多個線程方法資源,例如:countDownLatch

 

二:數據結構

  AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任

意一個節點開始很方便的訪問前驅和后繼。每個 Node 其實是由線程封裝,當線程爭搶鎖失敗后會封裝成 Node 加入到 AQS 隊列中去;當獲取鎖的線程釋放鎖以
后,會從隊列中喚醒一個阻塞的節點(線程)。
 
三:ReentrantLock 實現原理分析
使用方式:
創建一個ReentrantLock鎖,然后加鎖,在加鎖之后執行獨占資源,然后在fiinally塊中釋放鎖
ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            System.out.println("do something......");
        } finally {
            lock.unlock();
        }

  

看一下new ReentrantLock()

默認創建了一個非公平鎖,ReentrantLock內部維護了一個Sync同步器,

 public ReentrantLock() {
        sync = new NonfairSync();
    }

  

 

 

看一下lock方法:

 

首先是比較狀態,同步器類AbstractQueuedSynchronizer維護了一個同步狀態的字段

private volatile int state;
當狀態為0時,表示資源沒有被占用,大於0則表示被占用

來一個線程首先判斷當前狀態是否是0,如果是則把state設置為1,然后把當前線程設置為資源擁有者
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

  

 因為這個同步器實現的鎖是非公平鎖,所以即使第一次沒有獲取到鎖,還會嘗試獲取鎖,非公平鎖相對於公平鎖而言效率更高,
因為在一個線程被喚醒到真正的執行任務還有一段時間,所以正在獲取鎖的線程有機會獲取並執行完任務,然后被喚醒的線程開始執行
任務。
例如:A線程持有鎖,然后B線程獲取鎖失敗進入對列等待,那么C線程來了,第一次獲取失敗,因為A沒有釋放鎖,這時A釋放鎖,喚醒了B,
但是B並沒有執行任務,C線程這時也可以去獲取鎖,執行任務,如果C的任務耗時小,可能C剛好執行完,那么B線程開始執行任務,這就是一種雙贏的局面。

 

 

這里的acquire都是同步器實現的:

 

 

 

 如果當前的狀態是0,則說明此時沒有線程占用鎖,那么設置同步器狀態,並把當前線程設置為資源擁有者

如果當前狀態不是0,則判斷當前線程是否是資源擁有者(因為支持可重入,所以可能大於0),如果是則累加狀態值

如果不是則返回失敗

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  

 如果獲取鎖失敗,則入隊列

 

 

先看一下addWaiter方法:

CLH隊列底層維護的是一個雙向鏈表結構,每一個節點Node維護當前線程引用,前一個節點和后一個節點的引用,Node節點

還具有狀態

 static final class Node {
         
        static final Node SHARED = new Node();
        
        static final Node EXCLUSIVE = null;
         
        static final int CANCELLED =  1;
        
        static final int SIGNAL    = -1;
         
        static final int CONDITION = -2;
       
        static final int PROPAGATE = -3;
       
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

    
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

         
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

  

看一下addWaiter方法:

第一次進來tail為null,所以會調用enq這個方法:

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  

來看一下enq方法:

如果tail為null,則新建一個node節點,並設置為head,然后將head引用賦值給tail,這樣head和tail都指向一個空節點Node

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

  

 

 初始化完成,新加進來的Node會被設置為tail尾部節點,然后之前最后一個節點建立pre、next連接

下面框里tail不為null,就不是第一個被放進來的node節點,直接把node設置到tail尾部。

 

 

 看一下獲取對列方法acquireQueued:

前面已經將獲取鎖失敗的線程以node節點的形式放到了CLH對列的tail尾部,這里的node就是維護當前線程的node,

如果node的前驅節點為head(head為正在執行的線程的節點),那么會再次嘗試獲取鎖。

獲取鎖成功:把讓對列的head指向node,然后將node節點維護的前驅和線程置位null,在這里拿到鎖,其實並不需要前驅節點的線程喚醒,

因為當前線程並沒有阻塞。

獲取鎖失敗:如果前驅節點不是head或者是head獲取鎖失敗,那么就會park當前線程。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  

 

 看一下shouldParkAfterFailedAcquire:

如果前驅節點的狀態為signal,則可以安全的park阻塞當前線程,因為前驅為signal狀態,說明當前驅節點維護的線程釋放鎖后,會通知當前線程。

如果狀態大於0,就是已取消,則向前遍歷,直到找到一個未取消的,已取消狀態,可能該任務已經中斷或者超時。

如果不是signal狀態,也沒有取消,那么就把前驅節點的waitStatus設置為signal,然后該節點就可以安全的park了。

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

  

 

 這里很簡單,就是把當前線程park阻塞。然后當前線程就會在這里阻塞,直到被前驅節點的線程喚醒。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

  

如果此時前一個線程執行完畢,執行unpark,那么該線程就會被喚醒。

喚醒之后,又會進入for死循環,爭搶鎖資源(因為是非公平鎖),獲取到鎖則執行代碼,獲取不到,還會執行

到park方法阻塞,如果該線程在阻塞過程中被中斷,那么喚醒后會執行中斷方法。

 

 

 

上面是非公平鎖的實現,下面來看一下公平鎖的實現,公平鎖的實現應該更簡單一些,

就是獲取鎖失敗后,直接進入對列等待,不會在獲取鎖,進入對列的時候獲取鎖,非公平鎖,在阻塞之前

有三次獲取鎖的機會。

ReentrantLock的構造方法是有參數,可以設置是否采用公平鎖:

 

 

 

看lock方法:

 

 

 

 

 公平鎖與非公平鎖的區別:

非公平鎖獲取失敗后,會再次嘗試獲取,而公平鎖直接獲取。

公平鎖:

非公平鎖:

 

 

 如果當前狀態為0,當前沒有線程占有鎖,它會先判斷對列中是否有前驅節點,如果有則不會獲取鎖,然后進入對列中

等待。

 

 

 

鎖的釋放:

 

 

 

如果線程的狀態為0,則把排他線程設置為null,如果重入次數過多,那么就需要多次unlock才可以,到最后一次unlock才會

釋放鎖

 

 

 喚醒下一個線程:

unparkSuccessor(h);

 把head節點設置為0狀態,然后下一個節點不為null,則喚醒下一個節點,如果為null,

則從tail往前遍歷,找到node下面不能為null且最近的沒有被取消的節點,然后喚醒。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM