《java.util.concurrent 包源碼閱讀》03 鎖


Condition接口
應用場景:一個線程因為某個condition不滿足被掛起,直到該Condition被滿足了。
類似與Object的wait/notify,因此Condition對象應該是被多線程共享的,需要使用鎖保護其狀態的一致性
 
示例代碼:
class BoundedBuffer {
     final Lock lock = new ReentrantLock();
     final Condition notFull     = lock.newCondition();
     final Condition notEmpty = lock.newCondition();

     final Object[] items = new Object[100];
     int putptr, takeptr, count;

     public void put(Object x) throws InterruptedException {
          lock.lock();
          try {
               while (count == items.length)
                    notFull.await();
               items[putptr] = x;
               if (++putptr == items.length) putptr = 0;
               ++count;
               notEmpty.signal();
          } finally {
               lock.unlock();
          }
     }         

     public Object take() throws InterruptedException {
          lock.lock();
          try {
               while (count == 0)
                    notEmpty.await();
               Object x = items[takeptr];
               if (++takeptr == items.length) takeptr = 0;
               --count;
               notFull.signal();
               return x;
          } finally {
               lock.unlock();
          }
     }
}

 

以上代碼可以很清楚的看出Condition是如何使用的,后面的BlockingXXX類型的數據結構都會使用到Condition。
 
在使用signal(類似於notify)通知的時候需要實現按照什么樣的順序來通知。
 
三種等待方式:不中斷,一定時間間隔,等到某個時間點
 
Lock和ReadWriteLock
兩個接口,后者不是前者的子接口,通過以下ReadWriteLock的代碼就可以看出兩者的聯系了:
 
public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading.
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing.
     */
    Lock writeLock();
}

 

 
兩個接口都有一個可重入(ReentrantLock, ReentrantReadWriteLock)的實現,后面分析
 
LockSupport
工具類,操作對象是線程,基於Unsafe類實現。
基本操作park和unpark。park會把使得當前線程失效(沒有提供操作其他線程的,其實是可以實現的),暫時掛起,直到出現以下幾種情況中的一種:
1)其他線程調用unpark方法操作該線程   2)該線程被中斷    3)park方法立刻返回
關於blocker,線程掛起的同步對象,blocker不是必須的, 作用是什么呢?
park有三種時間類別的調用
public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline)
 
上面三個方法有對應的重載方法,就是加一個blocker對象作為參數
public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)
 
關於unpark
public static void unpark(Thread thread)
開始的時候不明白為什么沒有public static void unpark()操作當前線程,后來一想,一個線程park的時候已經被block了,沒有可能調用unpark來自救的。
 
AbstractOwnableSynchronizer, AbstractQueuedSynchronizer, AbstractQueuedLongSynchronizer
后兩者是第一個類的子類。
最后一個類是從JDK6才開始出現的,還沒有具體實現的子類
中間一個類的子類實現會在可重入鎖里面
AbstractOwnableSynchronizer只是實現了被線程獨占這些功能的Synchronizer,並不包含如何管理實現多個線程的同步。包含了一個exclusiveOwnerThread,set/get方法。
 
AbstractQueuedSynchronizer利用Queue的方式來管理線程關於鎖的使用和同步,相當於一個鎖的管理者。
首先關注四個最核心的方法:
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
 
前兩個用於獨占鎖,后兩者用於共享鎖,這四個方法是由子類來實現的,即如何獲取和釋放鎖AbstractQueuedSynchronizer是不參與的,默認實現是不支持,即拋出UnsupportedOperationException。
AbstractQueuedSynchronizer做什么呢?
 
當 前線程嘗試獲取鎖的時候,AbstractQueuedSynchronizer會先調用tryAcquire或者tryAcquireShared來嘗 試獲取,如果得到false,那么把當前線程放到等待隊列中去,然后再做進一步操作。我們來分析以下6種情況,前三種用於獨占鎖,后三者用於共享,獨占鎖 或者共享鎖按照等待方式又分為三種:不可中斷線程等待,可中斷線程等待,嘗試限時等待超時放棄。
這6種的方法都含有一個int類型的參數,這個是給上面的tryAcquire這種方法使用的,也就是說它一個自定義的參數,一般用來表示某個自定義的狀態。
1) 獨占鎖,放入隊列后,直到成功獲取鎖,會忽略線程的中斷
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
 
注意:隊列中的header節點是個dummy節點,真正等待的第一個節點是header后面的節點。
但是在acquire成功后中,如果發現這是第一個等待節點,dummy的header會被設置成這個節點,但是
prev和thread是null,相當於把原來的dummy的header移走,換成一個新的dummy的Node。
addWaiter()方法把當前線程加入到等待隊列中去,返回一個Node對象
acquireQueued()方法會監控Node對象在隊列中的變化,如果檢測到線程中斷,返回true,否則返回false.
如果等待期間檢測到中斷信號,也就是acquireQueued返回了true,會用selfInterrupt中斷當前線程。
 
acquireQueued源代碼
    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
               //檢測自己是否已經排到第一個了
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
               //shouldParkAfterFailedAcquire的作用檢測我是不是需要安心地等,如果是的話,
               //就調用parkAndCheckInterrupt進入等待,等待結束后會返回線程是否已經中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

 

shouldParkAfterFailedAcquire的源代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //SIGNAL的意思就是說前面那個家伙在釋放鎖以后會告訴我的,我安心等就是了
            return true;
        if (ws > 0) {
            /*
             * ws大於0的意思就是這個Node已經被取消了,需要跳過,並且從隊列中清除出去
             * 這里會清除我前面所有這種類型的Node
             */
         do {
          node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
        } else {
            /*
             * 這里ws是0或者PROPAGATE,表示他是傻乎乎的家伙,還不知道SIGNAL規則
             * 0是獨占鎖,PROPAGATE是共享鎖,compareAndSetWaitStatus會找人把他設成SIGNAL
             * 狀態,(成功與否未知,所以返回false)參見上面關於SIGNAL的解釋 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 

 
2) 獨占鎖,放入隊列后,直到成功獲取鎖或者遇到中斷
    public final void acquireInterruptibly(long arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

 

 

 
這里與1)的區別在於調用doAcquireInterruptibly,而實際上doAcquireInterruptibly和acquireQueued區別很小,前者不會中斷,僅此而已,參看紅色部分,發現中斷,直接break,然后取消獲取鎖的打算。
doAcquireInterruptibly的源代碼
    private void doAcquireInterruptibly(long arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
 
3)限時
doAcquireNanos的源代碼
    private boolean doAcquireNanos(long arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                if (nanosTimeout <= 0) {
                    cancelAcquire(node);
                    return false;
                }

               // 多次parkNanos,計算實際耗費的時間才是安全的做法
                if (nanosTimeout > spinForTimeoutThreshold &&
                    shouldParkAfterFailedAcquire(p, node))
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;

               // 如果線程被中斷,不好意思,要拋出異常的
                if (Thread.interrupted())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }

 

 
注意:計算的安全的做法不是一次等待,立刻超時,因為一次等待的時間不一定等於預先設定的值,而是多次等待,累計計算比較安全。
 
對於4),5),6)的共享鎖,做法與獨占鎖幾乎一致。
第一個區別來自於,當排隊輪到自己的時候,調用的setHeadAndPropagate方法相對於setHead要復雜一些,這是由於獨占鎖和共享鎖的區別決定的。
 
setHeadAndPropagate的源代碼???
    private void setHeadAndPropagate(Node node, long propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         * Propagation was indicated by caller,
         * or was recorded (as h.waitStatus) by a previous operation
         * (note: this uses sign-check of waitStatus because
         * PROPAGATE status may transition to SIGNAL.)
         * and
         * The next node is waiting in shared mode,
         * or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

 

 
對於4)來說,doAcquireShared把selfInterrupt()挪到自己的方法內
 
2) 關於release部分
作用,釋放鎖,喚醒下一個等待的線程
 
AbstractQueuedLongSynchronizer
AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer的區別在於acquire和release的arg參數是long而不是int類型。
 
ReentrantLock
所謂可重入鎖,就是當一個thread已經獲得一個lock的時候,再次請求該鎖的時候,會立即返回。
使用AbstractQueuedSynchronizer的子類(Sync, NonfairSync, FairSync)進行鎖獲取釋放的管理。
 
state等於0表示當前沒有線程占用鎖,下面兩個獲取鎖的過程基本類似,共同的過程是
首先檢查有沒有線程使用該鎖,沒有的話就占用該並且setState,否則就檢查那個占用鎖的線程是不是當前線程,如果是的話,
僅僅setState,否則就返回false。
Sync#nonfairTryAcquire
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

 

FairSync#tryAcquire
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (isFirst(current) &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

 

對於FairSync,唯一的不同在於isFirst的調用,而UnfairSync則完全不會檢查,誰搶到就是誰的。
    final boolean isFirst(Thread current) {
        Node h, s;
        return ((h = head) == null ||
                ((s = h.next) != null && s.thread == current) ||
                fullIsFirst(current));
    }

 

isFirst會檢查有沒有線程排隊,如果沒有,當前線程就可以獲得鎖,如果有隊列,就看當前線程是不是排第一個。
 
Sync#tryRelease
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

 

tryRelease會檢查當前線程有沒有占有鎖,如果不是回拋出異常。接着會從state中減去計數得到新的state,如果state為0表示所有的鎖都被釋放了。
 
ReentrantReadWriteLock
ReentrantReadWriteLock比較復雜,因為同事管理共享鎖(讀取鎖)和獨占鎖(寫入鎖)。
也對應使用AbstractQueuedSynchronizer的子類(Sync, NonfairSync, FairSync)進行鎖獲取釋放的管理。(名字一樣但是實現是不同的)。
Sync類
1) Sync的state是32位的,高位的16位是共享鎖的狀態,低位的16位是獨占鎖的狀態。
 
        /*
         * Note that tryRelease and tryAcquire can be called by
         * Conditions. So it is possible that their arguments contain
         * both read and write holds that are all released during a
         * condition wait and re-established in tryAcquire.
         */

        protected final boolean tryRelease(int releases) {
            int nextc = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            // 首先檢查獨占鎖計數,如果是0表示獨占鎖已經被完全釋放,則清除獨占鎖線程
            // 更新狀態
            if (exclusiveCount(nextc) == 0) {
                setExclusiveOwnerThread(null);
                setState(nextc);
                return true;
            } else {
                setState(nextc);
                return false;
            }
        }

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. if read count nonzero or write count nonzero
             *     and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // c != 0表示有共享鎖或者獨占鎖存在,w == 0表示沒有獨占鎖
                // 那么兩個條件同時成立表示有共享鎖存在,就無法獲得獨占鎖
                // 或者有線程擁有獨占鎖但不是當前線程,那也無權獲得獨占鎖
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            // 到了這一步,可能有以下幾種情況:
            // 1) c == 0沒有任何鎖存在(這個時候 w == 0 也成立)
            // 2) 當前線程擁有獨占鎖,並且還沒到鎖的最大限制數

            // w == 0是當前線程沒有獨占鎖,屬於新申請
            // writerShouldBlock是抽象方法,對於FairSync和UnfairSync有不同實現
            // 該發現檢查當前線程申請獨占鎖應不應該被阻止
            // 對於FairSync,writerShouldBlock會用isFirst檢查,
            // 對於isFirst,如果如果沒人排隊,或者你是第一個排隊的,或者fullIsFirst就返回true
            // 對於fullIsFirst,不是很理解
            // 對於UnfairSync,writerShouldBlock永遠返回false,因為沒有排隊的概念(體現Unfair)
            if ((w == 0 && writerShouldBlock(current)) ||
                !compareAndSetState(c, c + acquires))
                return false;

            // 獲取獨占鎖成功,設置獨占鎖線程
            setExclusiveOwnerThread(current);
            return true;
        }

        // 這里使用HoldCounter類型的ThreadLocal變量存儲當前線程擁有的共享鎖的計數
        // cachedHoldCounter緩存最近一次成功獲取共享鎖的線程的ThreadLocal變量
        protected final boolean tryReleaseShared(int unused) {
            HoldCounter rh = cachedHoldCounter;
            Thread current = Thread.currentThread();
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            // tryDecrement()返回擁有的共享鎖的計數,大於0則並且更新計數(減1)。
            if (rh.tryDecrement() <= 0)
                throw new IllegalMonitorStateException();

            // 更新共享鎖的計數
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT; // 高位的共享鎖計數減一
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail
             * 2. If count saturated, throw error
             * 3. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 4. If step 3 fails either because thread
             *    apparently not eligible or CAS fails,
             *    chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 其他線程正在使用獨占鎖
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 共享鎖計數到達最大限制
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");

            // 類似於writerShouldBlock,readerShouldBlock是抽象方法,有不同實現,
            // 檢查是不是阻止當前線程共享鎖的申請
            // 對於UnfairSync,為了防止獨占鎖餓死的情況,如果發現隊列中第一個排隊的是獨占鎖申請,
            // 就是block當前共享鎖的申請
            // 對於FairSync,同樣使用isFirst檢查當前線程
            if (!readerShouldBlock(current) &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    cachedHoldCounter = rh = readHolds.get();
                rh.count++;
                return 1;
            }
 
            // 針對CAS失敗或者一些不太常見的失敗的情況
            // 思想:實現常規版本和完整版本(包含所有情況),在常規版本失敗的情況下調用完整版本, 提高效率
            return fullTryAcquireShared(current);
        }

 

fullTryAcquireShared
增加計數緩存,以及紅色部分
        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != current.getId())
                rh = readHolds.get();
            for (;;) {
                int c = getState();
                int w = exclusiveCount(c);
                // 紅色部分表示沒有占用共享鎖,新申請共享鎖
                if ((w != 0 && getExclusiveOwnerThread() != current) ||
                    ((rh.count | w) == 0 && readerShouldBlock(current)))
                    return -1;
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    cachedHoldCounter = rh; // cache for release
                    rh.count++;
                    return 1;
                }
            }
        }

 

 
關於ReadLock和WriteLock
調用AbstractQueuedSynchronizer對於的共享鎖/獨占鎖的acquire和release的方法來實現
 
Lock可以看出是沒有只有獨占鎖的鎖
 
Node#nextWaiter
null, 表示獨占鎖的Node
SHARED, 表示共享鎖的Node
其他, 某個條件下的下一個等待者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM