一文徹底搞懂CAS實現原理 & 深入到CPU指令


本文導讀:

  • 前言
  • 如何保障線程安全
  • CAS原理剖析
  • CPU如何保證原子操作
  • 解密CAS底層指令
  • 小結

朋友,文章優先發布在公眾號上,如果你願意,可以掃右側二維碼支持一下下~,謝謝!

前言

日常編碼過程中,基本不會直接用到 CAS 操作,都是通過一些JDK 封裝好的並發工具類來使用的,在 java.util.concurrent 包下。

但是面試時 CAS 還是個高頻考點,所以呀,你還不得不硬着頭皮去死磕一下這塊的技能點,總比一問三不知強吧?

一般都是先針對一些簡單的並發知識問起,還有的面試官,比較直接:

面試官:Java並發工具類中的 CAS 機制講一講?

小東:額?大腦中問自己「啥是 CAS?」我聽過的,容我想一想...

一分鍾過去了...

小東:嘿嘿~,這塊我看過的,記不大清楚了。

面試官:好的,今天先到這吧~

小東:在路上

biaoqing

當然 CAS 你若真不懂,你可以引導面試官到你擅長的技術點上,用你的其他技能亮點扳回一局。

接下來,我們通過一個示例代碼來說:

// 類的成員變量
static int data = 0;
// main方法內代碼
IntStream.range(0, 2).forEach((i) -> {
		new Thread(() -> {
				try {
						Thread.sleep(20);
				} catch (InterruptedException e) {
						e.printStackTrace();
				}
				IntStream.range(0, 100).forEach(y -> {
						data++;
				});
		}).start();
});

	try {
			Thread.sleep(2000);
	} catch (InterruptedException e) {
			e.printStackTrace();
	}

System.out.println(data);
}

結合圖示理解:

線程不安全

上述代碼,問題很明顯,data 是類中的成員變量,int 類型,即共享的資源。當多個線程同時
執行 data++ 操作時,結果可能不等於 200,為了模擬出效果,線程中 sleep 了 20 毫秒,讓線程就緒,代碼運行多次,結果都不是 200 。

如何保障線程安全

示例代碼執行結果表明了,多個線程同時操作共享變量導致了結果不准確,線程是不安全的。如何解決呢?

方案一:使用 synchronized 關鍵字

使用 synchronized 關鍵字,線程內使用同步代碼塊,由JVM自身的機制來保障線程的安全性。

synchronized 關鍵代碼:

// 類中定義的Object鎖對象
Object lock = new Object();
 
 // synchronized 同步塊 () 中使用 lock 對象鎖定資源
IntStream.range(0, 100).forEach(y -> {
		synchronized (lock.getClass()) {
				data++;
		}
});

synchronized保障線程安全

方案二:使用 Lock 鎖

高並發場景下,使用 Lock 鎖要比使用 synchronized 關鍵字,在性能上得到極大的提高。
因為 Lock 底層是通過 AQS + CAS 機制來實現的。關於 AQS 機制可以參見往期文章 <<通過通過一個生活中的案例場景,揭開並發包底層AQS的神秘面紗>> 。CAS 機制會在文章中下面講到。

使用 Lock 的關鍵代碼:

// 類中定義成員變量  
Lock lock = new ReentrantLock();

// 執行 lock() 方法加鎖,執行 unlock() 方法解鎖
IntStream.range(0, 100).forEach(y -> {
		lock.lock();
		data++;
		lock.unlock();
});

結合圖示理解:

Lock鎖保障線程安全

方案三:使用 Atomic 原子類

除上面兩種方案還有沒有更為優雅的方案?synchronized 的使用在 JDK1.6 版本以后做了很多優化,如果並發量不大,相比 Lock 更為安全,性能也能接受,因其得益於 JVM 底層機制來保障,自動釋放鎖,無需硬編碼方式釋放鎖。而使用 Lock 方式,一旦 unlock() 方法使用不規范,可能導致死鎖。

JDK 並發包所有的原子類如下所示:

並發包原子類

使用 AtomicInteger 工具類實現代碼:

// 類中成員變量定義原子類
AtomicInteger atomicData = new AtomicInteger();

// 代碼中原子類的使用方式
IntStream.range(0, 2).forEach((i) -> {
	new Thread(() -> {
			try {
					Thread.sleep(20);
			} catch (InterruptedException e) {
					e.printStackTrace();
			}
			IntStream.range(0, 100).forEach(y -> {
				  // 原子類自增
					atomicData.incrementAndGet();
			});
	}).start();
});

try {
		Thread.sleep(2000);
} catch (InterruptedException e) {
		e.printStackTrace();
}

// 通過 get () 方法獲取結果
System.out.println(atomicData.get());

結合圖示理解:

AtomicInteger實現

之所以推薦使用 Atomic 原子類,因為其底層基於 CAS 樂觀鎖來實現的,下文會詳細分析。

方案四:使用 LongAdder 原子類

LongAdder 原子類在 JDK1.8 中新增的類, 跟方案三中提到的 AtomicInteger 類似,都是在 java.util.concurrent.atomic 並發包下的。

LongAdder 適合於高並發場景下,特別是寫大於讀的場景,相較於 AtomicInteger、AtomicLong 性能更好,代價是消耗更多的空間,以空間換時間。

使用 LongAdder 工具類實現代碼:

// 類中成員變量定義的LongAdder
LongAdder longAdderData = new LongAdder();

// 代碼中原子類的使用方式
IntStream.range(0, 2).forEach((i) -> {
		new Thread(() -> {
				try {
						Thread.sleep(20);
				} catch (InterruptedException e) {
						e.printStackTrace();
				}
				IntStream.range(0, 100).forEach(y -> {
					  // 使用 increment() 方法自增
						longAdderData.increment();
				});
		}).start();
});

try {
		Thread.sleep(2000);
} catch (InterruptedException e) {
		e.printStackTrace();
}
// 使用 sum() 獲取結果
System.out.println(longAdderData.sum());

結合圖示理解:

LongAdder實現

但是,如果使用了 LongAdder 原子類,當然其底層也是基於 CAS 機制實現的。LongAdder 內部維護了 base 變量和 Cell[] 數組,當多線程並發寫的情況下,各個線程都在寫入自己的 Cell 中,LongAdder 操作后返回的是個近似准確的值,最終也會返回一個准確的值。

換句話說,使用了 LongAdder 后獲取的結果並不是實時的,對實時性要求高的還是建議使用其他的原子類,如 AtomicInteger 等。

volatile 關鍵字方案?

可能還有朋友會說,還想到另外一種方案:使用** volatile** 關鍵字啊。

volatile不能保障原子性

經過驗證,是不可行的,大家可以試試,就本文給出的示例代碼直接執行,結果都不等於 200,說明線程仍然是不安全的。

data++ 自增賦值並不是原子的,跟 Java內存模型有關。

在非線程安全的圖示中有標注執行線程本地,會有個內存副本,即本地的工作內存,實際執行過程會經過如下幾個步驟:

(1)執行線程從本地工作內存讀取 data,如果有值直接獲取,如果沒有值,會從主內存讀取,然后將其放到本地工作內存當中。

(2)執行線程在本地工作內存中執行 +1 操作。

(3)將 data 的值寫入主內存。

結論:請記住!

一個變量簡單的讀取和賦值操作是原子性的,將一個變量賦值給另外一個變量不是原子性的。

Java內存模型(JMM)僅僅保障了變量的基本讀取和賦值操作是原子性的,其他均不會保證的。如果想要使某段代碼塊要求具備原子性,就需要使用 synchronized 關鍵字、並發包中的 Lock 鎖、並發包中 Atomic 各種類型的原子類來實現,即上面我們提到的四種方案都是可行的

volatile 關鍵字修飾的變量,恰恰是不能保障原子性的,僅能保障可見性和有序性。

CAS原理剖析

CAS 被認為是一種樂觀鎖,有樂觀鎖,相對應的是悲觀鎖。

在上述示例中,我們使用了 synchronized,如果在線程競爭壓力大的情況下,synchronized 內部會升級為重量級鎖,此時僅能有一個線程進入代碼塊執行,如果這把鎖始終不能釋放,其他線程會一直阻塞等待下去。此時,可以認為是悲觀鎖。

悲觀鎖會因線程一直阻塞導致系統上下文切換,系統的性能開銷大。

那么,我們可以用樂觀鎖來解決,所謂的樂觀鎖,其實就是一種思想。

樂觀鎖,會以一種更加樂觀的態度對待事情,認為自己可以操作成功。當多個線程操作同一個共享資源時,僅能有一個線程同一時間獲得鎖成功,在樂觀鎖中,其他線程發現自己無法成功獲得鎖,並不會像悲觀鎖那樣阻塞線程,而是直接返回,可以去選擇再次重試獲得鎖,也可以直接退出。

CAS 正是樂觀鎖的核心算法實現。

在示例代碼的方案中都提到了 AtomicInteger、LongAdder、Lock鎖底層,此外,當然還包括 java.util.concurrent.atomic 並發包下的所有原子類都是基於 CAS 來實現的。

以 AtomicInteger 原子整型類為例,一起來分析下 CAS 底層實現機制。

atomicData.incrementAndGet()

源碼如下所示:

// 提供自增易用的方法,返回增加1后的值
public final int incrementAndGet() {
		return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 額外提供的compareAndSet方法
public final boolean compareAndSet(int expect, int update) {
		return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

// Unsafe 類的提供的方法
public final int getAndAddInt (Object o,long offset, int delta){
		int v;
		do {
				v = getIntVolatile(o, offset);
		} while (!weakCompareAndSetInt(o, offset, v, v + delta));
		return v;
}

我們看到了 AtomicInteger 內部方法都是基於 Unsafe 類實現的,Unsafe 類是個跟底層硬件CPU指令通訊的復制工具類。

由這段代碼看到:

unsafe.compareAndSwapInt(this, valueOffset, expect, update)

所謂的 CAS,其實是個簡稱,全稱是 Compare And Swap,對比之后交換數據。
上面的方法,有幾個重要的參數:

(1)this,Unsafe 對象本身,需要通過這個類來獲取 value 的內存偏移地址。

(2)valueOffset,value 變量的內存偏移地址。

(3)expect,期望更新的值。

(4)update,要更新的最新值。

如果原子變量中的 value 值等於 expect,則使用 update 值更新該值並返回 true,否則返回 false。

再看如何獲得 valueOffset的:

// Unsafe實例
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

static {
		try {
			  // 獲得value在AtomicInteger中的偏移量
				valueOffset = unsafe.objectFieldOffset
						(AtomicInteger.class.getDeclaredField("value"));
		} catch (Exception ex) { throw new Error(ex); }
}
// 實際變量的值
private volatile int value;

這里看到了 value 實際的變量,是由 volatile 關鍵字修飾的,為了保證在多線程下的內存可見性

為何能通過 Unsafe.getUnsafe() 方法能獲得 Unsafe 類的實例?其實因為 AtomicInteger 類也在 **rt.jar **包下面的,所以 AtomicInteger 類就是通過 Bootstrap 根類加載器進行加載的。

源碼如下所示:

@CallerSensitive
public static Unsafe getUnsafe() {
		Class var0 = Reflection.getCallerClass();
		// Bootstrap 類加載器是C++的,正常返回null,否則就拋異常。
		if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
				throw new SecurityException("Unsafe");
		} else {
				return theUnsafe;
		}
}

類加載器委托關系:

類加載器

CPU如何實現原子操作

CPU 處理器速度遠遠大於在主內存中的,為了解決速度差異,在他們之間架設了多級緩存,如 L1、L2、L3 級別的緩存,這些緩存離CPU越近就越快,將頻繁操作的數據緩存到這里,加快訪問速度 ,如下圖所示:

CPU架構

現在都是多核 CPU 處理器,每個 CPU 處理器內維護了一塊字節的內存,每個內核內部維護着一塊字節的緩存,當多線程並發讀寫時,就會出現緩存數據不一致的情況。

此時,處理器提供:

  • 總線鎖定

當一個處理器要操作共享變量時,在 BUS 總線上發出一個 Lock 信號,其他處理就無法操作這個共享變量了。

缺點很明顯,總線鎖定在阻塞其它處理器獲取該共享變量的操作請求時,也可能會導致大量阻塞,從而增加系統的性能開銷。

  • **緩存鎖定 **

后來的處理器都提供了緩存鎖定機制,也就說當某個處理器對緩存中的共享變量進行了操作,其他處理器會有個嗅探機制,將其他處理器的該共享變量的緩存失效,待其他線程讀取時會重新從主內存中讀取最新的數據,基於 MESI 緩存一致性協議來實現的。

現代的處理器基本都支持和使用的緩存鎖定機制。

注意:

有如下兩種情況處理器不會使用緩存鎖定:

(1)當操作的數據跨多個緩存行,或沒被緩存在處理器內部,則處理器會使用總線鎖定。

(2)有些處理器不支持緩存鎖定,比如:Intel 486 和 Pentium 處理器也會調用總線鎖定。

解密CAS底層指令

其實,掌握以上內容,對於 CAS 機制的理解相對來說算是比較清楚了。

當然,如果感興趣,也可以繼續深入學習用到了哪些硬件 CPU 指令。

底層硬件通過將 CAS 里的多個操作在硬件層面語義實現上,通過一條處理器指令保證了原子性操作。這些指令如下所示:

(1)測試並設置(Tetst-and-Set)

(2)獲取並增加(Fetch-and-Increment)

(3)交換(Swap)

(4)比較並交換(Compare-and-Swap)

(5)加載鏈接/條件存儲(Load-Linked/Store-Conditional)

前面三條大部分處理器已經實現,后面的兩條是現代處理器當中新增加的。而且根據不同的體系結構,指令存在着明顯差異。

在IA64,x86 指令集中有 cmpxchg 指令完成 CAS 功能,在 sparc-TSO 也有 casa 指令實現,而在 ARM 和 PowerPC 架構下,則需要使用一對 ldrex/strex 指令來完成 LL/SC 的功能。在精簡指令集的體系架構中,則通常是靠一對兒指令,如:load and reserve 和 **store conditional ** 實現的,在大多數處理器上 CAS 都是個非常輕量級的操作,這也是其優勢所在。

sun.misc.Unsafe 中 CAS 的核心方法:

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

這三個方法可以對應去查看 openjdk 的 hotspot 源碼:

源碼位置:hotspot/src/share/vm/prims/unsafe.cpp

#define FN_PTR(f) CAST_FROM_FN_PTR(void*, &f)

{CC"compareAndSwapObject", CC"("OBJ"J"OBJ""OBJ")Z",  FN_PTR(Unsafe_CompareAndSwapObject)},

{CC"compareAndSwapInt",  CC"("OBJ"J""I""I"")Z",      FN_PTR(Unsafe_CompareAndSwapInt)},

{CC"compareAndSwapLong", CC"("OBJ"J""J""J"")Z",      FN_PTR(Unsafe_CompareAndSwapLong)},

上述三個方法,最終在 hotspot 源碼實現中都會調用統一的 cmpxchg 函數,可以在 hotspot 源碼中找到核心代碼。

源碼地址:hotspot/src/share/vm/runtime/Atomic.cpp

cmpxchg 函數源碼:

jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte*dest, jbyte compare_value) {
		 assert (sizeof(jbyte) == 1,"assumption.");
		 uintptr_t dest_addr = (uintptr_t) dest;
		 uintptr_t offset = dest_addr % sizeof(jint);
		 volatile jint*dest_int = ( volatile jint*)(dest_addr - offset);
		 // 對象當前值
		 jint cur = *dest_int;
		 // 當前值cur的地址
		 jbyte * cur_as_bytes = (jbyte *) ( & cur);
		 // new_val地址
		 jint new_val = cur;
		 jbyte * new_val_as_bytes = (jbyte *) ( & new_val);
		  // new_val存exchange_value,后面修改則直接從new_val中取值
		 new_val_as_bytes[offset] = exchange_value;
		 // 比較當前值與期望值,如果相同則更新,不同則直接返回
		 while (cur_as_bytes[offset] == compare_value) {
		  // 調用匯編指令cmpxchg執行CAS操作,期望值為cur,更新值為new_val
			 jint res = cmpxchg(new_val, dest_int, cur);
			 if (res == cur) break;
			 cur = res;
			 new_val = cur;
			 new_val_as_bytes[offset] = exchange_value;
		 }
		 // 返回當前值
		 return cur_as_bytes[offset];
}

源碼中具體變量添加了注釋,因為都是 C++ 代碼,所以作為了解即可 ~

jint res = cmpxchg(new_val, dest_int, cur);

這里就是調用了匯編指令 cmpxchg 了,其中也是包含了三個參數,跟CAS上的參數能對應上。

總結

任何技術都要找到適合的場景,都不是萬能的,CAS 機制也一樣,也有副作用。

問題1:

作為樂觀鎖的一種實現,當多線程競爭資源激烈的情況下,而且鎖定的資源處理耗時,那么其他線程就要考慮自旋的次數限制,避免過度的消耗 CPU。

另外,可以考慮上文示例代碼中提到的 LongAdder 來解決,LongAdder 以空間換時間的方式,來解決 CAS 大量失敗后長時間占用 CPU 資源,加大了系統性能開銷的問題。

問題2:

A-->B--->A 問題,假設有一個變量 A ,修改為B,然后又修改為了 A,實際已經修改過了,但 CAS 可能無法感知,造成了不合理的值修改操作。

整數類型還好,如果是對象引用類型,包含了多個變量,那怎么辦?加個版本號或時間戳唄,沒問題!

JDK 中 java.util.concurrent.atomic 並發包下,提供了 AtomicStampedReference,通過為引用建立個 Stamp 類似版本號的方式,確保 CAS 操作的正確性。

希望此文大家收藏消化,CAS 在JDK並發包底層實現中是個非常重要的算法。

撰文不易,文章中有什么問題還請指正!

歡迎關注我的公眾號,掃二維碼關注獲得更多精彩文章,與你一同成長~

Java愛好者社區


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM