另一種鎖:ReentractLock


1,概述

前面我們說了volatile關鍵字,主要是為了處理並發讀寫場景,解決可見性的問題。然后又講了synchronized鎖,處理並發寫的場景,解決原子性的問題。接着又說了Atomic原子系列,它是基於基本數據類型或者引用來解決原子問題,底層是基於CAS(compare and set),無鎖化(樂觀鎖)。這篇文章主要來說說ReentractLock,先演示一個demo,對ReentractLock有個基本的了解。

public class ReentractLockDemo {
    #獲取一個lock鎖
    static ReentrantLock lock = new ReentrantLock();
    #可修改的共享變量
    static volatile int flag = 0;
    public static void main(String[] args) {
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    //獲取鎖
                    lock.lock();
                    flag++;
                    System.out.println(Thread.currentThread().getName() + "線程修改變后的變量為" + flag);
//釋放鎖 lock.unlock();
try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); new Thread() { @Override public void run() { while (true) {
//獲取鎖 lock.lock(); flag
++; System.out.println(Thread.currentThread().getName() + "線程修改變后的變量為" + flag);
//釋放鎖 lock.unlock();
try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }

 2,AQS(Abstract Queued Synchronizer)

2.1 簡述

抽象隊列同步器。經常聽到這個詞,本文就要分析下這到底是啥含義。並發包的鎖都是基於AQS來實現的,一般我們開發是不直接接觸的,它是並發的基礎,java並發包底層的API。我們先畫個圖,了解下AQS的含義:

過程簡述一下,當第一個線程獲取鎖時,將state狀態+1,變成了1。此時當前加鎖線程為線程1.然后線程2來獲取鎖,發現state不等於0,也就是有人占有了鎖,此時線程2就到一個隊列中排隊。這時候線程3,線程N會依次來排隊掛起。線程1處理任務完畢,將會喚醒隊列中的線程,然后線程就去爭取鎖,獲取到鎖的線程就會出隊列,重新改變state的值,將當前線程變為自己的。

 

大體過程如上所示,不過這樣說起來還是太抽象了。我們從源碼中去探索,看看是為啥叫AQS,隊列是怎么實現的,怎么就實現了鎖的功能了。。。

 

2.2 源碼剖析

2.2.1首先看看ReentrantLock的構造函數。

    public ReentrantLock() {
#當我們點進構造函數以后,發現里面構造了NonfairSync()對象。通過這里我們就能發現,ReentrantLock應該是一個外殼,真正執行功能的對象應該就是NonfairSync了。 sync
= new NonfairSync(); }
#我們順勢就點開NonfairSync的構造函數了。發現它也是ReentrantLock的內部類,繼承自Sync對象,
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

通過上面的源代碼,就知道了ReentrantLock里面有內部類:NonfairSync、FairSync、Sync。NonfairSync、FairSync是Sync的子類,其中NonfailSync是默認構造函數實現的(也就是非公平鎖,后面會講解它和FairSync的區別)。而且ReentrantLock就是一個外殼,真正功能的實現就是Sync這些對象。我們再看看Sync這個類,發現他是一個抽象類,繼承了AbstractQueuedSynchronizer這個類。源碼如下:

abstract static class Sync extends AbstractQueuedSynchronizer

 這時候發現AbstractQueuedSynchronizer。也就是我們之前說的AQS,抽象隊列同步器。很明顯了,核心的方法都應該在AbstractQueuedSynchronizer類里面了。來看看AbstractQueuedSynchronizer的核心的字段:

private transient Thread exclusiveOwnerThread;

#這是一個指針的頭部
private transient volatile Node head; #指針的尾部 private transient volatile Node tail; #狀態變量state private volatile int state; #這個應該似曾相識了,之前Atomic原子系列的CAS,就是基於unsafe來實現的。這里大概也能猜測到了,ReentractLock底層是基於CAS無鎖化來實現的了 private static final Unsafe unsafe = Unsafe.getUnsafe();
#下面是一些變量的內存指針。
private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset;

 看了上面的核心成員變量,大概能知道:之前所謂的隊列應該就是一個雙向鏈表實現的,獲取到鎖之后就更新state的狀態。Node是AQS實現的一種數據結構,我們可以看看:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

 2.2.2 lock()方法。

 //調用lock方法之后,底層調用的是Sync的lock方法
public void lock() { sync.lock(); } //發現Sync只是聲明了這個方法,具體實現是子類來實現的 abstract void lock();

#這是默認的子類NonfairSync的lock()方法的實現。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
compareAndSetState(0,1):
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

這是采用了CAS操作來判斷state字段,如果state=0,就設置為1:表示某個線程第一次獲取鎖,將state狀態從0到1,並將當前占用線程設置為自己:

setExclusiveOwnerThread(Thread.currentThread())
否則,acquire(1):
public final void acquire(int arg) {
#在這里點進tryAcquire()方法,發現在AQS里面也是沒有實現,所以查看了
NonfairSync的tryAcquire()方法了。
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
NonfairSync的tryAcquire()方法:
     final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
#首先獲取state的狀態
int c = getState();
#如果等於0,說明沒有線程獲取到鎖,
if (c == 0) {
#此時該線程獲取到鎖,並將自己設為當前線程
if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
#否則說明有人占有鎖,那么再判斷下占有鎖的線程是否是自己,是的話將state+1.這是什么意思呢??就是同一個線程可以獲取到多次鎖,也就是可重入鎖。
else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

以上條件表示的就是獲取鎖成功的情況。如果獲取鎖失敗會怎樣??回到方法:

if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

如果獲取鎖失敗,那么第一個條件就是true了。進入第二個條件:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
先看看addWaiter方法源碼是怎么走的:
    
#該方法是構建一個node節點,然后調用enq方法
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

 enq源碼:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

最后執行acquireQueued:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面幾個方法其實就是基於鏈表來操作的,我覺得還是畫個圖來比較直觀。

場景:首先是線程1來獲取鎖,獲取成功,然后線程2,線程3來獲取鎖,獲取失敗,加入到隊列的過程:

線程1獲取鎖成功:

線程2獲取鎖失敗,加入隊列的過程:

 以上方法是加入到隊列,至於線程掛起的源碼如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
//此時線程2會走到這里.
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

里面讓線程2掛起的源碼:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

線程3獲取鎖失敗,加入隊列:

 通過上面基於源碼來畫圖,就可以知道了。首先線程1,獲取到了鎖,其他線程就會形成一個雙向鏈表,也就是所謂的隊列操作,並且同步阻塞了。同時這里還有一點,就是在掛起之前,會再次嘗試獲取鎖,這也就是為啥叫非公平鎖了。比如線程3在掛起的時候就獲取到了鎖,那么提前等待的線程2,就明顯不公平了。至於怎么喚醒這些隊列的線程,后面再分析。這里還有個值得學習的地方LockSupport.park(this):表示對當前線程的掛起。

 2.2.3 unlock()方法。

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
//AQS也是沒有實現的,在ReentrantLock里面有具體的實現。
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

tryRelease的實現:

        protected final boolean tryRelease(int releases) {
//首先將state的變量值-1
int c = getState() - releases;
//不是占有的線程去釋放,就報異常
if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

上面這段代碼的意思是,當線程執行unlock()方法之后,state就一直減1,直到0為止,然后將當前加鎖線程設為NULL。從隊列中喚醒的源碼就是這一段了:

            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;

核心在unparkSuccessor()方法,我們去看一下:

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //按照上文的分析,這個s其實就是線程2
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

從這里可以看出來當線程1執行unpark()方法的時候,就會喚醒隊列的頭結點的next節點,也就是線程2所在的節點了。當線程2喚醒以后,就從上文掛起的地方活過來,又開始爭搶鎖的操作。回到源碼這一段中:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

此時應該從掛起的地方蘇醒過來,重新執行上文中的for循環了。然后線程2就會獲取到鎖,當前加鎖的線程=線程2.然后將鏈表的head下移一位,鏈表的線程2設置為NULL。這樣就相當於線程2從隊列中彈出了。最后再來回顧一下前面的操作:lock()方法,就是獲取鎖的操作,當線程1來的時候,沒有人持有鎖,所以線程1獲取鎖成功,修改state狀態,設置當前線程為線程1;這時候線程2,線程3依次過來那么線程2,線程3會形成一個雙向鏈表(隊列),並且會同步掛起。當線程1執行了unlock()方法,其實就是修改state狀態,將當前線程置為NULL,然后最重要的一步是喚醒了隊列頭的線程。這就是為啥能滿足先進先出的原因了。此時線程2被喚醒以后,修改state狀態,將當前線程=線程2,然后將鏈表head的線程與整個鏈表斷開,然后把鏈表中的線程2設置為NULL的Node。整個鎖的操作設計的很巧妙,通過一個state狀態來控制,非常值得大家學習這個過程。

3,公平鎖和非公平鎖

 前面我們知道默認情況下是使用非公平鎖來實現的,而且非公平鎖的性能高。但是這是為什么呢?我們通過源碼來剖析原因。

ReentractLock有兩種構造函數

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

前面我們用的都是默認的構造函數。當傳入參數true,則使用的是公平鎖的方式了。同理我們也要再次看lock()方法和unlock()方法了。因為前面詳細分析過,這部分我們主要查看有哪些不同。

FairSync方法實現的lock()方法中,

        final void lock() {
            acquire(1);
        }

這和之前的非公平鎖是不一樣的,非公平鎖的實現再貼一下:

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

區別是什么??非公平鎖上來就是直接判斷state是否等於0,等於0就嘗試獲取鎖。意味着什么呢?意味着就算沒有進入隊列的線程,也可以直接參與爭搶鎖。類比很多人在排隊買東西,有個人直接過來就買了,不排隊了,,不公平!!

繼續看下去,下面就是方法acquire():

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這個方法前面都分析過。這里我們唯一需要看的就是tryAcquire()方法,因為它是具體子類的實現,我們之前看的都是NonfairSync,這里應該看FairSync了:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
#如果state=0,也就是沒有人占有鎖的情況下,會先去查看線程是否在隊列中,這樣就能徹底實現公平了。
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

這里也就發現了。公平鎖和非公平鎖的差別就是在爭搶鎖的時候是否會判斷隊列。公平鎖會嚴格的去判斷隊列里是否含有該線程,所以會有遍歷鏈表的操作,這也是為啥性能沒有非公平鎖高了。。但是公平鎖嚴格遵循公平的原則了,所以使用哪一個就要根據場景了。一般情況下是使用非公平鎖,提高性能。

 4,讀寫鎖

 分析完了ReentrantLock以后,再來談談讀寫鎖。它是將讀寫進行了分離,讀線程可以同時多個並行,寫請求只有一個線程,讀寫是互斥的。先寫個demo,然后方便分析讀寫鎖的源碼:

public class ReentractReadWriteLockDemo {

    
    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    //獲取寫鎖
    static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    //獲取讀鎖
    static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {
        writeLock.lock();

        writeLock.unlock();

        readLock.lock();

        readLock.unlock();

    }
}

首先看看ReentrantReadWriteLock的構造函數的內容:

public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
//默認的是非公平鎖 sync
= fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }

//通過這里可以看出來,所謂的readLock或者是WriteLock底層的實現其實就是sync,ReentrantReadWriteLock就是一個包裝的外殼。
ReadLock和WriteLock都是ReentrantReadWriteLock都是其內部類。
   protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
 

寫鎖的lock()方法剖析:

        public void lock() {
            sync.acquire(1);
        }
       #這個方法是不是很熟悉。就是AQS類的方法,tryAcquire()方法在AQS沒有實現,所以我們應該看看你它的具體實現。
       public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

那么重點就是看砍tryAcquire()方法了,通過之前的分析已經很清楚了,這個方法其實就是加鎖的操作,具體看看它的實現:

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
//獲取state的值
int c = getState();
//這是重點:讀寫鎖是巧妙的運用了state是int類型,32位,將前面的16位(高位)作為讀鎖,后面的16位(低位)作為寫鎖的標志。這就是獲取低16位,寫鎖。
int w = exclusiveCount(c);
//說明是加鎖了
if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0)
//說明是加了讀鎖或者是加了寫鎖,但是 不是該線程。 if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; }
//前一個方法表示是公平鎖實現還是非公平鎖實現的,公平鎖還要再判斷是否在隊列里,非公平鎖則不需要。
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }

寫鎖的整個流程還是很清晰的,我們可以根據源碼畫圖,場景:線程1第一次來獲取鎖,線程1重入加鎖,線程2來獲取鎖,但是沒有成功,入隊列:

 

加讀鎖,源碼如下:

        public void lock() {
            sync.acquireShared(1);
        }


     public final void acquireShared(int arg) {
//可以發現AQS也沒有實現tryAcquireShared()方法,那么看子類是如何實現的,也是重點剖析的
if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
tryAcquireShared()方法的具體實現:
        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
//判斷別的線程是否占有了寫鎖
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1;
//獲取高16位,判斷是否加了讀鎖
int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }

假設這時候線程3來加讀鎖,如果獲取失敗,進入方法:

doAcquireShared:
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這時候的圖形是:

 

 釋放鎖的過程也是雷同的。只要一步一步去分析,就ok了。主要是學會根據源碼畫圖,會分析這個流程,我覺得這個技能是必備的。

 5,condition

condition的await()和signal(),和wait(),notify()其實是實現相同的功能的。我們看看這里看看await()和signal()是如何基於AQS來實現這種效果的呢??先給個演示的demo。

public class ConditionDemo {

    static ReentrantLock lock = new ReentrantLock();

    static Condition condition = lock.newCondition();

    public static void main(String[] args) {


        new Thread() {
            @Override
            public void run() {

                lock.lock();
                System.out.println("第一個線程await之前的操作............");

                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("第一個線程await之后的操作............");
                lock.unlock();
            }
        }.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread() {
            @Override
            public void run() {
                lock.lock();
                System.out.println("第二個線程喚醒線程1之前............");
                condition.signal();
                System.out.println("第二個線程喚醒線程1之后............");
                lock.unlock();
            }
        }.start();
    }
}

我們基於這個demo來畫出演示圖,源碼就不粘貼了。當線程1執行完await()方法,線程2獲取到鎖的時候:

 

當線程2執行完signal()操作會怎樣呢?

 其實可以發現,當線程2執行完signal()方法,其實是將線程1從condition隊列放到了阻塞等待隊列。這樣線程1又可以等待喚醒,來繼續獲取鎖了。這兩個圖就能完全說明await()和signal()方法的功能了。總結下:線程1執行await()方法其實是將自己放入到一個condition隊列,並將自己掛起。然后線程2執行signal()方法,會從condition隊列轉到阻塞等待隊列。然后線程2釋放鎖,就會喚醒線程1來執行了。



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM