Java多線程之Atomic:原子變量與原子類


Atomic簡介

​ Atomic包是java.util.concurrent下的另一個專門為線程安全設計的Java包,包含多個原子操作類這個包里面提供了一組原子變量類。

​ 其基本的特性就是在多線程環境下,當有多個線程同時執行這些類的實例包含的方法時,具有排他性,即當某個線程進入方法,執行其中的指令時,不會被其他線程打斷,而別的線程就像自旋鎖一樣,一直等到該方法執行完成,才由JVM從等待隊列中選擇一個另一個線程進入,這只是一種邏輯上的理解。實際上是借助硬件的相關指令來實現的,不會阻塞線程(或者說只是在硬件級別上阻塞了)。可以對基本數據、數組中的基本數據、對類中的基本數據進行操作。原子變量類相當於一種泛化的volatile變量,能夠支持原子的和有條件的讀-改-寫操作。

傳統鎖的問題

我們先來看一個例子:計數器(Counter),采用Java里比較方便的鎖機制synchronized關鍵字,初步的代碼如下:


class Counter {
		
	private int value;
 
	public synchronized int getValue() {
		return value;
	}
 
	public synchronized int increment() {
		return ++value;
	}
 
	public synchronized int decrement() {
		return --value;
	}
}

其實像這樣的鎖機制,滿足基本的需求是沒有問題的了,但是有的時候我們的需求並非這么簡單,我們需要更有效,更加靈活的機制,synchronized關鍵字是基於阻塞的鎖機制,也就是說當一個線程擁有鎖的時候,訪問同一資源的其它線程需要等待,直到該線程釋放鎖,這里會有些問題:首先,如果被阻塞的線程優先級很高很重要怎么辦?其次,如果獲得鎖的線程一直不釋放鎖怎么辦?(這種情況是非常糟糕的)。還有一種情況,如果有大量的線程來競爭資源,那CPU將會花費大量的時間和資源來處理這些競爭(事實上CPU的主要工作並非這些),同時,還有可能出現一些例如死鎖之類的情況,最后,其實鎖機制是一種比較粗糙,粒度比較大的機制,相對於像計數器這樣的需求有點兒過於笨重,因此,對於這種需求我們期待一種更合適、更高效的線程安全機制,於是CAS誕生了。

傳送門:CAS

Atomic

java.util.concurrent.atomic中的類可以分成4組:

  • 標量類(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
  • 數組類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
  • 更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
  • 復合變量類:AtomicMarkableReference,AtomicStampedReference

基礎數據型

第一組AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference這四種基本類型用來處理布爾,整數,長整數,對象四種數據,其內部實現不是簡單的使用synchronized,而是一個更為高效的方式CAS (compare and swap) + volatile和native方法,從而避免了synchronized的高開銷,執行效率大為提升。我們來看個例子,與我們平時i++所對應的原子操作為:getAndIncrement()

package com.company.atomics;

/*
* 使用原子變量類定義一個計數器
* 該計數器在整個程序中都能使用,並且在所有的地方都可以使用這個計數器,
* 這個計數器可以設計為單例
* */

import java.util.concurrent.atomic.AtomicLong;

public class Indicator {
    //構造方法私有化
    private Indicator(){}
    //定義一個私有的本類靜態的對象
    private static final Indicator INSTANCE=new Indicator();
    //提供一個公共靜態方法返回唯一實例
    public static Indicator getInstance(){
        return INSTANCE;
    }
    //使用原子變量類保存請求總數,成功數,失敗數
    private final AtomicLong requestCount=new AtomicLong(0);
    private final AtomicLong successCount=new AtomicLong(0);
    private final AtomicLong failureCount=new AtomicLong(0);

    //有新的請求
    public void requestProcessReceive(){
        requestCount.incrementAndGet();
    }
    //處理成功
    public void requestProcessSuccess(){
        successCount.incrementAndGet();
    }
    //處理失敗
    public void requestProcessFailur(){
        failureCount.incrementAndGet();
    }
    //查看總數,
    public long getRequestCount(){
        return requestCount.get();
    }
    //查看成功數
    public long getSuccessCount(){
        return successCount.get();
    }
    //查看失敗數
    public long getFailurCount(){
        return failureCount.get();
    }
}

結果測試:

package com.company.atomics;
import java.util.Random;
public class IndicatorTest {
    public static void main(String[] args) {
        //通過線程模擬請求
        for (int i = 0; i < 10000; i++) {
         new Thread(new Runnable() {
             @Override
             public void run() {
                 //每個線程就是一個請求,請求總數要+1
                 Indicator.getInstance().requestProcessReceive();
                 int num=new Random().nextInt();
                 if (num%2==0){  //處理成功
                     Indicator.getInstance().requestProcessSuccess();
                 }else{   //處理失敗
                     Indicator.getInstance().requestProcessFailur();
                 }
             }
         }).start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Indicator.getInstance().getRequestCount());//總數
        System.out.println(Indicator.getInstance().getSuccessCount());//成功數
        System.out.println(Indicator.getInstance().getFailurCount());//失敗數

    }
}

常用方法

  • 構造函數(兩個構造函數)

    • 默認的構造函數:初始化的數據分別是false,0,0,null
    • 帶參構造函數:參數為初始化的數據
  • set( )和get( )方法:可以原子地設定和獲取atomic的數據。類似於volatile,保證數據會在主存中設置或讀取

  • void set()和void lazySet():set設置為給定值,直接修改原始值;lazySet延時設置變量值,這個等價於set()方法,但是由於字段是volatile類型的,因此次字段的修改會比普通字段(非volatile字段)有稍微的性能延時(盡管可以忽略),所以如果不是想立即讀取設置的新值,允許在“后台”修改值,那么此方法就很有用。

  • getAndSet( )方法

    • 原子的將變量設定為新數據,同時返回先前的舊數據

    • 其本質是get( )操作,然后做set( )操作。盡管這2個操作都是atomic,但是他們合並在一起的時候,就不是atomic。在Java的源程序的級別上,如果不依賴synchronized的機制來完成這個工作,是不可能的。只有依靠native方法才可以。

      public final int getAndSet(int newValue) {  
          for (;;) {  
              int current = get();  
              if (compareAndSet(current, newValue))  
                  return current;  
          }  
      }  
      
  • compareAndSet( ) 和weakCompareAndSet( )方法

    這兩個方法都是conditional modifier方法。接收2個參數,一個是期望數據(expected),一個是新數據(new),如果atomic里面的數據和期望數據一 致,則將新數據設定給atomic的數據,返回true,表明成功;否則就不設定,並返回false。JSR規范中說:以原子方式讀取和有條件地寫入變量但不 創建任何 happen-before 排序,因此不提供與除 weakCompareAndSet 目標外任何變量以前或后續讀取或寫入操作有關的任何保證。大意就是說調用weakCompareAndSet時並不能保證不存在happen- before的發生(也就是可能存在指令重排序導致此操作失敗)。但是從Java源碼來看,其實此方法並沒有實現JSR規范的要求,最后效果和 compareAndSet是等效的,都調用了unsafe.compareAndSwapInt()完成操作。

    public final boolean compareAndSet(int expect, int update) {  
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
    }  
    public final boolean weakCompareAndSet(int expect, int update) {  
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
    }  
    
  • 對於 AtomicInteger、AtomicLong還提供了一些特別的方法。
    getAndIncrement( ):以原子方式將當前值加 1,相當於線程安全的i++操作。
    incrementAndGet( ):以原子方式將當前值加 1, 相當於線程安全的++i操作。
    getAndDecrement( ):以原子方式將當前值減 1, 相當於線程安全的i--操作。
    decrementAndGet ( ):以原子方式將當前值減 1,相當於線程安全的--i操作。
    addAndGet( ): 以原子方式將給定值與當前值相加, 實際上就是等於線程安全的i =i+delta操作。
    getAndAdd( ):以原子方式將給定值與當前值相加, 相當於線程安全的t=i;i+=delta;return t;操作。
    以實現一些加法,減法原子操作。(注意 --i、++i不是原子操作,其中包含有3個操作步驟:第一步,讀取i;第二步,加1或減1;第三步:寫回內存)

數組型

AtomicIntegerArray,AtomicLongArray還有AtomicReferenceArray類進一步擴展了原子操作,對這些類型的數組提供了支持。這些類在為其數組元素提供 volatile 訪問語義方面也引人注目,這對於普通數組來說是不受支持的。

他們內部並不是像AtomicInteger一樣維持一個valatile變量,而是全部由native方法實現,如下
AtomicIntegerArray的實現片斷:

private static final Unsafe unsafe = Unsafe.getUnsafe();  
private static final int base = unsafe.arrayBaseOffset(int[].class);  
private static final int scale = unsafe.arrayIndexScale(int[].class);  
private final int[] array;  
public final int get(int i) {  
        return unsafe.getIntVolatile(array, rawIndex(i));  
}  
public final void set(int i, int newValue) {  
        unsafe.putIntVolatile(array, rawIndex(i), newValue);  
}  

基本使用示例:

public class AtomicIntegerArrayTest {
    public static void main(String[] args) {
        //創建一個指定長度的原子數組
        AtomicIntegerArray atomicIntegerArray=new AtomicIntegerArray(10);
        System.out.println(atomicIntegerArray);//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        //返回指定位置的元素
        System.out.println(atomicIntegerArray.get(0));//0
        System.out.println(atomicIntegerArray.get(1));//0
        //設置指定位置的元素
        atomicIntegerArray.set(0,10);
        //先獲取指定位置的值在設置
        System.out.println(atomicIntegerArray.getAndSet(1,11));//0
        //把數組元素加上某個值
        System.out.println(atomicIntegerArray.addAndGet(0,22));//32
        System.out.println(atomicIntegerArray.getAndAdd(1,33));//11
        System.out.println(atomicIntegerArray);//[32, 44, 0, 0, 0, 0, 0, 0, 0, 0]

        //CAS操作,如果0位置的值是32,就將其值修改為222
        System.out.println(atomicIntegerArray.compareAndSet(0,32,222));//返回ture
        //先自增在返回
        System.out.println(atomicIntegerArray.incrementAndGet(0));//223
        //先返回再自減
        System.out.println(atomicIntegerArray.getAndIncrement(1));//44
        System.out.println(atomicIntegerArray);//[223, 45, 0, 0, 0, 0, 0, 0, 0, 0]
        //先自減再返回
        System.out.println(atomicIntegerArray.decrementAndGet(2));//-1
        //先返回再自減
        System.out.println(atomicIntegerArray.getAndDecrement(3));//0
        System.out.println(atomicIntegerArray);//[223, 45, -1, -1, 0, 0, 0, 0, 0, 0]

    }
}

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayTest01 {
    //定義一個原子數組
    static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(10);

    public static void main(String[] args) {
        //定義線程數組
        Thread[] threads=new Thread[10];
        //給線程數組賦值
        for (int i = 0; i < threads.length; i++) {
            threads[i]=new addThread();
        }
        //開啟子線程
        for (Thread thread:threads
             ) {
            thread.start();
        }

        //在主線程中查看自增完后原子數組中各個元素的值,主線程中需要在所有子線程都執行完后查看
        //把所有的子線程合並到當前主線程中
        for (Thread thread:threads
             ) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
         System.out.println(atomicIntegerArray)
    }

    //定義一個線程類,在線程類中修改原子數組
    static class addThread extends Thread{
        @Override
        public void run() {
            //把數組元素的每個元素自增1000次
            for (int i = 0; i < 1000; i++) {
                for (int j = 0; j < atomicIntegerArray.length() ; j++) {
                    atomicIntegerArray.getAndIncrement(i%atomicIntegerArray.length());
                }
            }
        }
    }
}

輸出結果:

//10個線程,每個線程自增1000次
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

字段更新器

AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基於反射的實用工具,可以對指定類的指定 volatile 字段進行原子更新。API非常簡單,但是也是有一些約束:

(1)字段必須是volatile類型的

(2)字段的描述類型(修飾符public/protected/default/private)是與調用者與操作對象字段的關系一致。也就是說 調用者能夠直接操作對象字段,那么就可以反射進行原子操作。但是對於父類的字段,子類是不能直接操作的,盡管子類可以訪問父類的字段。

(3)只能是實例變量,不能是類變量,也就是說不能加static關鍵字。

(4)只能是可修改變量,不能使final變量,因為final的語義就是不可修改。實際上final的語義和volatile是有沖突的,這兩個關鍵字不能同時存在。

(5)對於AtomicIntegerFieldUpdaterAtomicLongFieldUpdater 只能修改int/long類型的字段,不能修改其包裝類型(Integer/Long)。如果要修改包裝類型就需要使用AtomicReferenceFieldUpdater

netty5.0中類ChannelOutboundBuffer統計發送的字節總數,由於使用volatile變量已經不能滿足,所以使用AtomicIntegerFieldUpdater 來實現的

示例:

//定義一個User實體類
public class User {
     int id;

     volatile int age;

    public User(int id, int age) {
        this.id = id;
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", age=" + age +
                '}';
    }
}

//定義一個線程類,實現字段自增
public class AtomicUpdater extends Thread{
    private User user;

    private AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class,"age");

    public AtomicUpdater(User user) {
        this.user = user;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println(updater.getAndIncrement(user));
        }
    }
}

//測試類
public class Test {
    public static void main(String[] args) {
        //初始化User對象
        User user = new User(1, 10);
        //開啟10個線程
        for (int i = 0; i < 10; i++) {
            new AtomicUpdater(user).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(user);
    }
}

輸出結果:

//一個線程自增10次,10個線程自增100次
User{id=1, age=110}

引用型

原子更新基本類型的AtomicInteger,只能更新一個變量,如果要原子更新多個變量,就需要使用這個原子更新引用類型提供的類。Atomic包提供了以下3個類

AtomicReference

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    static AtomicReference<String> atomicReference=new AtomicReference<>("abc");

    public static void main(String[] args) {
        //創建十個線程修改字符串
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (atomicReference.compareAndSet("abc","def")){
                        System.out.println(Thread.currentThread().getName()+"把字符串修改為了def");
                    }
                }
            }).start();
        }
        //再創建100個線程
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (atomicReference.compareAndSet("def","abc")){
                        System.out.println(Thread.currentThread().getName()+"把字符串還原為了abc");
                    }
                }
            }).start();
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(atomicReference.get());
    }

}

AtomicStampedReference

由於atomicReference是采用了CAS,所以可能會產生ABA問題。利用版本戳的形式記錄了每次改變以后的版本號,這樣的話就不會存在ABA問題了。這就是AtomicStampedReference的解決方案。

public class AtomicStampReferenceTest {

    //定義AtomicStampedReference引用操作"abc"字符串,指定初始版本號為0
    private static AtomicStampedReference<String> stampedReference=new AtomicStampedReference<>("abc",0);

    public static void main(String[] args) {
        Thread t1=new Thread(new Runnable() {
            @Override
            public void run() {
   stampedReference.compareAndSet("abc","def",
                                  stampedReference.getStamp(),
                                  stampedReference.getStamp()+1);
                
System.out.println(Thread.currentThread().getName()+"----"+stampedReference.getReference());
                
                stampedReference.compareAndSet("def","abc",
                                               stampedReference.getStamp(),
                                               stampedReference.getStamp()+1);
            }
        });
        Thread t2=new Thread(new Runnable() {
            //如果在此處獲取stamp,則可能無法獲取當前的版本號,在線程睡眠的那一秒中,版本號可能發生了改變
            //int stamp=stampedReference.getStamp();//獲取版本號
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                int stamp=stampedReference.getStamp();//獲取版本號
                System.out.println(stampedReference.compareAndSet("abc","ggg",stamp,stamp+1));
            }
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(stampedReference.getReference());

    }
}

AtomicMarkableReference

原子更新帶有標記位的引用類型。可以原子更新一個布爾類型的標記位和引用類型。構造方法是AtomicMarkableReference(V initialRef,booleaninitialMark)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM