CAS,Compare And Swap,即比較並交換。Doug lea大神在同步組件中大量使用CAS技術鬼斧神工地實現了Java多線程的並發操作。整個AQS同步組件、Atomic原子類操作等等都是以CAS為基礎實現的,甚至ConcurrentHashMap在1.8的版本中也調整為了CAS+Synchronized。可以說CAS是整個JUC的基石,是樂觀並發策略的一種實現,硬件保證一個語義上看起來需要多次操作(比較並交換)的行為通過一條處理器指令就能完成。

CAS分析
public class CompareAndSwap{ private int value; //獲取內存值
public synchronized int get() { return value; } /** * 比較當前內存值和舊的預期值,只有兩個值相等的情況,進行更新 * @param expectedValue 舊的預期值 - 在進行運算前從內存中讀取的值 * @param newValue 擬寫入的新值 - 運算得到的值,即擬寫入內存的值 * @return
*/
public synchronized int compareAndSwap(int expectedValue, int newValue){ int oldValue = value; //比較當前內存值和舊的預期值 如果相等,將更新值賦給內存值
if (oldValue == expectedValue) { this.value = newValue; } return oldValue; } //設置
public synchronized boolean compareAndSet(int expectedValue, int newValue){ return expectedValue == compareAndSwap(expectedValue, newValue); } }
常見的使用情況是:線程首先從內存位置V中讀取到預期值A,在執行寫入前,比較當前內存值和舊的預期值A是否相等,如果相等,就將計算得到的值賦給內存值。不相等則說明,期間有其他線程修改了內存位置V的值。
當多個線程使用CAS同時更新一個變量值時,只有其中一個線程能夠更新成功,其他的線程都將失敗。但是,失敗的線程不會被掛起(但如果獲取鎖失敗,線程將被掛起),而是返回失敗狀態,調用者線程可以選擇是否需要再一次嘗試(如果是在一些競爭激烈的情況下,更好的方式是在重試之前等待一段時間或者回退,從而避免活鎖問題–不斷重試,不斷失敗),或者執行一些恢復操作,也可以什么都不做。
JUC下的atomic類都是通過CAS來實現的,下面就以AtomicInteger為例來闡述CAS的實現。如下:
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
Unsafe是CAS的核心類,但是Unsafe類不是提供給用戶程序調用的(Unsafe.getUnsafe()代碼限制了只有啟動類加載器(Bootstrap ClassLoader)加載的Class才能訪問它),因此,如果不使用反射,只能通過其他的java API來間接使用。 比如J.U.C包中的原子類。其中整數原子類有compareAndSet() 和 getAndIncrement()方法都是用了Unsafe類的CAS操作。該操作由Unsafe類里面的compareAndSwapInt()和compareAndSwapLong()等幾個方法包裝提供,虛擬機在內部對這些方法做了特殊處理,即時編譯出來的結果就是一條平台相關的處理器CAS指令。
我們就以AtomicInteger的incrementAndGet()方法來做說明,先看源代碼:
/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
內部調用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
該方法為native方法,有四個參數,分別代表:對象、對象的地址、預期值、修改值。
CPU提供了兩種方法來實現多處理器的原子操作:總線加鎖或者緩存加鎖。
總線加鎖:總線加鎖就是就是使用處理器提供的一個LOCK#信號,當一個處理器在總線上輸出此信號時,其他處理器的請求將被阻塞住,那么該處理器可以獨占使用共享內存。但是這種處理方式顯得有點兒霸道,不厚道,他把CPU和內存之間的通信鎖住了,在鎖定期間,其他處理器都不能其他內存地址的數據,其開銷有點兒大。所以就有了緩存加鎖。
緩存加鎖:其實針對於上面那種情況我們只需要保證在同一時刻對某個內存地址的操作是原子性的即可。緩存加鎖就是緩存在內存區域的數據如果在加鎖期間,當它執行鎖操作寫回內存時,處理器不在輸出LOCK#信號,而是修改內部的內存地址,利用緩存一致性協議來保證原子性。緩存一致性機制可以保證同一個內存區域的數據僅能被一個處理器修改,也就是說當CPU1修改緩存行中的i時使用緩存鎖定,那么CPU2就不能同時緩存了i的緩存行。
CAS缺陷
CAS雖然高效地解決了原子操作,但是還是存在一些缺陷的,它無法涵蓋互斥同步的所有場景,缺陷主要表現在三個方面:循環時間太長、只能保證一個共享變量原子操作、ABA問題。
循環時間太長
如果CAS一直不成功呢?這種情況絕對有可能發生,如果CAS自旋長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。
只能保證一個共享變量原子操作
看了CAS的實現就知道這只能針對一個共享變量,如果是多個共享變量就只能使用鎖了,當然如果你有辦法把多個變量整成一個變量,利用CAS也不錯。例如讀寫鎖中state的高低位
ABA問題
CAS中存在這樣一種場景:如果一個變量V初次讀取的時候是A值,如果在這段期間它的值曾經被改成了B,后來又被改回為A,那CAS檢查就會誤認為它從來沒有被改變過,但是實質上它已經發生了改變,這就是CAS操作的"ABA"問題。對於ABA問題其解決方案是加上版本號,即在每個變量都加上一個版本號,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。
用一個例子來闡述ABA問題所帶來的影響。
有如下鏈表
假如我們想要把B替換為A,也就是compareAndSet(this,A,B)。線程1執行B替換A操作,線程2主要執行如下動作,A 、B出棧,然后C、A入棧,最終該鏈表如下:
完成后線程1發現仍然是A,那么compareAndSet(this,A,B)成功,但是這時會存在一個問題就是B.next = null,compareAndSet(this,A,B)后,會導致C丟失,改棧僅有一個B元素,平白無故把C給丟失了。
AtomicStampedReference的compareAndSet()方法定義如下:
/** * Atomically sets the value of both the reference and stamp * to the given update values if the * current reference is {@code ==} to the expected reference * and the current stamp is equal to the expected stamp. * * @param expectedReference the expected value of the reference * @param newReference the new value for the reference * @param expectedStamp the expected value of the stamp * @param newStamp the new value for the stamp * @return {@code true} if successful */
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
compareAndSet有四個參數,分別表示:預期引用、更新后的引用、預期標志、更新后的標志。源碼很好理解預期的引用 == 當前引用,預期的標識 == 當前標識,如果更新后的引用和標志和當前的引用和標志相等則直接返回true,否則通過Pair生成一個新的pair對象與當前pair CAS替換。Pair為AtomicStampedReference的內部類,主要用於記錄引用和版本戳信息(標識),定義如下:
private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } }
Pair記錄着對象的引用和版本戳,版本戳為int型,保持自增。同時Pair是一個不可變對象,其所有屬性全部定義為final,對外提供一個of方法,該方法返回一個新建的Pari對象。pair對象定義為volatile,保證多線程環境下的可見性。在AtomicStampedReference中,大多方法都是通過調用Pair的of方法來產生一個新的Pair對象,然后賦值給變量pair。如set方法:
/** * Unconditionally sets the value of both the reference and stamp. * * @param newReference the new value for the reference * @param newStamp the new value for the stamp */
public void set(V newReference, int newStamp) { Pair<V> current = pair; if (newReference != current.reference || newStamp != current.stamp) this.pair = Pair.of(newReference, newStamp); }
下面我們將通過一個例子可以可以看到AtomicStampedReference和AtomicInteger的區別。我們定義兩個線程,線程1負責將100 —> 110 —> 100,線程2執行 100 —>120,看兩者之間的區別。
public class Test { private static AtomicInteger atomicInteger = new AtomicInteger(100); private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1); public static void main(String[] args) throws InterruptedException { //AtomicInteger
Thread at1 = new Thread(new Runnable() { @Override public void run() { atomicInteger.compareAndSet(100,110); atomicInteger.compareAndSet(110,100); } }); Thread at2 = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(2); // at1,執行完
} catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120)); } }); at1.start(); at2.start(); at1.join(); at2.join(); //AtomicStampedReference
Thread tsf1 = new Thread(new Runnable() { @Override public void run() { try { //讓 tsf2先獲取stamp,導致預期時間戳不一致
TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } // 預期引用:100,更新后的引用:110,預期標識getStamp() 更新后的標識getStamp() + 1
atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1); } }); Thread tsf2 = new Thread(new Runnable() { @Override public void run() { int stamp = atomicStampedReference.getStamp(); try { TimeUnit.SECONDS.sleep(2); //線程tsf1執行完
} catch (InterruptedException e) { e.printStackTrace(); } System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1)); } }); tsf1.start(); tsf2.start(); } }
運行結果:

運行結果充分展示了AtomicInteger的ABA問題和AtomicStampedReference解決ABA問題。
Ref:
http://cmsblogs.com/?p=2235


