java.util.concurrent包學習(一)鎖的基本原理


  與synchronized不同,Lock完全用Java寫成,在java這個層面是無關JVM實現的。

  在java.util.concurrent.locks包中有很多Lock的實現類,常用的有ReentrantLock、ReadWriteLock(實現類ReentrantReadWriteLock),其實現都依賴java.util.concurrent.AbstractQueuedSynchronizer類,實現思路都大同小異,因此我們以ReentrantLock作為講解切入點。

 

1. ReentrantLock的調用過程

經過觀察ReentrantLock把所有Lock接口的操作都委派到一個Sync類上,該類繼承了AbstractQueuedSynchronizer:

static abstract class Sync extends AbstractQueuedSynchronizer

Sync又有兩個子類:

final static class NonfairSync extends Sync

final static class FairSync extends Sync  

  顯然是為了支持公平鎖和非公平鎖而定義,默認情況下為非公平鎖。

  先理一下Reentrant.lock()方法的調用過程(默認非公平鎖):

 

  這些討厭的Template模式導致很難直觀的看到整個調用過程,其實通過上面調用過程及AbstractQueuedSynchronizer的注釋可以發現,AbstractQueuedSynchronizer中抽象了絕大多數Lock的功能,而只把tryAcquire方法延遲到子類中實現。tryAcquire方法的語義在於用具體子類判斷請求線程是否可以獲得鎖,無論成功與否AbstractQueuedSynchronizer都將處理后面的流程。

2. 鎖實現(加鎖)

  簡單說來,AbstractQueuedSynchronizer會把所有的請求線程構成一個CLH隊列,當一個線程執行完畢(lock.unlock())時會激活自己的后繼節點,但正在執行的線程並不在隊列中,而那些等待執行的線程全部處於阻塞狀態,經過調查線程的顯式阻塞是通過調用LockSupport.park()完成,而LockSupport.park()則調用sun.misc.Unsafe.park()本地方法,再進一步,HotSpot在Linux中中通過調用pthread_mutex_lock函數把線程交給系統內核進行阻塞。

該隊列如圖:

 

  

  與synchronized相同的是,這也是一個虛擬隊列,不存在隊列實例,僅存在節點之間的前后關系。令人疑惑的是為什么采用CLH隊列呢?原生的CLH隊列是用於自旋鎖,但Doug Lea把其改造為阻塞鎖。

  當有線程競爭鎖時,該線程會首先嘗試獲得鎖,這對於那些已經在隊列中排隊的線程來說顯得不公平,這也是非公平鎖的由來,與synchronized實現類似,這樣會極大提高吞吐量。

  如果已經存在Running線程,則新的競爭線程會被追加到隊尾,具體是采用基於CAS的Lock-Free算法,因為線程並發對Tail調用CAS可能會導致其他線程CAS失敗,解決辦法是循環CAS直至成功。AbstractQueuedSynchronizer的實現非常精巧,令人嘆為觀止,不入細節難以完全領會其精髓,下面詳細說明實現過程:

 

2.1 Sync.nonfairTryAcquire

nonfairTryAcquire方法將是lock方法間接調用的第一個方法,每次請求鎖時都會首先調用該方法。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  該方法會首先判斷當前狀態,如果c==0說明沒有線程正在競爭該鎖,如果不c !=0 說明有線程正擁有了該鎖。

  如果發現c==0,則通過CAS設置該狀態值為acquires,acquires的初始調用值為1,每次線程重入該鎖都會+1,每次unlock都會-1,但為0時釋放鎖。如果CAS設置成功,則可以預計其他任何線程調用CAS都不會再成功,也就認為當前線程得到了該鎖,也作為Running線程,很顯然這個Running線程並未進入等待隊列。

  如果c !=0 但發現自己已經擁有鎖,只是簡單地++acquires,並修改status值,但因為沒有競爭,所以通過setStatus修改,而非CAS,也就是說這段代碼實現了偏向鎖的功能,並且實現的非常漂亮。

2.2 AbstractQueuedSynchronizer.addWaiter

   addWaiter方法負責把當前無法獲得鎖的線程包裝為一個Node添加到隊尾:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

其中參數mode是獨占鎖還是共享鎖,默認為null,獨占鎖。追加到隊尾的動作分兩步:

  1. 如果當前隊尾已經存在(tail!=null),則使用CAS把當前線程更新為Tail
  2. 如果當前Tail為null或則線程調用CAS設置隊尾失敗,則通過enq方法繼續設置Tail

下面是enq方法:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

  該方法就是循環調用CAS,即使有高並發的場景,無限循環將會最終成功把當前線程追加到隊尾(或設置隊頭)。總而言之,addWaiter的目的就是通過CAS把當前現在追加到隊尾,並返回包裝后的Node實例。

把線程要包裝為Node對象的主要原因,除了用Node構造供虛擬隊列外,還用Node包裝了各種線程狀態,這些狀態被精心設計為一些數字值:

  • SIGNAL(-1) :線程的后繼線程正/已被阻塞,當該線程release或cancel時要重新這個后繼線程(unpark)
  • CANCELLED(1):因為超時或中斷,該線程已經被取消
  • CONDITION(-2):表明該線程被處於條件隊列,就是因為調用了Condition.await而被阻塞
  • PROPAGATE(-3):傳播共享鎖
  • 0:0代表無狀態

2.3 AbstractQueuedSynchronizer.acquireQueued

  acquireQueued的主要作用是把已經追加到隊列的線程節點(addWaiter方法返回值)進行阻塞,但阻塞前又通過tryAccquire重試是否能獲得鎖,如果重試成功能則無需阻塞,直接返回

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

  仔細看看這個方法是個無限循環,感覺如果p == head && tryAcquire(arg)條件不滿足循環將永遠無法結束,當然不會出現死循環,奧秘在於第12行的parkAndCheckInterrupt會把當前線程掛起,從而阻塞住線程的調用棧。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

  如前面所述,LockSupport.park最終把線程交給系統(Linux)內核進行阻塞。當然也不是馬上把請求不到鎖的線程進行阻塞,還要檢查該線程的狀態,比如如果該線程處於Cancel狀態則沒有必要,具體的檢查在shouldParkAfterFailedAcquire中:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
        node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking. 
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        } 
        return false;
    }

檢查原則在於:

  • 規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致線程阻塞
  • 規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限循環將遞歸調用該方法,直至規則1返回true,導致線程阻塞
  • 規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設置前繼的狀態為SIGNAL,返回false后進入acquireQueued的無限循環,與規則2同

總體看來,shouldParkAfterFailedAcquire就是靠前繼節點判斷當前線程是否應該被阻塞,如果前繼節點處於CANCELLED狀態,則順便刪除這些節點重新構造隊列。

至此,鎖住線程的邏輯已經完成,下面討論解鎖的過程。

 

3. 解鎖

  請求鎖不成功的線程會被掛起在acquireQueued方法的第12行,12行以后的代碼必須等線程被解鎖鎖才能執行,假如被阻塞的線程得到解鎖,則執行第13行,即設置interrupted = true,之后又進入無限循環。

從無限循環的代碼可以看出,並不是得到解鎖的線程一定能獲得鎖,必須在第6行中調用tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的線程獲得,從而導致剛被喚醒的線程再次被阻塞,這個細節充分體現了“非公平”的精髓。通過之后將要介紹的解鎖機制會看到,第一個被解鎖的線程就是Head,因此p == head的判斷基本都會成功。

至此可以看到,把tryAcquire方法延遲到子類中實現的做法非常精妙並具有極強的可擴展性,令人嘆為觀止!當然精妙的不是這個Templae設計模式,而是Doug Lea對鎖結構的精心布局。

解鎖代碼相對簡單,主要體現在AbstractQueuedSynchronizer.release和Sync.tryRelease方法中:

class AbstractQueuedSynchronizer

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

class Sync

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

  tryRelease與tryAcquire語義相同,把如何釋放的邏輯延遲到子類中。tryRelease語義很明確:如果線程多次鎖定,則進行多次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設置status為0,因為無競爭所以沒有使用CAS。

release的語義在於:如果可以釋放鎖,則喚醒隊列第一個線程(Head),具體喚醒代碼如下:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling. It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); 

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

  這段代碼的意思在於找出第一個可以unpark的線程,一般說來head.next == head,Head就是第一個線程,但Head.next可能被取消或被置為null,因此比較穩妥的辦法是從后往前找第一個可用線程。貌似回溯會導致性能降低,其實這個發生的幾率很小,所以不會有性能影響。之后便是通知系統內核繼續該線程,在Linux下是通過pthread_mutex_unlock完成。之后,被解鎖的線程進入上面所說的重新競爭狀態。

 

  我們再回頭看公平鎖(FairSync)和非公平鎖(NonfairSync)。

公平鎖和非公平鎖只是在獲取鎖的時候有差別,其它都是一樣的。

//NonfairSync extends Sync

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

//FairSync extends Sync

final void lock() {
            acquire(1);
        }

 

在上面非公平鎖的代碼中總是優先嘗試當前是否有線程持有鎖,一旦沒有任何線程持有鎖,那么非公平鎖就霸道的嘗試將鎖“占為己有”。如果在搶占鎖的時候失敗就和公平鎖一樣老老實實的去請求。

也即是說公平鎖和非公平鎖只是在入AQSCLH隊列之前有所差別,一旦進入了隊列,所有線程都是按照隊列中先來后到的順序請求鎖。

FaiSync的tryAcquire還會判斷是當前線程是否第一個節點,如果是才去搶鎖。

4. Lock VS Synchronized

  AbstractQueuedSynchronizer通過構造一個基於阻塞的CLH隊列容納所有的阻塞線程,而對該隊列的操作均通過Lock-Free(CAS)操作,但對已經獲得鎖的線程而言,ReentrantLock實現了偏向鎖的功能。

synchronized的底層也是一個基於CAS操作的等待隊列,但JVM實現的更精細,把等待隊列分為ContentionList和EntryList,目的是為了降低線程的出列速度;當然也實現了偏向鎖,從數據結構來說二者設計沒有本質區別。但synchronized還實現了自旋鎖,並針對不同的系統和硬件體系進行了優化,而Lock則完全依靠系統阻塞掛起等待線程。

  當然Lock比synchronized更適合在應用層擴展,可以繼承AbstractQueuedSynchronizer定義各種實現,比如實現讀寫鎖(ReadWriteLock),公平或不公平鎖;同時,Lock對應的Condition也比wait/notify要方便的多、靈活的多。


  synchronized使用的內置鎖和ReentrantLock這種顯式鎖在java6以后性能沒多大差異,在更新的版本中內置鎖只會比顯式鎖性能更好。這兩種鎖都是獨占鎖,java5以前內置鎖性能低的原因是它沒做任何優化,直接使用系統的互斥體來獲取鎖。顯式鎖除了CAS的時候利用的是本地代碼以外,其它的部分都是Java代碼實現的,在后續版本的Java中,顯式鎖不太可能會比內置鎖好,只會更差。使用顯式鎖的唯一理由是要利用它更多的功能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM