從synchronize到CAS和AQS


導論

Java 中的並發鎖大致分為隱式鎖和顯式鎖兩種。

隱式鎖就是我們最常使用的 synchronized 關鍵字,顯式鎖主要包含兩個接口:Lock 和 ReadWriteLock,主要實現類分別為 ReentrantLock 和 ReentrantReadWriteLock,

這兩個類都是基於 AQS(AbstractQueuedSynchronizer) 實現的。還有的地方將 CAS 也稱為一種鎖,在包括 AQS 在內的很多並發相關類中,CAS 都扮演了很重要的角色。

就是一些名詞的概念,知道他們是什么中文意思,知道理解意思之后在學習代碼的實現就比較輕松了。

這個文章也算帶着看了 AbstractQueuedSynchronizer 和 ReentrantReadWriteLock 和 ReentrantLock 的源碼。

相關閱讀:

線程同步-synchronized

CAS(https://www.cnblogs.com/zwtblog/p/15129821.html#cas)

悲觀鎖和樂觀鎖

悲觀鎖和獨占鎖是一個意思,

它假設一定會發生沖突,因此獲取到鎖之后會阻塞其他等待線程。

這么做的好處是簡單安全,但是掛起線程和恢復線程都需要轉入內核態進行,這樣做會帶來很大的性能開銷。

悲觀鎖的代表是 synchronized

然而在真實環境中,大部分時候都不會產生沖突。悲觀鎖會造成很大的浪費。

而樂觀鎖不一樣,它假設不會產生沖突,先去嘗試執行某項操作,失敗了再進行其他處理(一般都是不斷循環重試)。

這種鎖不會阻塞其他的線程,也不涉及上下文切換,性能開銷小。代表實現是 CAS。

公平鎖和非公平鎖

公平鎖是指各個線程在加鎖前先檢查有無排隊的線程,按排隊順序去獲得鎖

非公平鎖是指線程加鎖前不考慮排隊問題,直接嘗試獲取鎖,獲取不到再去隊尾排隊

值得注意的是,在 AQS 的實現中,一旦線程進入排隊隊列,即使是非公平鎖,線程也得乖乖排隊。

可重入鎖和不可重入鎖

如果一個線程已經獲取到了一個鎖,那么它可以訪問被這個鎖鎖住的所有代碼塊。

不可重入鎖與之相反。

Synchronized 關鍵字

Synchronized 是一種獨占鎖。

在修飾靜態方法時,鎖的是類,如 Object.class。

修飾非靜態方法時,鎖的是對象,即 this。修飾方法塊時,鎖的是括號里的對象。

每個對象有一個鎖和一個等待隊列,鎖只能被一個線程持有,其他需要鎖的線程需要阻塞等待。

鎖被釋放后,對象會從隊列中取出一個並喚醒,喚醒哪個線程是不確定的,不保證公平性。

實現原理

synchronized 是基於 Java 對象頭和 Monitor 機制來實現的。

Java 對象頭

一個對象在內存中包含三部分:對象頭,實例數據和對齊填充

其中 Java 對象頭包含兩部分:

  • Class Metadata Address (類型指針)。存儲類的元數據的指針。虛擬機通過這個指針找到它是哪個類的實例。
  • Mark Word(標記字段)。存出一些對象自身運行時的數據。包括哈希碼,GC 分代年齡,鎖狀態標志等。

Monitor

Mark Word 有一個字段指向 monitor 對象

monitor 中記錄了鎖的持有線程,等待的線程隊列等信息。

前面說的每個對象都有一個鎖和一個等待隊列,就是在這里實現的。 monitor 對象由 C++ 實現。其中有三個關鍵字段:

  • _owner 記錄當前持有鎖的線程
  • _EntryList 是一個隊列,記錄所有阻塞等待鎖的線程
  • _WaitSet 也是一個隊列,記錄調用 wait() 方法並還未被通知的線程。

Monitor的操作機制如下:

  1. 多個線程競爭鎖時,會先進入 EntryList 隊列。競爭成功的線程被標記為 Owner。其他線程繼續在此隊列中阻塞等待。
  2. 如果 Owner 線程調用 wait() 方法,則其釋放對象鎖並進入 WaitSet 中等待被喚醒。Owner 被置空,EntryList 中的線程再次競爭鎖。
  3. 如果 Owner 線程執行完了,便會釋放鎖,Owner 被置空,EntryList 中的線程再次競爭鎖。

JVM 對 synchronized 的處理

上面了解了 monitor 的機制,那虛擬機是如何將 synchronized 和 monitor 關聯起來的呢?

分兩種情況:

  • 如果同步的是代碼塊,編譯時會直接在同步代碼塊前加上 monitorenter 指令,代碼塊后加上 monitorexit 指令。這稱為顯示同步。
  • 如果同步的是方法,虛擬機會為方法設置 ACC_SYNCHRONIZED 標志。調用的時候 JVM 根據這個標志判斷是否是同步方法。

JVM 對 synchronized 的優化

synchronized 是重量級鎖,由於消耗太大,虛擬機對其做了一些優化。

自旋鎖與自適應自旋

在許多應用中,鎖定狀態只會持續很短的時間,為了這么一點時間去掛起恢復線程,不值得。

我們可以讓等待線程執行一定次數的循環,在循環中去獲取鎖。這項技術稱為自旋鎖,它可以節省系統切換線程的消耗,但仍然要占用處理器。

在 JDK1.4.2 中,自選的次數可以通過參數來控制。 JDK 1.6又引入了自適應的自旋鎖,不再通過次數來限制,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。

鎖消除

虛擬機在運行時,如果發現一段被鎖住的代碼中不可能存在共享數據,就會將這個鎖清除。

鎖粗化

當虛擬機檢測到有一串零碎的操作都對同一個對象加鎖時,會把鎖擴展到整個操作序列外部。如 StringBuffer 的 append 操作。

輕量級鎖

對絕大部分的鎖來說,在整個同步周期內都不存在競爭。如果沒有競爭,輕量級鎖可以使用 CAS 操作避免使用互斥量的開銷。

偏向鎖

偏向鎖的核心思想是,如果一個線程獲得了鎖,那么鎖就進入偏向模式,當這個線程再次請求鎖時,無需再做任何同步操作,即可獲取鎖。

CAS

操作模型

CAS 是 compare and swap 的簡寫,即比較並交換

它是指一種操作機制,而不是某個具體的類或方法。

在 Java 平台上對這種操作進行了包裝。在 Unsafe 類中,調用代碼如下:

unsafe.compareAndSwapInt(this, valueOffset, expect, update);

它需要三個參數,分別是內存位置 V,舊的預期值 A 和新的值 B。

操作時,先從內存位置讀取到值,然后和預期值A比較。如果相等,則將此內存位置的值改為新值 B,返回 true。如果不相等,說明和其他線程沖突了,則不做任何改變,返回 false。

這種機制在不阻塞其他線程的情況下避免了並發沖突,比獨占鎖的性能高很多。 CAS 在 Java 的原子類和並發包中有大量使用。

重試機制(循環 CAS)

第一,CAS 本身並未實現失敗后的處理機制,它只負責返回成功或失敗的布爾值,后續由調用者自行處理。只不過我們最常用的處理方式是重試而已。

第二,這句話很容易理解錯,被理解成重新比較並交換。實際上失敗的時候,原值已經被修改,如果不更改期望值,再怎么比較都會失敗。而新值同樣需要修改。

所以正確的方法是,使用一個死循環進行 CAS 操作,成功了就結束循環返回,失敗了就重新從內存讀取值和計算新值,再調用 CAS。看下 AtomicInteger 的源碼就什么都懂了:

CAS-Java基礎

public final int incrementAndGet () {
    for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return next;
    }
}

底層實現

CAS 主要分三步,讀取-比較-修改。

其中比較是在檢測是否有沖突,如果檢測到沒有沖突后,其他線程還能修改這個值,那么 CAS 還是無法保證正確性。所以最關鍵的是要保證比較-修改這兩步操作的原子性。

CAS 底層是靠調用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架構中的 compare and exchange 指令。在多核的情況下,這個指令也不能保證原子性,需要在前面加上 lock 指令。

lock 指令可以保證一個 CPU 核心在操作期間獨占一片內存區域。那么 這又是如何實現的呢?

在處理器中,一般有兩種方式來實現上述效果:總線鎖和緩存鎖。

在多核處理器的結構中,CPU 核心並不能直接訪問內存,而是統一通過一條總線訪問。

總線鎖就是鎖住這條總線,使其他核心無法訪問內存。這種方式代價太大了,會導致其他核心停止工作。

而緩存鎖並不鎖定總線,只是鎖定某部分內存區域。當一個 CPU 核心將內存區域的數據讀取到自己的緩存區后,它會鎖定緩存對應的內存區域。鎖住期間,其他核心無法操作這塊內存區域。

CAS 就是通過這種方式實現比較和交換操作的原子性的。

值得注意的是, CAS 只是保證了操作的原子性,並不保證變量的可見性,因此變量需要加上 volatile 關鍵字。

ABA問題?

比如是有兩個線程A,B ,一個變量:蘋果

A線程期望拿到一個蘋果

B線程一進來把蘋果改成了梨子,但是在最后結束的時候又把梨子換成了蘋果

A線程在此期間是不知情的,以為自己拿到的蘋果還是原來的那一個,其實已經被換過了

如何解決ABA問題?

原子引用-解決ABA問題

AtomicStampedReference

類似於樂觀鎖,比較時會去對比版本號,確認變量是否被換過了。

可重入鎖 ReentrantLock

ReentrantLock 使用代碼實現了和 synchronized 一樣的語義,包括可重入,保證內存可見性和解決競態條件問題等。相比 synchronized,它還有如下好處:

  • 支持以非阻塞方式獲取鎖
  • 可以響應中斷
  • 可以限時
  • 支持了公平鎖和非公平鎖

基本用法如下:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private volatile int count;

    public void incr() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        return count;
    }
}

ReentrantLock 內部有兩個內部類,分別是 FairSync 和 NoFairSync,對應公平鎖和非公平鎖。他們都繼承自 Sync。Sync 又繼承自AQS。

AQS

AQS全稱AbstractQueuedSynchronizer,即抽象的隊列同步器,是一種用來構建鎖和同步器的框架。

AQS 中有兩個重要的成員:

  • 成員變量 state。用於表示鎖現在的狀態,用 volatile 修飾,保證內存一致性。

    同時所用對 state 的操作都是使用 CAS 進行的。

    state 為0表示沒有任何線程持有這個鎖,線程持有該鎖后將 state 加1,釋放時減1。

    多次持有釋放則多次加減。

  • 還有一個雙向鏈表,鏈表除了頭結點外,每一個節點都記錄了線程的信息,代表一個等待線程。這是一個 FIFO 的鏈表。

下面以 ReentrantLock 非公平鎖的代碼看看 AQS 的原理。

AQS中Node常量含義

CANCELLED
waitStatus值為1時表示該線程節點已釋放(超時、中斷),已取消的節點不會再阻塞。

SIGNAL
waitStatus為-1時表示該線程的后續線程需要阻塞,即只要前置節點釋放鎖,就會通知標識為 SIGNAL 狀態的后續節點的線程

CONDITION
waitStatus為-2時,表示該線程在condition隊列中阻塞(Condition有使用)

PROPAGATE
waitStatus為-3時,表示該線程以及后續線程進行無條件傳播(CountDownLatch中有使用)共享模式下, PROPAGATE 狀態的線程處於可運行狀態

Condition隊列

除了同步隊列之外,AQS中還存在Condition隊列,這是一個單向隊列。

調用ConditionObject.await()方法,能夠將當前線程封裝成Node加入到Condition隊列的末尾,

然后將獲取的同步狀態釋放(即修改同步狀態的值,喚醒在同步隊列中的線程)。

Condition隊列也是FIFO。調用ConditionObject.signal()方法,能夠喚醒firstWaiter節點,將其添加到同步隊列末尾。

獨占模式下的AQS

所謂獨占模式,即只允許一個線程獲取同步狀態,當這個線程還沒有釋放同步狀態時,其他線程是獲取不了的,只能加入到同步隊列,進行等待。

很明顯,我們可以將state的初始值設為0,表示空閑。

當一個線程獲取到同步狀態時,利用CAS操作讓state加1,表示非空閑,那么其他線程就只能等待了。

釋放同步狀態時,不需要CAS操作,因為獨占模式下只有一個線程能獲取到同步狀態。

ReentrantLock、CyclicBarrier正是基於此設計的。

例如,ReentrantLock,state初始化為0,表示未鎖定狀態。

A線程lock()時,會調用tryAcquire()獨占該鎖並將state+1

獨占模式下的AQS是不響應中斷的,指的是加入到同步隊列中的線程,如果因為中斷而被喚醒的話,

不會立即返回,並且拋出InterruptedException。

而是再次去判斷其前驅節點是否為head節點,決定是否爭搶同步狀態。

如果其前驅節點不是head節點或者爭搶同步狀態失敗,那么再次掛起。

請求鎖

請求鎖時有三種可能:

  1. 如果沒有線程持有鎖,則請求成功,當前線程直接獲取到鎖。
  2. 如果當前線程已經持有鎖,則使用 CAS 將 state 值加1,表示自己再次申請了鎖,釋放鎖時減1。這就是可重入性的實現。
  3. 如果由其他線程持有鎖,那么將自己添加進等待隊列。

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                //沒有線程持有鎖時,直接獲取鎖,對應情況1
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }


/**以獨占模式獲取,忽略中斷。通過至少調用一次 {@link tryAcquire} 實現,成功返回。
否則線程會排隊,可能會反復阻塞和解除阻塞,調用 {@link tryAcquire} 直到成功。
該方法可用於實現方法{@link Locklock}。 
@param arg 獲取參數。這個值被傳送到 {@link tryAcquire},但不會被解釋,可以代表任何你喜歡的東西。*/

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&//在此方法中會判斷當前持有線程是否等於自己,對應情況2
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//將自己加入隊列中,對應情況3
            selfInterrupt();
    }

創建 Node 節點並加入鏈表

如果沒競爭到鎖,這時候就要進入等待隊列。

隊列是默認有一個 head 節點的,並且不包含線程信息。

上面情況3中,addWaiter 會創建一個 Node,並添加到鏈表的末尾,Node 中持有當前線程的引用。同時還有一個成員變量 waitStatus,表示線程的等待狀態,初始值為0。我們還需要關注兩個值:

  • CANCELLED,cancelled,值為1,表示取消狀態,就是說我不要這個鎖了,請你把我移出去。
  • SINGAL,singal,值為-1,表示下一個節點正在掛起等待,注意是下一個節點,不是當前節點。

同時,加到鏈表末尾的操作使用了 CAS+死循環的模式,很有代表性,拿出來看一看:

/**

AbstractQueuedSynchronizer

為當前線程和給定模式創建和排隊節點。 
@param mode Node.EXCLUSIVE 表示獨占,Node.SHARED 表示共享 @return 新節點
*/

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

可以看到,在死循環里調用了 CAS 的方法。

如果多個線程同時調用該方法,那么每次循環都只有一個線程執行成功,其他線程進入下一次循環,重新調用。

N個線程就會循環N次。這樣就在無鎖的模式下實現了並發模型。

掛起等待

  • 如果此節點的上一個節點是頭部節點,則再次嘗試獲取鎖,獲取到了就移除並返回。獲取不到就進入下一步;
  • 判斷前一個節點的 waitStatus,如果是 SINGAL,則返回 true,並調用 LockSupport.park() 將線程掛起;
  • 如果是 CANCELLED,則將前一個節點移除;
  • 如果是其他值,則將前一個節點的 waitStatus 標記為 SINGAL,進入下一次循環。

可以看到,一個線程最多有兩次機會,還競爭不到就去掛起等待。

/**
以獨占不間斷模式獲取已在隊列中的線程。
由條件等待方法以及獲取使用。 
@param node 節點 
@param arg 獲取參數 
@return {@code true} 如果在等待時中斷
*/
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

釋放鎖

  • 調用 tryRelease,此方法由子類實現。實現非常簡單,如果當前線程是持有鎖的線程,就將 state 減1。減完后如果 state 大於0,表示當前線程仍然持有鎖,返回 false。如果等於0,表示已經沒有線程持有鎖,返回 true,進入下一步;
  • 如果頭部節點的 waitStatus 不等於0,則調用LockSupport.unpark()喚醒其下一個節點。頭部節點的下一個節點就是等待隊列中的第一個線程,這反映了 AQS 先進先出的特點。另外,即使是非公平鎖,進入隊列之后,還是得按順序來。
public final boolean release(int arg) {
    if (tryRelease(arg)) { //將 state 減1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
        
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { 
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null) //喚醒第一個等待的線程
        LockSupport.unpark(s.thread);
}

公平鎖如何實現

上面分析的是非公平鎖,那公平鎖呢?很簡單,在競爭鎖之前判斷一下等待隊列中有沒有線程在等待就行了。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() && //判斷等待隊列是否有節點
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    ......
    return false;
}

可重入讀寫鎖 ReentrantReadWriteLock

讀寫鎖機制

理解 ReentrantLock 和 AQS 之后,再來理解讀寫鎖就很簡單了。

讀寫鎖有一個讀鎖和一個寫鎖,分別對應讀操作和鎖操作。

鎖的特性如下:

  • 只有一個線程可以獲取到寫鎖。在獲取寫鎖時,只有沒有任何線程持有任何鎖才能獲取成功;
  • 如果有線程正持有寫鎖,其他任何線程都獲取不到任何鎖;
  • 沒有線程持有寫鎖時,可以有多個線程獲取到讀鎖。

實現原理

讀寫鎖雖然有兩個鎖,但實際上只有一個等待隊列。

  • 獲取寫鎖時,要保證沒有任何線程持有鎖;
  • 寫鎖釋放后,會喚醒隊列第一個線程,可能是讀鎖和寫鎖;
  • 獲取讀鎖時,先判斷寫鎖有沒有被持有,沒有就可以獲取成功;
  • 獲取讀鎖成功后,會將隊列中等待讀鎖的線程挨個喚醒,知道遇到等待寫鎖的線程位置;
  • 釋放讀鎖時,要檢查讀鎖數,如果為0,則喚醒隊列中的下一個線程,否則不進行操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM