【Java】使用Atomic變量實現鎖


Atomic原子操作

在 Java 5.0 提供了 java.util.concurrent(簡稱JUC)包,在此包中增加了在並發編程中很常用的工具類

Java從JDK1.5開始提供了java.util.concurrent.atomic包,方便程序員在多線程環境下,無鎖的進行原子操作。原子變量的底層使用了處理器提供的原子指令,但是不同的CPU架構可能提供的原子指令不一樣,也有可能需要某種形式的內部鎖,所以該方法不能絕對保證線程不被阻塞。

在Atomic包里一共有12個類,四種原子更新方式,分別是原子更新基本類型,原子更新數組,原子更新引用和原子更新字段。Atomic包里的類基本都是使用Unsafe實現的包裝類。

  • 原子更新基本類型類: AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
  • 原子更新數組類:AtomicIntegerArray,AtomicLongArray
  • 原子更新引用類型:AtomicMarkableReference,AtomicStampedReference,AtomicReferenceArray
  • 原子更新字段類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

詳細介紹可以參考:Java中的Atomic包使用指南

Atomic的原理

下面通過AtomicInteger的源碼來看一下是怎么在沒有鎖的情況下保證數據正確性。首先看一下incrementAndGet()方法的實現:

1
2
3
4
5
6
7
/**
* Atomically increments by one the current value.
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

我們繼續看,unsafe.getAndAddInt() 的實現是什么樣的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Atomically adds the given value to the current value of a field
* or array element within the given object <code>o</code>
* at the given <code>offset</code>.
*
* @param o object/array to update the field/element in
* @param offset field/element offset
* @param delta the value to add
* @return the previous value
* @since 1.8
*/
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}


public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);

這是一個循環,offset是變量v在內存中相對於對象o起始位置的偏移,傳給JNI層用來計算這個value的內存絕對地址。

然后找到JNI的實現代碼,來看 native層的compareAndSwapInt()方法的實現。這個方法的實現是這樣的:

1
2
3
4
5
6
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //計算變量的內存絕對地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

這個函數其實很簡單,就是去看一下obj 的 offset 上的那個位置上的值是多少,如果是 e,那就把它更新為 x,返回true,如果不是 e,那就什么也不做,並且返回false。里面的核心方法是Atomic::compxchg(),這個方法所屬的類文件是在os_cpu目錄下面,由此可以看出這個類是和CPU操作有關,進入代碼如下:

1
2
3
4
5
6
7
8
9
10
11
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

這個方法里面都是匯編指令,看到LOCK_IF_MP也有鎖指令實現的原子操作,其實CAS也算是有鎖操作,只不過是由CPU來觸發,比synchronized性能好的多。

什么是CAS

​ CAS,Compare and Swap即比較並交換。 java.util.concurrent包借助CAS實現了區別於synchronized同步鎖的一種樂觀鎖。樂觀鎖就是每次去取數據的時候都樂觀的認為數據不會被修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間數據有沒有更新。CAS有3個操作數:內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什么都不做。CAS的關鍵點在於,系統在硬件層面保證了比較並交換操作的原子性,處理器使用基於對緩存加鎖或總線加鎖的方式來實現多處理器之間的原子操作。

CAS的優缺點

  • CAS由於是在硬件層面保證的原子性,不會鎖住當前線程,它的效率是很高的。
  • CAS雖然很高效的實現了原子操作,但是它依然存在三個問題。

1、ABA問題。CAS在操作值的時候檢查值是否已經變化,沒有變化的情況下才會進行更新。但是如果一個值原來是A,變成B,又變成A,那么CAS進行檢查時會認為這個值沒有變化,但是實際上卻變化了。ABA問題的解決方法是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就變成1A-2B-3A。從Java1.5開始JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。

2、並發越高,失敗的次數會越多,CAS如果長時間不成功,會極大的增加CPU的開銷。因此CAS不適合競爭十分頻繁的場景。

3、只能保證一個共享變量的原子操作。當對多個共享變量操作時,CAS就無法保證操作的原子性,這時就可以用鎖,或者把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合並一下ij=2a,然后用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象的原子性,你可以把多個變量放在一個對象里來進行CAS操作。

實現自旋鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 使用AtomicInteger實現自旋鎖
*/
public class SpinLock {

private AtomicInteger state = new AtomicInteger(0);

/**
* 自旋等待直到獲得許可
*/
public void lock(){
for (;;){
//CAS指令要鎖總線,效率很差。所以我們通過一個if判斷避免了多次使用CAS指令。
if (state.get() == 1) {
continue;
} else if(state.compareAndSet(0, 1)){
return;
}
}
}

public void unlock(){
state.set(0);
}
}

原理很簡單,就是一直CAS搶鎖,如果搶不到,就一直死循環,直到搶到了才退出這個循環。

自旋鎖實現起來非常簡單,如果關鍵區的執行時間很短,往往自旋等待會是一種比較高效的做法,它可以避免線程的頻繁切換和調度。但如果關鍵區的執行時間很長,那這種做法就會大量地浪費CPU資源。

針對關鍵區執行時間長的情況,該怎么辦呢?

實現可等待的鎖

如果關鍵區的執行時間很長,自旋的鎖會大量地浪費CPU資源,我們可以這樣改進:當一個線程拿不到鎖的時候,就讓這個線程先休眠等待。這樣,CPU就不會白白地空轉了。大致步驟如下:

  1. 需要一個容器,如果線程搶不到鎖,就把線程掛起來,並記錄到這個容器里。
  2. 當一個線程放棄了鎖,得從容器里找出一個掛起的線程,把它恢復了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 使用AtomicInteger實現可等待鎖
*/
public class BlockLock implements Lock {

private AtomicInteger state = new AtomicInteger(0);
private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();

@Override
public void lock() {
if (state.compareAndSet(0, 1)) {
return;
}
//放到等待隊列
waiters.add(Thread.currentThread());

for (;;) {
if (state.get() == 0) {
if (state.compareAndSet(0, 1)) {
waiters.remove(Thread.currentThread());
return;
}
} else {
LockSupport.park(); //掛起線程
}
}
}

@Override
public void unlock() {
state.set(0);
//喚醒等待隊列的第一個線程
Thread waiterHead = waiters.peek();
if(waiterHead != null){
LockSupport.unpark(waiterHead); //喚醒線程
}
}
}

我們引入了一個 waitList,用於存儲搶不到鎖的線程,讓它掛起。這里我們先借用一下JDK里的ConcurrentLinkedQueue,因為這個Queue也是使用CAS操作實現的無鎖隊列,所以並不會引入JDK里的其他鎖機制。如果大家去看AbstractQueuedSynchronizer的實現,就會發現,它的acquire()方法的邏輯與上面的實現是一樣的。

不過上面的代碼是不是沒問題了呢?如果一個線程在還未調用park掛起之前,是不是有可能被其他線程先調用一遍unpark?這就是喚醒發生在休眠之前。發生這樣的情況會不會帶來問題呢?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM