對於並發控制而言,
鎖是一種悲觀的策略。它總是假設每一次的臨界區操作會產生沖突,因此,必須對每次操作都小心翼翼。如果有多個線程同時需要訪問臨界區資源,就寧可犧牲性能讓線程進行等待,所以說鎖會阻塞線程執行。
而無鎖是一種樂觀的策略,它會假設對資源的訪問是沒有沖突的。既然沒有沖突,自然不需要等待,所以所有的線程都可以在不停頓的狀態下持續執行。那遇到沖突怎么辦呢?無鎖的策略使用一種叫做比較交換的技術(CAS Compare And Swap)來鑒別線程沖突,一旦檢測到沖突產生,就重試當前操作直到沒有沖突為止。
無鎖的好處:
第一,在高並發的情況下,它比有鎖的程序擁有更好的性能;
第二,它天生就是死鎖免疫的。
就憑借這兩個優勢,就值得我們冒險嘗試使用無鎖的並發。
1.與眾不同的並發策略:比較交換(CAS)
與鎖相比,使用比較交換(下文簡稱CAS)會使程序看起來更加復雜一些。但由於其非阻塞性,它對死鎖問題天生免疫,並且,線程間的相互影響也遠遠比基於鎖的方式要小。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,因此,它要比基於鎖的方式擁有更優越的性能。
CAS算法的過程是這樣:它包含三個參數CAS(V,E,N)。V表示要更新的變量,E表示預期值,N表示新值。僅當V值等於E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他線程做了更新,則當前線程什么都不做。最后,CAS返回當前V的真實值。CAS操作是抱着樂觀的態度進行的,它總是認為自己可以成功完成操作。當多個線程同時使用CAS操作一個變量時,只有一個會勝出,並成功更新,其余均會失敗。失敗的線程不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的線程放棄操作。基於這樣的原理,CAS操作即使沒有鎖,也可以發現其他線程對當前線程的干擾,並進行恰當的處理。
簡單地說,CAS需要你額外給出一個期望值,也就是你認為這個變量現在應該是什么樣子的。如果變量不是你想象的那樣,那說明它已經被別人修改過了。你就重新讀取,再次嘗試修改就好了。
在硬件層面,大部分的現代處理器都已經支持原子化的CAS指令。在JDK 5.0以后,虛擬機便可以使用這個指令來實現並發操作和並發數據結構,並且,這種操作在虛擬機中可以說是無處不在。
2.無鎖的線程安全整數:AtomicI nteger
為了讓Java程序員能夠受益於CAS等CPU指令,JDK並發包中有一個atomic包,里面實現了一些直接使用CAS操作的線程安全的類型。其中,最常用的一個類,應該就是AtomicIn-teger。你可以把它看做是一個整數。但是與Inte-ger不同,它是可變的,並且是線程安全的。對其進行修改等任何操作,都是用CAS指令進行的。這里簡單列舉一下AtomicInteger的一些主要方法,對於其他原子類,操作也是非常類似的:
public final int get()//取得當前值
public final void set(int newValue)//設置當前值
public final int getAndSet(int newValue)//設置新值,並返回舊值
public final boolean compareAndSet(int expect, int u)//如果當前值為expect,則設置為u
public final int getAndIncrement()//當前值加1,返回舊值
public final int getAndDecrement()//當前值減1,返回舊值
public final int getAndAdd(int delta)//當前值增加delta,返回舊值
public final int incrementAndGet()//當前值加1,返回新值
public final int decrementAndGet()//當前值減1,返回新值
public final int addAndGet(int delta)//當前值增加delta,返回新值
就內部實現上來說,AtomicInteger中保存一個核心字段:
private volatile int value;它就代表了AtomicInteger的當前實際取值。
此外還有一個:
private static final long valueOffset;它保存着value字段在AtomicInteger對象中的偏移量。后面你會看到,這個偏移量是實現AtomicInteger的關鍵。
和AtomicInteger類似的類還有AtomicLong用來代表long型,AtomicBoolean表示boolean型,AtomicReference表示對象引用。
3.Java中的指針:Unsafe類
在AtomicInteger中compareAndSet()方法:
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
有一個特殊的變量unsafe,它是sun.misc.Unsafe類型。這個類封裝了一些不安全的操作,類似C語言中指針的操作。
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
方法是一個navtive方法,它的參數含義是:
var1為給定的對象
var2為對象內的偏移量(其實就是一個字段到對象頭部的偏移量,通過這個偏移量可以快速定位字段)
var4表示期望值
xvar5要設置的值。如果指定的字段的值等於var4,那么就會把它設置為var5。
不難看出,compareAndSwapInt()方法的內部,必然是使用CAS原子指令來完成的
此外,Unsafe類還提供了一些方法,主要有以下幾個(以Int操作為例,其他數據類型是類似的):
public native int getInt(Object o, long offset);//獲得給定對象偏移量上的int值
public native void putInt(Object o, long offset, int x);//設置給定對象偏移量上的int值
public native long objectFieldOffset(Field f);//獲得字段在對象中的偏移量
public native void putIntVolatile(Object o, long offset, int x);//設置給定對象的int值,使用volatile語義
public native int getIntVolatile(Object o, long offset);//獲得給定對象對象的int值,使用volatile語義
public native void putOrderedInt(Object o, long offset, int x);//和putIntVolatile()一樣,但是它要求被操作字段就是volatile類型的
這里就可以看到,雖然Java拋棄了指針。但是在關鍵時刻,類似指針的技術還是必不可少的。這里底層的Unsafe實現就是最好的例子。但是很不幸,JDK的開發人員並不希望大家使用這個類。獲得Unsafe實例的方法是調動其工廠方法getUnsafe()。但是,它的實現卻是這樣:
@CallerSensitive
public static Unsafe getUnsafe() {
Class var0 = Reflection.getCallerClass();
if(!VM.isSystemDomainLoader(var0.getClassLoader())) {
throw new SecurityException("Unsafe");
} else {
return theUnsafe;
}
}
注意加粗部分的代碼,它會檢查調用getUnsafe()函數的類,如果這個類的ClassLoader不為null,就直接拋出異常,拒絕工作。因此,這也使得我們自己的應用程序無法直接使用Unsafe類。它是一個JDK內部使用的專屬類。
注意:根據Java類加載器的工作原理,應用程序的類由App Loader加載。而系統核心類,如rt.jar中的類由Bootstrap類加載器加載。Bootstrap加載器沒有Java對象的對象,因此試圖獲得這個類加載器會返回null。所以,當一個類的類加載器為null時,說明它是由Bootstrap加載的,而這個類也極有可能是rt.jar中的類。
4.無鎖的對象引用:AtomicReference
AtomicReference和AtomicInteger非常類似,不同之處就在於AtomicInteger是對整數的封裝,而AtomicReference則對應普通的對象引用。也就是它可以保證你在修改對象引用時的線程安全性。在介紹AtomicReference的同時,我希望同時提出一個有關原子操作的邏輯上的不足。
之前我們說過,線程判斷被修改對象是否可以正確寫入的條件是對象的當前值和期望值是否一致。這個邏輯從一般意義上來說是正確的。但有可能出現一個小小的例外,就是當你獲得對象當前數據后,在准備修改為新值前,對象的值被其他線程連續修改了兩次,而經過這兩次修改后,對象的值又恢復為舊值。這樣,當前線程就無法正確判斷這個對象究竟是否被修改過,如圖:

雖然說這種情況出現的概率不大,但是依然是有可能出現的。因此,當業務上確實可能出現這種情況時,我們也必須多加防范。體貼的JDK也已經為我們考慮到了這種情況,使用AtomicStampedReference就可以很好地解決這個問題。
5.帶有時間戳的對象引用:AtomicStampedReference
AtomicReference無法解決上述問題的根本因為是對象在修改過程中,丟失了狀態信息。
AtomicStampedReference,它內部不僅維護了對象值,還維護了一個時間戳(我這里把它稱為時間戳,實際上它可以使任何一個整數來表示狀態值)。當AtomicStampedReference對應的數值被修改時,除了更新數據本身外,還必須要更新時間戳。
當AtomicStampedReference設置對象值時,對象值以及時間戳都必須滿足期望值,寫入才會成功。因此,即使對象值被反復讀寫,寫回原值,只要時間戳發生變化,就能防止不恰當的寫入。
AtomicStampedReference的幾個API在AtomicReference的基礎上新增了有關時間戳的信息:
public boolean compareAndSet(V expectedReference,VnewReference,int expectedStamp,int newStamp)//比較設置 參數依次為:期望值 寫入新值 期望時間戳 新時間戳
public V getReference()//獲得當前對象引用
public int getStamp()//獲得當前時間戳
public void set(V newReference, int newStamp)//設置當前對象引用和時間戳
6.數組也能無鎖:AtomicIntegerArray
除了提供基本數據類型外,JDK還為我們准備了數組等復合結構。當前可用的原子數組有:AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray,分別表示整數數組、long型數組和普通的對象數組。
這里以AtomicIntegerArray為例,展示原子數組的使用方式。
AtomicIntegerArray本質上是對int[]類型的封裝,使用Unsafe類通過CAS的方式控制int[]在多線程下的安全性。它提供了以下幾個核心API:
public final int get(int i)//獲得數組第i個下標的元素
public final int length()//獲得數組的長度
public final int getAndSet(int i, int newValue)//將數組第i個下標設置為newValue,並返回舊的值
public final boolean compareAndSet(int i, int expect, int update)//進行CAS操作,如果第i個下標的元素等於expect,則設置為update,設置成功返回true
public final int getAndIncrement(int i)//將第i個下標的元素加1
public final int getAndDecrement(int i)//將第i個下標的元素減1
public final int getAndAdd(int i, int delta)//將第i個下標的元素增加delta(delta可以是負數)
7.讓普通變量也享受原子操作:AtomicIntegerFieldUpdater
將普通變量也變成線性安全的。
在原子包里還有一個實用的工具類AtomicIn-tegerFieldUpdater。它可以讓你在不改動(或者極少改動)原有代碼的基礎上,讓普通的變量也享受CAS操作帶來的線程安全性,這樣你可以修改極少的代碼,來獲得線程安全的保證。
根據數據類型不同,這個Updater有三種,分別是AtomicIntegerFieldUpdater、AtomicLong-FieldUpdater和AtomicReferenceFieldUpdater。顧名思義,它們分別可以對int、long和普通對象進行CAS修改。
雖然AtomicIntegerFieldUpdater很好用,但是還是有幾個注意事項:
第一,Updater只能修改它可見范圍內的變量。因為Updater使用反射得到這個變量。如果變量不可見,就會出錯。比如如果score申明為private,就是不可行的。
第二,為了確保變量被正確的讀取,它必須是volatile類型的。如果我們原有代碼中未申明這個類型,那么簡單地申明一下就行,這不會引起什么問題。
第三,由於CAS操作會通過對象實例中的偏移量直接進行賦值,因此,它不支持static字段(Unsafe. objectFieldOffset()不支持靜態變量)。
8.SynchronousQueue的實現
在對線程池的介紹中,提到了一個非常特殊的等待隊列SynchronousQueue。Syn-chronousQueue的容量為0,任何一個對Syn-chronousQueue的寫需要等待一個對Syn-chronousQueue的讀,因此,Syn-chronousQueue與其說是一個隊列,不如說是一個數據交換通道。
對SynchronousQueue來說,它將put()和take()兩個功能截然不同的操作抽象為一個共通的方法Transferer.transfer()。
Object transfer(Object e, boolean timed, long nanos)
當參數e為非空時,表示當前操作傳遞給一個消費者,如果為空,則表示當前操作需要請求一個數據。timed參數決定是否存在timeout時間,nanos決定了timeout的時長。如果返回值非空,則表示數據已經接受或者正常提供,如果為空,則表示失敗(超時或者中斷)。
SynchronousQueue內部會維護一個線程等待隊列。等待隊列中會保存等待線程以及相關數據的信息。比如,生產者將數據放入Syn-chronousQueue時,如果沒有消費者接收,那么數據本身和線程對象都會打包在隊列中等待(因為SynchronousQueue容積為0,沒有數據可以正常放入)。
Transferer.transfer()函數的實現是Syn-chronousQueue的核心,它大體上分為三個步驟:
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
1. 如果等待隊列為空,或者隊列中節點的類型和本次操作是一致的,那么將當前操作壓入隊列等待。比如,等待隊列中是讀線程等待,本次操作也是讀,因此這兩個讀都需要等待。進入等待隊列的線程可能會被掛起,它們會等待一個“匹配”操作。
2. 如果等待隊列中的元素和本次操作是互補的(比如等待操作是讀,而本次操作是寫),那么就插入一個“完成”狀態的節點,並且讓他“匹配”到一個等待節點上。接着彈出這兩個節點,並且使得對應的兩個線程繼續執行。
3. 如果線程發現等待隊列的節點就是“完成”節點,那么幫助這個節點完成任務。其流程和步驟2是一致的。