目錄
無鎖即無障礙的運行, 所有線程都可以到達臨界區, 接近於無等待.
無鎖采用CAS(compare and swap)算法來處理線程沖突, 其原理如下
CAS原理
CAS包含3個參數CAS(V,E,N).V表示要更新的變量, E表示預期值, N表示新值.
僅當V值等於E值時, 才會將V的值設為N, 如果V值和E值不同, 則說明已經有其他線程做了更新, 則當前線程什么
都不做. 最后, CAS返回當前V的真實值. CAS操作是抱着樂觀的態度進行的, 它總是認為自己可以成功完成操作.
當多個線程同時使用CAS操作一個變量時, 只有一個會勝出, 並成功更新, 其余均會失敗.失敗的線程不會被掛起,
僅是被告知失敗, 並且允許再次嘗試, 當然也允許失敗的線程放棄操作.基於這樣的原理, CAS操作即時沒有鎖,
也可以發現其他線程對當前線程的干擾, 並進行恰當的處理.
CPU指令
另外, 雖然上述步驟繁多, 實際上CAS整一個操作過程是一個原子操作, 它是由一條CPU指令完成的,
從指令層保證操作可靠, 不會被多線程干擾.
無鎖與volatile
無鎖可以通過cas來保證原子性與線程安全, 他與volatile什么區別呢?
當給變量加了volatile關鍵字, 表示該變量對所有線程可見, 但不保證原子性.
以volatile i, i++為例, 分為以下四步:
- 加載i
- 對i進行+1
- 回寫i的值
- 用內存屏障通知其他線程i的值
其中前三步是線程不安全的, 可能其他線程會對i進行讀寫.
因此任何依賴於之前值的操作, 如i++, i = i *10使用volatile都不安全.
而諸如get/set, boolean這類可以使用volatile.
AtomicInteger
主要接口
1 // 取得當前值 2 public final int get() 3 // 設置當前值 4 public final void set(int newValue) 5 // 設置新值,並返回舊值 6 public final int getAndSet(int newValue) 7 // 如果當前值為expect,則設置為u 8 public final boolean compareAndSet(int expect, int u) 9 // 當前值加1,返回舊值 10 public final int getAndIncrement() 11 // 當前值減1,返回舊值 12 public final int getAndDecrement() 13 // 當前值增加delta,返回舊值 14 public final int getAndAdd(int delta) 15 // 當前值加1,返回新值 16 public final int incrementAndGet() 17 // 當前值減1,返回新值 18 public final int decrementAndGet() 19 // 當前值增加delta,返回新值 20 public final int addAndGet(int delta)
源碼實現
1 // 封裝了一個int對其加減 2 private volatile int value; 3 ....... 4 public final boolean compareAndSet(int expect, int update) { 5 // 通過unsafe 基於CPU的CAS指令來實現, 可以認為無阻塞. 6 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 7 } 8 ....... 9 public final int getAndIncrement() { 10 for (;;) { 11 // 當前值 12 int current = get(); 13 // 預期值 14 int next = current + 1; 15 if (compareAndSet(current, next)) { 16 // 如果加成功了, 則返回當前值 17 return current; 18 } 19 // 如果加失敗了, 說明其他線程已經修改了數據, 與期望不相符, 20 // 則繼續無限循環, 直到成功. 這種樂觀鎖, 理論上只要等兩三個時鍾周期就可以設值成功 21 // 相比於直接通過synchronized獨占鎖的方式操作int, 要大大節約等待時間. 22 } 23 }
Demo
使用10個線程打印0-10000, 最終得到結果10w.
1 import java.util.concurrent.atomic.AtomicInteger; 2 3 public class AtomicIntegerDemo { 4 static AtomicInteger i = new AtomicInteger(); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 i.incrementAndGet(); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(i); 26 } 27 }
Unsafe
Unsafe類是在sun.misc包下, 可以用於一些非安全的操作,比如:
根據偏移量設置值, 線程park(), 底層的CAS操作等等.
1 // 獲取類實例中變量的偏移量 2 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value")); 3 // 基於偏移量對值進行操作 4 unsafe.compareAndSwapInt(this, valueOffset, expect, update);
主要接口
1 // 獲得給定對象偏移量上的int值 2 public native int getInt(Object o, long offset); 3 // 設置給定對象偏移量上的int值 4 public native void putInt(Object o, long offset, int x); 5 // 獲得字段在對象中的偏移量 6 public native long objectFieldOffset(Field f); 7 // 設置給定對象的int值,使用volatile語義 8 public native void putIntVolatile(Object o, long offset, int x); 9 // 獲得給定對象對象的int值,使用volatile語義 10 public native int getIntVolatile(Object o, long offset); 11 // 和putIntVolatile()一樣,但是它要求被操作字段就是volatile類型的 12 public native void putOrderedInt(Object o, long offset, int x);
AtomicReference
與AtomicInteger類似, 只是里面封裝了一個對象, 而不是int, 對引用進行修改
主要接口
1 get() 2 set(V) 3 compareAndSet() 4 getAndSet(V)
Demo
使用10個線程, 同時嘗試修改AtomicReference中的String, 最終只有一個線程可以成功.
1 import java.util.concurrent.atomic.AtomicReference; 2 3 public class AtomicReferenceTest { 4 public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc"); 5 6 public static void main(String[] args) { 7 for (int i = 0; i < 10; i++) { 8 new Thread() { 9 public void run() { 10 try { 11 Thread.sleep(Math.abs((int) (Math.random() * 100))); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 if (attxnicStr.compareAndSet("abc", "def")) { 16 System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get()); 17 } else { 18 System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!"); 19 } 20 } 21 }.start(); 22 } 23 } 24 }
AtomicStampedReference
也是封裝了一個引用, 主要解決ABA問題.
ABA問題
線程一准備用CAS將變量的值由A替換為B, 在此之前線程二將變量的值由A替換為C, 線程三又將C替換為A, 然后線程一執行CAS時發現變量的值仍然為A, 所以線程一CAS成功.
主要接口
1 // 比較設置 參數依次為:期望值 寫入新值 期望時間戳 新時間戳 2 public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) 3 // 獲得當前對象引用 4 public V getReference() 5 // 獲得當前時間戳 6 public int getStamp() 7 // 設置當前對象引用和時間戳 8 public void set(V newReference, int newStamp)
源碼分析
1 // 內部封裝了一個Pair對象, 每次對對象操作的時候, stamp + 1 2 private static class Pair<T> { 3 final T reference; 4 final int stamp; 5 private Pair(T reference, int stamp) { 6 this.reference = reference; 7 this.stamp = stamp; 8 } 9 static <T> Pair<T> of(T reference, int stamp) { 10 return new Pair<T>(reference, stamp); 11 } 12 } 13 14 private volatile Pair<V> pair; 15 16 // 進行cas操作的時候, 會對比stamp的值 17 public boolean compareAndSet(V expectedReference, 18 V newReference, 19 int expectedStamp, 20 int newStamp) { 21 Pair<V> current = pair; 22 return 23 expectedReference == current.reference && 24 expectedStamp == current.stamp && 25 ((newReference == current.reference && 26 newStamp == current.stamp) || 27 casPair(current, Pair.of(newReference, newStamp))); 28 }
Demo
后台使用多個線程對用戶充值, 要求只能充值一次
1 public class AtomicStampedReferenceDemo { 2 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0); 3 public staticvoid main(String[] args) { 4 //模擬多個線程同時更新后台數據庫,為用戶充值 5 for(int i = 0 ; i < 3 ; i++) { 6 final int timestamp=money.getStamp(); 7 newThread() { 8 public void run() { 9 while(true){ 10 while(true){ 11 Integerm=money.getReference(); 12 if(m<20){ 13 if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){ 14 System.out.println("余額小於20元,充值成功,余額:"+money.getReference()+"元"); 15 break; 16 } 17 }else{ 18 //System.out.println("余額大於20元,無需充值"); 19 break ; 20 } 21 } 22 } 23 } 24 }.start(); 25 } 26 27 //用戶消費線程,模擬消費行為 28 new Thread() { 29 publicvoid run() { 30 for(int i=0;i<100;i++){ 31 while(true){ 32 int timestamp=money.getStamp(); 33 Integer m=money.getReference(); 34 if(m>10){ 35 System.out.println("大於10元"); 36 if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){ 37 System.out.println("成功消費10元,余額:"+money.getReference()); 38 break; 39 } 40 }else{ 41 System.out.println("沒有足夠的金額"); 42 break; 43 } 44 } 45 try {Thread.sleep(100);} catch (InterruptedException e) {} 46 } 47 } 48 }.start(); 49 } 50 }
AtomicIntegerArray
支持無鎖的數組
主要接口
1 // 獲得數組第i個下標的元素 2 public final int get(int i) 3 // 獲得數組的長度 4 public final int length() 5 // 將數組第i個下標設置為newValue,並返回舊的值 6 public final int getAndSet(int i, int newValue) 7 // 進行CAS操作,如果第i個下標的元素等於expect,則設置為update,設置成功返回true 8 public final boolean compareAndSet(int i, int expect, int update) 9 // 將第i個下標的元素加1 10 public final int getAndIncrement(int i) 11 // 將第i個下標的元素減1 12 public final int getAndDecrement(int i) 13 // 將第i個下標的元素增加delta(delta可以是負數) 14 public final int getAndAdd(int i, int delta)
源碼分析
1 // 數組本身基地址 2 private static final int base = unsafe.arrayBaseOffset(int[].class); 3 4 // 封裝了一個數組 5 private final int[] array; 6 7 static { 8 // 數組中對象的寬度, int類型, 4個字節, scale = 4; 9 int scale = unsafe.arrayIndexScale(int[].class); 10 if ((scale & (scale - 1)) != 0) 11 throw new Error("data type scale not a power of two"); 12 // 前導0 : 一個數字轉為二進制后, 他前面0的個數 13 // 對於4來講, 他就是00000000 00000000 00000000 00000100, 他的前導0 就是29 14 // 所以shift = 2 15 shift = 31 - Integer.numberOfLeadingZeros(scale); 16 } 17 18 // 獲取第i個元素 19 public final int get(int i) { 20 return getRaw(checkedByteOffset(i)); 21 } 22 23 // 第i個元素, 在數組中的偏移量是多少 24 private long checkedByteOffset(int i) { 25 if (i < 0 || i >= array.length) 26 throw new IndexOutOfBoundsException("index " + i); 27 28 return byteOffset(i); 29 } 30 31 // base : 數組基地址, i << shift, 其實就是i * 4, 因為這邊是int array. 32 private static long byteOffset(int i) { 33 // i * 4 + base 34 return ((long) i << shift) + base; 35 } 36 37 // 根據偏移量從數組中獲取數據 38 private int getRaw(long offset) { 39 return unsafe.getIntVolatile(array, offset); 40 }
Demo
1 import java.util.concurrent.atomic.AtomicIntegerArray; 2 3 public class AtomicArrayDemo { 4 static AtomicIntegerArray arr = new AtomicIntegerArray(10); 5 6 public static class AddThread implements Runnable { 7 public void run() { 8 for (int k = 0; k < 10000; k++) { 9 arr.incrementAndGet(k % arr.length()); 10 } 11 } 12 } 13 14 public static void main(String[] args) throws InterruptedException { 15 Thread[] ts = new Thread[10]; 16 for (int k = 0; k < 10; k++) { 17 ts[k] = new Thread(new AddThread()); 18 } 19 for (int k = 0; k < 10; k++) { 20 ts[k].start(); 21 } 22 for (int k = 0; k < 10; k++) { 23 ts[k].join(); 24 } 25 System.out.println(arr); 26 } 27 }
AtomicIntegerFieldUpdater
讓普通變量也享受原子操作
主要接口
1 AtomicIntegerFieldUpdater.newUpdater() 2 incrementAndGet()
- Updater只能修改它可見范圍內的變量。因為Updater使用反射得到這個變量。如果變量不可見,就會出錯。比如如果score申明為private,就是不可行的。
- 為了確保變量被正確的讀取,它必須是volatile類型的。如果我們原有代碼中未申明這個類型,那么簡單得申明一下就行,這不會引起什么問題。
- 由於CAS操作會通過對象實例中的偏移量直接進行賦值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持靜態變量)。
1 import java.util.concurrent.atomic.AtomicInteger; 2 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 3 4 public class AtomicIntegerFieldUpdaterDemo { 5 public static class Candidate { 6 int id; 7 // 如果直接把int改成atomicinteger, 可能對代碼破壞比較大 8 // 因此使用AtomicIntegerFieldUpdater對score進行封裝 9 volatile int score; 10 } 11 12 // 通過反射實現 13 public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); 14 // 檢查Updater是否工作正確, allScore的結果應該跟score一致 15 public static AtomicInteger allScore = new AtomicInteger(0); 16 17 public static void main(String[] args) throws InterruptedException { 18 final Candidate stu = new Candidate(); 19 Thread[] t = new Thread[10000]; 20 for (int i = 0; i < 10000; i++) { 21 t[i] = new Thread() { 22 public void run() { 23 if (Math.random() > 0.4) { 24 scoreUpdater.incrementAndGet(stu); 25 allScore.incrementAndGet(); 26 } 27 } 28 }; 29 t[i].start(); 30 } 31 for (int i = 0; i < 10000; i++) { 32 t[i].join(); 33 } 34 35 System.out.println("score=" + stu.score); 36 System.out.println("allScore=" + allScore); 37 } 38 }
無鎖的Vector
jdk中Vector是加鎖的, 網上找的一個無鎖Vector LockFreeVector, 給他添加了源碼中文注釋.
主要關注push_back, 添加元素的函數
1 import java.util.AbstractList; 2 import java.util.concurrent.atomic.AtomicReference; 3 import java.util.concurrent.atomic.AtomicReferenceArray; 4 5 /** 6 * It is a thread safe and lock-free vector. 7 * This class implement algorithm from:<br> 8 * 9 * Lock-free Dynamically Resizable Arrays <br> 10 * 11 * @param <E> type of element in the vector 12 * 13 */ 14 public class LockFreeVector<E> extends AbstractList<E> { 15 private static final boolean debug = false; 16 /** 17 * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i]) 18 */ 19 private static final int FIRST_BUCKET_SIZE = 8; 20 21 /** 22 * number of buckets. 30 will allow 8*(2^30-1) elements 23 */ 24 private static final int N_BUCKET = 30; 25 26 /** 27 * We will have at most N_BUCKET number of buckets. And we have 28 * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1) 29 * 30 * 為什么AtomicReferenceArray里再套一個AtomicReferenceArray呢, 類似一個籃子(buckets)里放了很多籃子 31 * 為了在容量擴展時希望盡可能少的改動原有數據, 因此把一維數組擴展成二維數組. 32 * 該二維數組並非均衡的分布. 可能第一個數組8個元素, 第二個數組16個元素, 第三個數組32個...... 33 */ 34 private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; 35 36 /** 37 * @param <E> 38 */ 39 static class WriteDescriptor<E> { 40 public E oldV; 41 public E newV; 42 public AtomicReferenceArray<E> addr; 43 public int addr_ind; 44 45 /** 46 * Creating a new descriptor. 47 * 48 * @param addr Operation address 對哪個數組進行寫 49 * @param addr_ind Index of address 指定index 50 * @param oldV old operand 51 * @param newV new operand 52 */ 53 public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind, 54 E oldV, E newV) { 55 this.addr = addr; 56 this.addr_ind = addr_ind; 57 this.oldV = oldV; 58 this.newV = newV; 59 } 60 61 /** 62 * set newV. 63 */ 64 public void doIt() { 65 // 這邊失敗后重試的邏輯在另外的代碼里. 66 addr.compareAndSet(addr_ind, oldV, newV); 67 } 68 } 69 70 /** 71 * @param <E> 72 */ 73 static class Descriptor<E> { 74 public int size; 75 volatile WriteDescriptor<E> writeop; 76 77 /** 78 * Create a new descriptor. 79 * 80 * @param size Size of the vector 81 * @param writeop Executor write operation 82 */ 83 public Descriptor(int size, WriteDescriptor<E> writeop) { 84 this.size = size; 85 this.writeop = writeop; 86 } 87 88 /** 89 * 90 */ 91 public void completeWrite() { 92 WriteDescriptor<E> tmpOp = writeop; 93 if (tmpOp != null) { 94 tmpOp.doIt(); 95 writeop = null; // this is safe since all write to writeop use 96 // null as r_value. 97 } 98 } 99 } 100 101 private AtomicReference<Descriptor<E>> descriptor; 102 private static final int zeroNumFirst = Integer 103 .numberOfLeadingZeros(FIRST_BUCKET_SIZE); 104 105 /** 106 * Constructor. 107 */ 108 public LockFreeVector() { 109 buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET); 110 buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE)); 111 descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0, 112 null)); 113 } 114 115 /** 116 * add e at the end of vector. 117 * 把元素e加到vector中 118 * 119 * @param e 120 * element added 121 */ 122 public void push_back(E e) { 123 Descriptor<E> desc; 124 Descriptor<E> newd; 125 do { 126 desc = descriptor.get(); 127 desc.completeWrite(); 128 // desc.size Vector 本身的大小 129 // FIRST_BUCKET_SIZE 第一個一維數組的大小 130 int pos = desc.size + FIRST_BUCKET_SIZE; 131 // 取出pos 的前導0 132 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 133 // zeroNumFirst 為FIRST_BUCKET_SIZE 的前導0 134 // bucketInd 數據應該放到哪一個一維數組(籃子)里的 135 int bucketInd = zeroNumFirst - zeroNumPos; 136 // 00000000 00000000 00000000 00001000 第一個籃子滿 8 137 // 00000000 00000000 00000000 00011000 第二個籃子滿 8 + 16 138 // 00000000 00000000 00000000 00111000 第三個籃子滿 8 + 16 + 32 139 // ... bucketInd其實通過前導0相減, 就是為了得出來當前第幾個籃子是空的. 140 141 // 判斷這個一維數組是否已經啟用, 可能是第一次初始化 142 if (buckets.get(bucketInd) == null) { 143 //newLen 一維數組的長度, 取前一個數組長度 * 2 144 int newLen = 2 * buckets.get(bucketInd - 1).length(); 145 // 設置失敗也沒關系, 只要有人初始化成功就行 146 buckets.compareAndSet(bucketInd, null, 147 new AtomicReferenceArray<E>(newLen)); 148 } 149 150 // 在這個一位數組中,我在哪個位置 151 // 0x80000000是 10000000 00000000 00000000 00000000 152 // 這句話就是把上述111000, 第一個1變成了0, 得到011000, 即新值的位置. 153 int idx = (0x80000000>>>zeroNumPos) ^ pos; 154 // 通過bucketInd與idx來確定元素在二維數組中的位置 155 // 期望寫入的時候, 該位置值是null, 如果非null, 說明其他線程已經寫了, 則繼續循環. 156 newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>( 157 buckets.get(bucketInd), idx, null, e)); 158 // 循環cas設值 159 } while (!descriptor.compareAndSet(desc, newd)); 160 descriptor.get().completeWrite(); 161 } 162 163 /** 164 * Remove the last element in the vector. 165 * 166 * @return element removed 167 */ 168 public E pop_back() { 169 Descriptor<E> desc; 170 Descriptor<E> newd; 171 E elem; 172 do { 173 desc = descriptor.get(); 174 desc.completeWrite(); 175 176 int pos = desc.size + FIRST_BUCKET_SIZE - 1; 177 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 178 - Integer.numberOfLeadingZeros(pos); 179 int idx = Integer.highestOneBit(pos) ^ pos; 180 elem = buckets.get(bucketInd).get(idx); 181 newd = new Descriptor<E>(desc.size - 1, null); 182 } while (!descriptor.compareAndSet(desc, newd)); 183 184 return elem; 185 } 186 187 /** 188 * Get element with the index. 189 * 190 * @param index 191 * index 192 * @return element with the index 193 */ 194 @Override 195 public E get(int index) { 196 int pos = index + FIRST_BUCKET_SIZE; 197 int zeroNumPos = Integer.numberOfLeadingZeros(pos); 198 int bucketInd = zeroNumFirst - zeroNumPos; 199 int idx = (0x80000000>>>zeroNumPos) ^ pos; 200 return buckets.get(bucketInd).get(idx); 201 } 202 203 /** 204 * Set the element with index to e. 205 * 206 * @param index 207 * index of element to be reset 208 * @param e 209 * element to set 210 */ 211 /** 212 * {@inheritDoc} 213 */ 214 public E set(int index, E e) { 215 int pos = index + FIRST_BUCKET_SIZE; 216 int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 217 - Integer.numberOfLeadingZeros(pos); 218 int idx = Integer.highestOneBit(pos) ^ pos; 219 AtomicReferenceArray<E> bucket = buckets.get(bucketInd); 220 while (true) { 221 E oldV = bucket.get(idx); 222 if (bucket.compareAndSet(idx, oldV, e)) 223 return oldV; 224 } 225 } 226 227 /** 228 * reserve more space. 229 * 230 * @param newSize 231 * new size be reserved 232 */ 233 public void reserve(int newSize) { 234 int size = descriptor.get().size; 235 int pos = size + FIRST_BUCKET_SIZE - 1; 236 int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 237 - Integer.numberOfLeadingZeros(pos); 238 if (i < 1) 239 i = 1; 240 241 int initialSize = buckets.get(i - 1).length(); 242 while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE) 243 - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) { 244 i++; 245 initialSize *= FIRST_BUCKET_SIZE; 246 buckets.compareAndSet(i, null, new AtomicReferenceArray<E>( 247 initialSize)); 248 } 249 } 250 251 /** 252 * size of vector. 253 * 254 * @return size of vector 255 */ 256 public int size() { 257 return descriptor.get().size; 258 } 259 260 /** 261 * {@inheritDoc} 262 */ 263 @Override 264 public boolean add(E object) { 265 push_back(object); 266 return true; 267 } 268 }