AbstractQueuedSynchronizer原理及代碼分析


一、AQS簡介

AbstractQueuedSynchronizer(AQS)是java.util.concurrent並發包下最基本的同步器,其它同步器實現,如ReentrantLock類,ReentrantReadWriteLock類,Semaphore類(計數信號量),CountDownLatch類,FutureTask類和SynchronousQueues類都是基於它來實現的(各個實現類在內部持有了一個實現AQS的內部類,然后通過代理對外提供同步器的功能)。AQS會維護一個同步狀態(state),在互斥和共享語義下有不同的含義,在AQS內部,采用CAS技術實現同步狀態原子性的管理,而對同步狀態的更改邏輯(什么情況下進行更改)則由子類來實現,正是由於這點,產生了不同語義的同步器;另外,AQS根據同步狀態的值將對調用線程進行阻塞和解除阻塞的操作;最后,AQS提供了一個FIFO隊列,將阻塞的線程壓入隊列,進行排隊管理,然后再按照順序出隊列。總之,AQS框架為同步狀態的原子性管理、線程的阻塞和解除阻塞以及排隊提供了一種通用機制。

二、AQS實現

AQS包含兩種方法,一種是acquire,另一種是release。acquire操作阻塞調用的線程,直到或除非同步狀態允許其繼續執行。而release操作則是通過某種方式改變同步狀態,使得一或多個被acquire阻塞的線程繼續執行。

同步器背后的基本思想非常簡單。acquire操作如下:

while(synchronization state does not allow acquire) {
        enqueue current thread if not already queued;
        possibly block current thread;
    }
    dequeue current thread if it was queued;

release操作如下:

update synchronization state;
    if (state may permit a blocked thread to acquire)
        unblock one or more queued threads;

為了實現上述操作,需要下面三個基本組件:

1) 同步狀態的原子性管理;

2) 線程的阻塞與解除阻塞;

3) 隊列的管理;

1、同步狀態

AQS類使用一個int(32位)來保存同步狀態,並暴露出getState、setState以及compareAndSet操作來讀取和更新這個狀態。這些方法都依賴於j.u.c.atomic包的支持,這個包提供了兼容JSR133中volatile在讀和寫上的語義,並且通過使用本地的compare-and-swap或load-linked/store-conditional指令來實現compareAndSetState,使得僅當同步狀態擁有一個期望值的時候,才會被原子地設置成新值。

基於AQS的具體實現類必須根據暴露出的更改狀態的方法來定義tryAcquire和tryRelease方法,來控制acquire和release操作。當同步狀態滿足時,tryAcquire方法必須返回true,而當新的同步狀態允許后續acquire時,tryRelease方法也必須返回true。這些方法都接受一個int類型的參數用於傳遞想要的狀態。例如:可重入鎖中,這個參數表示同個線程申請鎖的次數。很多同步器並不需要這樣一個參數,忽略它即可。

2、阻塞

在JSR166之前,阻塞線程和解除線程阻塞都是用Java內置管程(synchronised)來實現,沒有其它的API可以使用。唯一可以選擇的是Thread.suspend和Thread.resume,但是它們都有無法解決的競態問題,所以也沒法用:當一個非阻塞的線程在一個正准備阻塞的線程調用suspend前調用了resume,這個resume操作將不會有什么效果。

j.u.c包有一個LockSuport類,這個類中包含了解決這個問題的方法。方法LockSupport.park阻塞當前線程除非直到有個LockSupport.unpark方法被調用(unpark方法被提前調用也是可以的)。unpark的調用是沒有被計數的,因此在一個park調用前多次調用unpark方法只會解除一個park操作。另外,它們作用於每個線程而不是每個同步器。一個線程在一個新的同步器上調用park操作可能會立即返回,因為在此之前可能有”剩余的”unpark操作。但是,在缺少一個unpark操作時,下一次調用park就會阻塞。雖然可以顯式地消除這個狀態,但並不值得這樣做。在需要的時候多次調用park會更高效。

3、隊列

整個框架的關鍵就是如何管理被阻塞的線程的隊列,該隊列是嚴格的FIFO隊列,不支持基於優先級的同步,它是基於CLH鎖(自旋鎖)實現的。它是一個雙向鏈表隊列,通過兩個字段head和tail來存取,這兩個字段是原子更新的,兩者在初始化時都指向了一個空節點。

clhNode

 

 

 

 

 

 

一個新的節點node,通過一個原子操作入隊:

do{
        pred = tail;
     while(!tail.compareAndSet(pred, node));

新的結點總是加在對尾,並且調用park()阻塞自己,然后等待前驅結點通過unpark(node.thread)喚醒。出隊列需要判斷當前結點的前驅結點是否是head,只有前驅結點是頭結點才能出隊列,該結點出隊列之后,同時將head字段指向該節點:

head = node;

在節點中顯式地維護前驅結點除了在出/入隊列中所起的作用之外,還可以有效處理”超時”和各種形式的”取消”:如果一個節點的前驅節點取消了,這個節點就可以忽略該前驅結點,直接使用前驅結點的前驅結點。

另外,在結點中設置結點的狀態是用於控制阻塞的,這樣也可以避免沒有必要的park和unpark調用。在調用park前,線程設置一個”喚醒(signal me)”位,一個釋放的線程會清空其自身狀態,這樣線程就不必頻繁地嘗試阻塞。

拋開具體的細節,基本的acquire操作的最終實現的一般形式如下(互斥,非中斷,無超時):

if(!tryAcquire(arg)) {
        node = create and enqueue new node;
        pred = node's effective predecessor;
        while (pred is not head node || !tryAcquire(arg)) {
            if (pred's signal bit is set) park();
            else compareAndSet pred's signal bit to true;

            pred = node's effective predecessor;
        }

        head = node;
    }

release操作:

if(tryRelease(arg) && head node's signal bit is set) {
         compareAndSet head's bit to false;
         unpark head's successor, if one exist
    }

4、條件隊列

AQS框架提供了一個ConditionObject類,給維護獨占同步的類以及實現Lock接口的類使用。一個鎖對象可以關聯任意數目的條件對象,可以提供典型的管程風格的await、signal和signalAll操作,包括帶有超時的,以及一些檢測、監控的方法。ConditionObject類有效地將條件(conditions)與其它同步操作結合到了一起。該類只支持Java風格的管程訪問規則,這些規則中,僅當當前線程持有鎖且要操作的條件(condition)屬於該鎖時,條件操作才是合法的。這樣,一個ConditionObject關聯到一個ReentrantLock上就表現的跟內置的管程(通過Object.wait等)一樣了。兩者的不同僅僅在於方法的名稱、額外的功能以及用戶可以為每個鎖聲明多個條件。

ConditionObject使用了與同步器一樣的內部隊列節點。但是,是在一個單獨的條件隊列中維護這些節點的。signal操作是通過將節點從條件隊列轉移到鎖隊列中來實現的,而沒有必要在需要喚醒的線程重新獲取到鎖之前將其喚醒。

基本的await操作如下:

create and add new node to conditon queue;
    release lock;
    block until node is on lock queue;
    re-acquire

signal操作如下:

transfer the first node from condition queue to lock queue;

因為只有在持有鎖的時候才能執行這些操作,因此他們可以使用順序鏈表隊列操作來維護條件隊列(在節點中用一個nextWaiter字段)。轉移操作僅僅把第一個節點從條件隊列中的鏈接解除,然后通過CLH插入操作將其插入到鎖隊列上。

三、AQS實現原理

1、同步狀態

AQS如何區分同步器的排他性和共享性?這主要通過維護一個同步狀態來實現,其定義如下:

private volatile int state;

通過添加volatile修飾符保證線程中讀取到的state值都是最新更改的值,另外通過compareAndSetState函數來原子性操作(用CAS指令實現無阻塞的原子性操作)。在不同的同步器中,state的含義是不同的。

ReentrantLock類使用AQS同步狀態來保存鎖(重復)持有的次數。當鎖被一個線程獲取時,ReentrantLock也會記錄下當前獲得鎖的線程標識,以便檢查是否是重復獲取,以及當錯誤的線程試圖進行解鎖操作時拋出異常。

ReentrantReadWriteLock類使用AQS同步狀態中的16位來保存寫鎖持有的次數,剩下的16位用來保存讀鎖的持有次數。WriteLock的構建方式同ReentrantLock。ReadLock則通過使用acquireShared方法來支持同時允許多個讀線程。

Semaphore類(計數信號量)使用AQS同步狀態來保存信號量的當前計數。它里面定義的acquireShared方法會減少計數,或當計數為非正值時阻塞線程;tryRelease方法會增加計數,當計數為正值時還要解除線程的阻塞。

CountDownLatch類使用AQS同步狀態來表示計數。當該計數為0時,所有的acquire操作才能通過。

FutureTask類使用AQS同步狀態來表示某個異步計算任務的運行狀態(初始化、運行中、被取消和完成)。設置或取消一個FutureTask時會調用AQS的release操作,等待計算結果的線程的阻塞解除是通過AQS的acquire操作實現的。

SynchronousQueues類(一種CSPCommunicating Sequential Processes形式的傳遞)使用了內部的等待節點,這些節點可以用於協調生產者和消費者。同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,允許一個生產者繼續生產,反之亦然。

2、FIFO隊列

在AQS內部維護一個FIFO隊列對阻塞的線程進行管理,隊列中的元素Node保存着線程引用和線程狀態的容器,每個線程都可以看作是隊列中的一個節點。Node的主要包含以下成員變量:

Node {
        int waitStatus;
        Node prev;
        Node next;
        Node nextWaiter;
        Thread thread;
    }

其含義如下所示:

屬性名稱 描述
int waitStatus

表示節點的狀態。其中包含的狀態有:

  1. CANCELLED,值為1,表示當前的線程被取消;
  2. SIGNAL,值為-1,表示當前節點的后繼節點包含的線程需要運行,也就是unpark;
  3. CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
  4. PROPAGATE,值為-3,表示當前場景下后續的acquireShared能夠得以執行;
  5. 值為0,節點的初始值。
Node prev 前驅節點
Node next 后繼節點
Node nextWaiter 存儲condition隊列中的后繼節點
Thread thread 入隊列時的當前線程

在AQS內部,會維護兩個隊列,一個是同步隊列,一個是條件隊列,當條件滿足之后,條件隊列中的結點會被轉移到同步隊列中,由同步隊列統一維護。

3、API說明

AQS采用模板方法,定義了一套通用的處理框架,規范了排他鎖和共享鎖獲取鎖的流程,其中包括隊列的維護,線程的阻塞及喚醒等等,而實現排他鎖和共享鎖的邏輯則交給子類來實現(主要是賦予同步狀態不同的含義來實現)。其中acquire和release函數定義了排他鎖的鎖獲取和釋放的處理流程,對應的同步狀態的維護則由tryAcquire和tryRelease函數負責,這個函數由子類來實現,如果直接調用AQS中的實現,則會拋出UnsupportedOperationException異常。依次類推,acquireShared,releaseShared,tryAcquireShared和tryReleaseShared定義共享鎖的處理流程。除了這兩個基本的流程,AQS還為它們提供了超時和中斷的版本,為了簡單起見,在這里主要分析非超時和中斷的版本。

四、源碼分析

1、acquire方法主要實現排他鎖的獲取,其代碼如下:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這個函數規范了獲取鎖的流程:1、嘗試獲取鎖;2、獲取失敗入隊列;3、阻塞當前線程。另外在調用過程中,會清空線程的中斷狀態,在退出之前,還需要有恢復中斷狀態(selfInterruput)。

1)嘗試獲取鎖

以ReentrantLock的不公平鎖的獲取為例(在tryAcquire中調用nonfairTryAcquire函數)

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

不公平鎖主要是指最新申請鎖的線程有可能比在隊列中等待獲取鎖的線程優先得到鎖,這對后者來說是不公平的,因為這違背了先來先得的原則。這樣做的目的主要是為了獲取更大的吞吐量,在ReentrantLock中同時也實現了公平鎖,可以根據不同的場景中自由選擇,在性能和公平之間自由切換。

在ReentrantLock中,有兩個重要的變量與之關聯,一個是鎖的狀態:state和當前獲取鎖的線程。state為零表示鎖未被占用,可以被申請;大於零表示被占用,具體的數值表示重入的數量,即鎖被重新獲取的數量(ReentrantLock可被占有的線程重新獲取)。在該方法中,兩種情況返回true,一種是鎖未被占用,另外一種情況是申請鎖的線程和鎖占用的線程是同一個時。

2)入隊列並阻塞線程

當鎖被占用的時候,AQS會將申請鎖的線程壓入隊列:

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

在addWaiter方法中主要是生成一個排他類型的結點,將根據CAS指令插入到FIFO隊列的隊尾。

final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

生成結點並壓入隊尾之后,會將新生成的結點傳入acquireQueued方法。在該方法中,會對當前結點的前驅結點進行輪詢,判斷其前驅結點是否是head結點,並再次嘗試獲取鎖,如果成功,則將當前結點設置成為head結點,並結束輪詢退出方法,至此acquire流程結束。如果其前驅結點不是head結點,則對結點進行狀態設置且調用park方法阻塞當前線程。對結點狀態主要包含兩個方面(shouldParkAfterFailedAcquire方法):1)將前驅結點中處於取消狀態的結點過濾掉,同時將前驅結點設置為首個非取消狀態的結點;2)將前驅結點的狀態設置為Node.SIGNAL,表示其后繼結點需要被喚醒。設置狀態成功之后調用parkAndCheckInterrupt方法,在該方法中調用LockSupport.park(this),阻塞當前線程,並清空線程的中斷狀態。

2、release方法主要是鎖的釋放操作,其代碼如下所示:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

在release方法中,主要包含兩個操作:1)嘗試釋放鎖,判斷鎖的狀態(state)是否可以被申請;2)喚醒head結點后繼結點對應的線程。

1) 嘗試釋放鎖

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

在該方法中,主要是對當前鎖的狀態值減1,如果為0則表示鎖未被占用,另外鎖的釋放操作只能由鎖的占有者調用 。

1) 喚醒后繼結點

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); 
      
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

在unparkSuccessor方法中,一般會直接喚醒head后繼結點中的線程,不過如果該后續結點被取消或為空,則會從隊尾遍歷出第一個未取消的線程進行喚醒。喚醒的線程被系統調度之后將從acquireQueued方法繼續執行,同時將該后繼結點設置為head頭結點,至此,被阻塞的線程才從acquireQueued中返回。

3、ConditionObject條件變量,可替換之前的管程的同步操作(Object對象的wait,notify方法),在該對象中持有兩個變量firstWaiter和lastWaiter,表示等條件列的頭尾。await對應Object中的wait方法,它的工作是將當前線程加入到條件對列中,並阻塞線程,等待條件滿足之后喚醒。signal對應Object中的notify,它僅僅是將當前線程從條件隊列中移到同步對列中。

1)await方法

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

在await方法中,主要有四個操作:1)構建結點,加入到條件對列(addConditionWaiter方法);2)釋放鎖;3)阻塞當前線程;4)重新獲取鎖。

2)signal方法,該方法主要是調用doSignal實現。

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

該方法主要是將條件隊列中第一個未取消的線程移到同步隊列中。

五、總結

這篇文章主要分析了AbstractQueuedSynchronizer(AQS)的實現原理和相關代碼,AQS是java並發包中一個基礎組件,在它的基礎上,構建了一系列高級的同步組件。理解它的原理對於工作中使用java並發技術有莫大的幫助。

 

引用:

1、 The java.util.concurrent Synchronizer Framework 中文翻譯版,http://ifeve.com/aqs/

2、 AbstractQueuedSynchronizer的介紹和原理分析,http://ifeve.com/introduce-abstractqueuedsynchronizer/#more-8074

3、java線程阻塞中斷和LockSupport的常見問題,http://agapple.iteye.com/blog/970055


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM