Atomic簡介
Atomic包是java.util.concurrent下的另一個專門為線程安全設計的Java包,包含多個原子操作類這個包里面提供了一組原子變量類。
其基本的特性就是在多線程環境下,當有多個線程同時執行這些類的實例包含的方法時,具有排他性,即當某個線程進入方法,執行其中的指令時,不會被其他線程打斷,而別的線程就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待隊列中選擇一個另一個線程進入,這只是一種邏輯上的理解。實際上是借助硬件的相關指令來實現的,不會阻塞線程(或者說只是在硬件級別上阻塞了)。可以對基本數據、數組中的基本數據、對類中的基本數據進行操作。原子變量類相當於一種泛化的volatile變量,能夠支持原子的和有條件的讀-改-寫操作。
傳統鎖的問題
我們先來看一個例子:計數器(Counter),采用Java里比較方便的鎖機制synchronized關鍵字,初步的代碼如下:
class Counter {
private int value;
public synchronized int getValue() {
return value;
}
public synchronized int increment() {
return ++value;
}
public synchronized int decrement() {
return --value;
}
}
其實像這樣的鎖機制,滿足基本的需求是沒有問題的了,但是有的時候我們的需求並非這么簡單,我們需要更有效,更加靈活的機制,synchronized關鍵字是基於阻塞的鎖機制,也就是說當一個線程擁有鎖的時候,訪問同一資源的其它線程需要等待,直到該線程釋放鎖,這里會有些問題:首先,如果被阻塞的線程優先級很高很重要怎么辦?其次,如果獲得鎖的線程一直不釋放鎖怎么辦?(這種情況是非常糟糕的)。還有一種情況,如果有大量的線程來競爭資源,那CPU將會花費大量的時間和資源來處理這些競爭(事實上CPU的主要工作並非這些),同時,還有可能出現一些例如死鎖之類的情況,最后,其實鎖機制是一種比較粗糙,粒度比較大的機制,相對於像計數器這樣的需求有點兒過於笨重,因此,對於這種需求我們期待一種更合適、更高效的線程安全機制,於是CAS誕生了。
傳送門:CAS
Atomic
java.util.concurrent.atomic中的類可以分成4組:
- 標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
- 數組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
- 更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
- 復合變量類:AtomicMarkableReference,AtomicStampedReference
基礎數據型
第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來處理布爾,整數,長整數,對象四種數據,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。我們來看個例子,與我們平時i++所對應的原子操作為:getAndIncrement()
package com.company.atomics;
/*
* 使用原子變量類定義一個計數器
* 該計數器在整個程序中都能使用,並且在所有的地方都可以使用這個計數器,
* 這個計數器可以設計為單例
* */
import java.util.concurrent.atomic.AtomicLong;
public class Indicator {
//構造方法私有化
private Indicator(){}
//定義一個私有的本類靜態的對象
private static final Indicator INSTANCE=new Indicator();
//提供一個公共靜態方法返回唯一實例
public static Indicator getInstance(){
return INSTANCE;
}
//使用原子變量類保存請求總數,成功數,失敗數
private final AtomicLong requestCount=new AtomicLong(0);
private final AtomicLong successCount=new AtomicLong(0);
private final AtomicLong failureCount=new AtomicLong(0);
//有新的請求
public void requestProcessReceive(){
requestCount.incrementAndGet();
}
//處理成功
public void requestProcessSuccess(){
successCount.incrementAndGet();
}
//處理失敗
public void requestProcessFailur(){
failureCount.incrementAndGet();
}
//查看總數,
public long getRequestCount(){
return requestCount.get();
}
//查看成功數
public long getSuccessCount(){
return successCount.get();
}
//查看失敗數
public long getFailurCount(){
return failureCount.get();
}
}
結果測試:
package com.company.atomics;
import java.util.Random;
public class IndicatorTest {
public static void main(String[] args) {
//通過線程模擬請求
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//每個線程就是一個請求,請求總數要+1
Indicator.getInstance().requestProcessReceive();
int num=new Random().nextInt();
if (num%2==0){ //處理成功
Indicator.getInstance().requestProcessSuccess();
}else{ //處理失敗
Indicator.getInstance().requestProcessFailur();
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Indicator.getInstance().getRequestCount());//總數
System.out.println(Indicator.getInstance().getSuccessCount());//成功數
System.out.println(Indicator.getInstance().getFailurCount());//失敗數
}
}
常用方法
-
構造函數(兩個構造函數)
- 默認的構造函數:初始化的數據分別是false,0,0,null
- 帶參構造函數:參數為初始化的數據
-
set( )和get( )方法:可以原子地設定和獲取atomic的數據。類似於volatile,保證數據會在主存中設置或讀取
-
void set()和void lazySet():set設置為給定值,直接修改原始值;lazySet延時設置變量值,這個等價於set()方法,但是由於字段是volatile類型的,因此次字段的修改會比普通字段(非volatile字段)有稍微的性能延時(盡管可以忽略),所以如果不是想立即讀取設置的新值,允許在“后台”修改值,那么此方法就很有用。
-
getAndSet( )方法
-
原子的將變量設定為新數據,同時返回先前的舊數據
-
其本質是get( )操作,然后做set( )操作。盡管這2個操作都是atomic,但是他們合並在一起的時候,就不是atomic。在Java的源程序的級別上,如果不依賴synchronized的機制來完成這個工作,是不可能的。只有依靠native方法才可以。
public final int getAndSet(int newValue) { for (;;) { int current = get(); if (compareAndSet(current, newValue)) return current; } }
-
-
compareAndSet( ) 和weakCompareAndSet( )方法
這兩個方法都是conditional modifier方法。接收2個參數,一個是期望數據(expected),一個是新數據(new),如果atomic里面的數據和期望數據一 致,則將新數據設定給atomic的數據,返回true,表明成功;否則就不設定,並返回false。JSR規范中說:以原子方式讀取和有條件地寫入變量但不 創建任何 happen-before 排序,因此不提供與除
weakCompareAndSet目標外任何變量以前或后續讀取或寫入操作有關的任何保證。大意就是說調用weakCompareAndSet時並不能保證不存在happen- before的發生(也就是可能存在指令重排序導致此操作失敗)。但是從Java源碼來看,其實此方法並沒有實現JSR規范的要求,最后效果和 compareAndSet是等效的,都調用了unsafe.compareAndSwapInt()完成操作。public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } public final boolean weakCompareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } -
對於 AtomicInteger、AtomicLong還提供了一些特別的方法。
getAndIncrement( ):以原子方式將當前值加 1,相當於線程安全的i++操作。
incrementAndGet( ):以原子方式將當前值加 1, 相當於線程安全的++i操作。
getAndDecrement( ):以原子方式將當前值減 1, 相當於線程安全的i--操作。
decrementAndGet ( ):以原子方式將當前值減 1,相當於線程安全的--i操作。
addAndGet( ): 以原子方式將給定值與當前值相加, 實際上就是等於線程安全的i =i+delta操作。
getAndAdd( ):以原子方式將給定值與當前值相加, 相當於線程安全的t=i;i+=delta;return t;操作。
以實現一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內存)
數組型
AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴展了原子操作,對這些類型的數組提供了支持。這些類在為其數組元素提供 volatile 訪問語義方面也引人注目,這對於普通數組來說是不受支持的。
他們內部並不是像AtomicInteger一樣維持一個valatile變量,而是全部由native方法實現,如下
AtomicIntegerArray的實現片斷:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int scale = unsafe.arrayIndexScale(int[].class);
private final int[] array;
public final int get(int i) {
return unsafe.getIntVolatile(array, rawIndex(i));
}
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, rawIndex(i), newValue);
}
基本使用示例:
public class AtomicIntegerArrayTest {
public static void main(String[] args) {
//創建一個指定長度的原子數組
AtomicIntegerArray atomicIntegerArray=new AtomicIntegerArray(10);
System.out.println(atomicIntegerArray);//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
//返回指定位置的元素
System.out.println(atomicIntegerArray.get(0));//0
System.out.println(atomicIntegerArray.get(1));//0
//設置指定位置的元素
atomicIntegerArray.set(0,10);
//先獲取指定位置的值在設置
System.out.println(atomicIntegerArray.getAndSet(1,11));//0
//把數組元素加上某個值
System.out.println(atomicIntegerArray.addAndGet(0,22));//32
System.out.println(atomicIntegerArray.getAndAdd(1,33));//11
System.out.println(atomicIntegerArray);//[32, 44, 0, 0, 0, 0, 0, 0, 0, 0]
//CAS操作,如果0位置的值是32,就將其值修改為222
System.out.println(atomicIntegerArray.compareAndSet(0,32,222));//返回ture
//先自增在返回
System.out.println(atomicIntegerArray.incrementAndGet(0));//223
//先返回再自減
System.out.println(atomicIntegerArray.getAndIncrement(1));//44
System.out.println(atomicIntegerArray);//[223, 45, 0, 0, 0, 0, 0, 0, 0, 0]
//先自減再返回
System.out.println(atomicIntegerArray.decrementAndGet(2));//-1
//先返回再自減
System.out.println(atomicIntegerArray.getAndDecrement(3));//0
System.out.println(atomicIntegerArray);//[223, 45, -1, -1, 0, 0, 0, 0, 0, 0]
}
}
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayTest01 {
//定義一個原子數組
static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);
public static void main(String[] args) {
//定義線程數組
Thread[] threads=new Thread[10];
//給線程數組賦值
for (int i = 0; i < threads.length; i++) {
threads[i]=new addThread();
}
//開啟子線程
for (Thread thread:threads
) {
thread.start();
}
//在主線程中查看自增完后原子數組中各個元素的值,主線程中需要在所有子線程都執行完后查看
//把所有的子線程合並到當前主線程中
for (Thread thread:threads
) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(atomicIntegerArray)
}
//定義一個線程類,在線程類中修改原子數組
static class addThread extends Thread{
@Override
public void run() {
//把數組元素的每個元素自增1000次
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < atomicIntegerArray.length() ; j++) {
atomicIntegerArray.getAndIncrement(i%atomicIntegerArray.length());
}
}
}
}
}
輸出結果:
//10個線程,每個線程自增1000次
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
字段更新器
AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基於反射的實用工具,可以對指定類的指定 volatile 字段進行原子更新。API非常簡單,但是也是有一些約束:
(1)字段必須是volatile類型的
(2)字段的描述類型(修飾符public/protected/default/private)是與調用者與操作對象字段的關系一致。也就是說 調用者能夠直接操作對象字段,那么就可以反射進行原子操作。但是對於父類的字段,子類是不能直接操作的,盡管子類可以訪問父類的字段。
(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。
(4)只能是可修改變量,不能使final變量,因為final的語義就是不可修改。實際上final的語義和volatile是有沖突的,這兩個關鍵字不能同時存在。
(5)對於AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater 。
netty5.0中類ChannelOutboundBuffer統計發送的字節總數,由於使用volatile變量已經不能滿足,所以使用AtomicIntegerFieldUpdater 來實現的
示例:
//定義一個User實體類
public class User {
int id;
volatile int age;
public User(int id, int age) {
this.id = id;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", age=" + age +
'}';
}
}
//定義一個線程類,實現字段自增
public class AtomicUpdater extends Thread{
private User user;
private AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
public AtomicUpdater(User user) {
this.user = user;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println(updater.getAndIncrement(user));
}
}
}
//測試類
public class Test {
public static void main(String[] args) {
//初始化User對象
User user = new User(1, 10);
//開啟10個線程
for (int i = 0; i < 10; i++) {
new AtomicUpdater(user).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(user);
}
}
輸出結果:
//一個線程自增10次,10個線程自增100次
User{id=1, age=110}
引用型
原子更新基本類型的AtomicInteger,只能更新一個變量,如果要原子更新多個變量,就需要使用這個原子更新引用類型提供的類。Atomic包提供了以下3個類
AtomicReference
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
static AtomicReference<String> atomicReference=new AtomicReference<>("abc");
public static void main(String[] args) {
//創建十個線程修改字符串
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (atomicReference.compareAndSet("abc","def")){
System.out.println(Thread.currentThread().getName()+"把字符串修改為了def");
}
}
}).start();
}
//再創建100個線程
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (atomicReference.compareAndSet("def","abc")){
System.out.println(Thread.currentThread().getName()+"把字符串還原為了abc");
}
}
}).start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicReference.get());
}
}
AtomicStampedReference
由於atomicReference是采用了CAS,所以可能會產生ABA問題。利用版本戳的形式記錄了每次改變以后的版本號,這樣的話就不會存在ABA問題了。這就是AtomicStampedReference的解決方案。
public class AtomicStampReferenceTest {
//定義AtomicStampedReference引用操作"abc"字符串,指定初始版本號為0
private static AtomicStampedReference<String> stampedReference=new AtomicStampedReference<>("abc",0);
public static void main(String[] args) {
Thread t1=new Thread(new Runnable() {
@Override
public void run() {
stampedReference.compareAndSet("abc","def",
stampedReference.getStamp(),
stampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"----"+stampedReference.getReference());
stampedReference.compareAndSet("def","abc",
stampedReference.getStamp(),
stampedReference.getStamp()+1);
}
});
Thread t2=new Thread(new Runnable() {
//如果在此處獲取stamp,則可能無法獲取當前的版本號,在線程睡眠的那一秒中,版本號可能發生了改變
//int stamp=stampedReference.getStamp();//獲取版本號
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int stamp=stampedReference.getStamp();//獲取版本號
System.out.println(stampedReference.compareAndSet("abc","ggg",stamp,stamp+1));
}
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(stampedReference.getReference());
}
}
AtomicMarkableReference
原子更新帶有標記位的引用類型。可以原子更新一個布爾類型的標記位和引用類型。構造方法是AtomicMarkableReference(V initialRef,booleaninitialMark)
