Java原子類中CAS的底層實現


Java原子類中CAS的底層實現

從Java到c++到匯編, 深入講解cas的底層原理.

介紹原理前, 先來一個Demo

以AtomicBoolean類為例.先來一個調用cas的demo.

主線程在for語句里cas忙循環, 直到cas操作成功返回true為止.

而新開的一個縣城new Thread 會在4秒后,將flag設置為true, 為了讓主線程能夠設置成功.(因為cas的預期值是true, 而flag被初始化為了false)

現象就是主線程一直在跑for循環. 4秒后, 主線程將會設置成功, 然后輸出時間差, 然后終止for循環.

public class TestAtomicBoolean {
    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);

        long start = System.currentTimeMillis();

        new Thread(()->{
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            flag.set(true);
        }).start();

        for(;;){
            if(flag.compareAndSet(true,false)){
                System.out.println(System.currentTimeMillis() - start);
                System.out.println("inner loop OK!");
                break;
            }
        }
    }
}

這里只是舉了一個例子, 也許這個例子也不太恰當, 本文只是列出了這個api的調用方法而已, 重點在於介紹compareAndSet()方法的底層原理.

Java級源碼AtomicBoolean.java

發現AtomicBoolean的compareAndSet()調用的是unsafe里的compareAndSwapInt()方法.

Java級源碼Unsafe.java

有的同學可能好奇, 其中的unsafe是怎么來的.

在AtomicBoolean類中的靜態成員變量:

如果還要細究Unsafe.getUnsafe()是怎么實現的話....那么我再貼一份Unsafe類里的getUnsafe的代碼:

首先, 在Unsafe類里, 自己就有了一個自己的實例.(而且是單例的)

然后Unsafe類里的getUnsafe()方法會進行檢查, 最終會return這個單例 theUnsafe.

剛剛跑去取介紹了getUnsafe()方法...接下來繼續講解cas...

剛才說到了AtomicBoolean類里的compareAndSet()方法內部其實調用了Unsafe類里的compareAndSwapInt()方法.

Unsafe類里的compareAndSwapInt源碼如下:

(OpenJDK8的源碼里路徑: openjdk/jdk/src/share/classes/sun/misc/Unsafe.java)

發現這里是一段native方法.說明繼續看源碼的話, 從這里就開始脫離Java語言了....

c++級源碼Unsafe.cpp

本源碼在OpenJDK8里的路徑為: openjdk/hotspot/src/share/vm/prims/unsafe.cpp

(這里臨時跑題一下:   如果說要細究 UNSAFE_ENTRY 是什么的話...UNSAFE_ENTRY 就是 JVM_ENTRY, 而 JVM_ENTRY 在interfaceSupport.hpp里面定義了, jni相關.如果想看的話, 源碼路徑在OpenJDK8中的路徑是這個: 

openjdk/hotspot/src/share/vm/runtime/interfaceSupport.hpp)

回到本文的主題cas....上面截圖的這段代碼, 看后面那一句return, 發現其中的使用到的是Atomic下的cmpxchg()方法.

c++級源碼atomic.cpp

本段源碼對應OpenJDK8的路徑是這個: openjdk/hotspot/src/share/vm/runtime/atomic.cpp

其中的cmpxchg為核心內容. 但是這句代碼根據操作系統和處理器的不同, 使用不同的底層代碼. 

而atomic.inline.hpp里聲明如下:

可見 ...不同不同操作系統, 不同的處理器, 都要走不同的cmpxchg()方法的實現.

咱們接下來以其中的linux操作系統 x86處理器為例 , atomic_linux_x86.inline.hpp

匯編級源碼atomic_linux_x86.inline.hpp

OpenJDK中路徑如下: openjdk/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp

看到了__asm__, 說明c++要開始內聯匯編了,說明繼續看代碼的話, 將會是匯編語言.

這是一段內聯匯編:

其中 __asm__ volatile 指示了編譯器不要改動優化后面的匯編語句, 如果進行了優化(優化是為了減少訪問內存, 直接通過緩存, 加快取讀速度), 那么就在這段函數的周期內, 某幾個變量就相當於常亮了, 其值可能會與內存中真實的值有差異.


2018.6.7更新: 10天沒更新了, 由於實習結束, 這幾天手頭沒電腦, 而且租房火車回學校等各種問題繁雜...

了解匯編指令cmpxchg

環境

懟代碼之前...先把環境介紹一下...匯編的指令細節方便會有所不同, 比如__asm__  和asm不一定一樣, 又比如 asm(...)  和  asm{...}  的括號問題, 又比如 匯編指令的首操作數和第2操作數的含義是顛倒的, 也就是`mov 操作數1, 操作數2` , 要改寫成`mov  操作數2, 操作數1`...反正很亂..我也不專門搞匯編, 我只能保證我這里使用g++進行編譯能正常運行.(linux下或者mac下用g++編譯是沒問題的, windows就不知道了...)

內聯匯編格式

有些同學可能會對上面突如其來的匯編語句感到迷茫...

先來一下內聯匯編的語法格式:

asm volatile("Instruction List"

: Output

: Input

: Clobber/Modify);

其中的Output和Input大家很好理解, 但是Clobber/Modify是什么意思呢:

(下面關於Clobber/Modify的內容引用自:https://blog.csdn.net/dlh0313/article/details/52172833)
有時候,當你想通知GCC當前內聯匯編語句可能會對某些寄存器或內存進行修改,希望GCC在編譯時能夠將這一點考慮進去;那么你就可以在Clobber/Modify部分聲明這些寄存器或內存

大家可以看看上面圖片中openJDK中的匯編代碼, 里面的Clobber/Modify部分是"cc"和"memory", memory好理解, 就是內存. 那么"cc"是什么呢?

當一個內聯匯編中包含影響標志寄存器eflags的條件,那么也需要在Clobber/Modify部分中使用"cc"來向GCC聲明這一點

內聯匯編的簡單例子

接下來展示一個簡單的c++內聯匯編的程序: a是1000, 通過匯編來給b變量也賦值為1000

#include<iostream>

using namespace std;

int main() {
    int a = 1000, b = 0;

    asm("movl %1,%%eax\n"
        "movl %%eax,%0\n"
    : "=r"(b)
    : "r" (a)
    : "%eax");
    cout << "a := " << a << endl;
    cout << "b := " << b << endl;
    return 0;
}

首先在c++里定義了a=1000, b=0;

然后看asm里的第一行冒號, 這里表示Output    ` : "=r"(b) `, b是第0個參數, 等號(=)表示當前輸出表達式的屬性為只寫, r表示寄存器

然后看asm里的第一行冒號, 這里表示Input        `: "r" (a)`, a是第1個參數, r表示寄存器.

然后看asm里的第一行指令   `movl %1,%%eax\n`     , 將第0個參數的值(變量a)傳入到寄存器eax中

然后看asm里的第二行指令   `movl %%eax,%0\n`     , 將寄存器eax的值傳給第一個參數(變量b)

接下來使用c++的cout進行輸出, 查看是否賦值成功, 結果如下:

再來一個簡單匯編例子

用匯編給a變量賦值為100

#include<iostream>

using namespace std;

int main() {
    int a = 0;

    asm("movl $100,%%eax\n"
        "movl %%eax,%0\n"
    : "=r"(a)
    : /*no input*/
    : "%eax", "memory");
    cout << "a := " << a << endl;
    return 0;
}

(當然如果用的熟練, 就不需要使用兩句mov來實現這個功能. 可以直接 movl $100, %0 ,就可以實現把100賦值給a變量了. 這里只是為了演示使用寄存器, 所以先給寄存器eax存上100, 再把寄存器的值賦值給a, 用寄存器eax做了一個中間的臨時存儲)

程序運行結果如下:

內聯匯編cmpxchg(cmpxchg比對成功)

(有同學不會編譯運行g++下的cpp, 我順便也在這里做一個示范吧.....)

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;
    asm("movl    %2,    %%eax \n"  /*將2222存入到eax寄存器中*/
        "movl    %3,    %%ebx \n"  /*將8888存入到ebx寄存器中*/
        "cmpxchg %%ebx, %2    \n"  /*如果變量expect的值與寄存器eax的值相等(成功), 那么ebx的值就賦給expect*/
        "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
        "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "+r"(expect)   /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 運行方式和結果如下:

內聯匯編cmpxchg(cmpxchg比對失敗)

如果cmpxchg指令中的第二個操作數與寄存器eax進行比對, 發現值不一樣的時候, 就會比對失敗, 那么expect的值就會賦值給eax, 而expect的值保持不變.

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;
    asm("movl    $77,   %%eax \n"  /*將字面量77存入到eax寄存器中*/
        "movl    %3,    %%ebx \n"  /*將8888存入到ebx寄存器中*/
        "cmpxchg %%ebx, %2    \n"  /*如果變量expect的值與寄存器eax的值不相等(失敗), 那么expect的值就賦給eax*/
        "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
        "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "+r"(expect)   /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 

 cmpxchg指令結論

     1.指令格式: CMPXCHG 操作數1 (8位/16位/32位寄存器),  操作數2(可以是任意寄存器或者內存memory)

 

     2.指令作用: 將累加器AL/AX/EAX(也就是%eax)中的值與第2操作數(目的操作數)比較,如果相等,第首操作數(源操作數)的值裝載到第2操作數,zf置1。如果不等, 第2操作數的值裝載到AL/AX/EAX並將zf清0

 

     3.該指令只能用於486及其后繼機型。

lock前綴

本段的文字理論介紹引用自(http://www.weixianmanbu.com/article/736.html):

 在單處理器系統中是不需要加lock的,因為能夠在單條指令中完成的操作都可以認為是原子操作,中斷只能發生在指令與指令之間。

 在多處理器系統中,由於系統中有多個處理器在獨立的運行,即使在能單條指令中完成的操作也可能受到干擾。

在所有的 X86 CPU 上都具有鎖定一個特定內存地址的能力,當這個特定內存地址被鎖定后,它就可以阻止其他的系統總線讀取或修改這個內存地址。這種能力是通過 LOCK 指令前綴再加上下面的匯編指令來實現的。當使用 LOCK 指令前綴時,它會使 CPU 宣告一個 LOCK# 信號,這樣就能確保在多處理器系統或多線程競爭的環境下互斥地使用這個內存地址。當指令執行完畢,這個鎖定動作也就會消失。

#include<iostream>

using namespace std;

int main() {
    int cpp_eax = 0;
    int cpp_ebx = 0;
    int expect = 2222;
    int target = 8888;

    __asm__ __volatile__("movl    $2222, %%eax \n"  /*將字面量2222存入到eax寄存器中*/
                         "movl    %3   , %%ebx \n"  /*將8888存入到ebx寄存器中*/
                 "lock;" "cmpxchg %%ebx, %2    \n"  /*如果變量expect的值與寄存器eax的值不相等(失敗), 那么expect的值就賦給eax*/
                         "movl    %%eax, %0    \n"  /*將寄存器eax的值賦值給變量cpp_eax*/
                         "movl    %%ebx, %1    \n"  /*將寄存器ebx的值賦值給變量cpp_ebx*/
    :"=r"(cpp_eax), "=r"(cpp_ebx), "=m"(expect)     /*等於號表示可寫, 加號表示可寫可讀*/
    :"r"(target)
    /*cpp_eax是%0, cpp_ebx是%1, expect是%2, target是%3*/
    :"%eax", "%ebx", "memory", "cc");
    cout << "eax := " << cpp_eax << endl;
    cout << "ebx := " << cpp_ebx << endl;
    cout << "expect := " << expect << endl;
    cout << "target := " << target << endl;
}

 結果如下:

注意, 如果加lock前綴的話, 指令的第2操作數必須是存儲在內存中的, 語句格式是這樣`  lock; cmpxchg 操作數1(寄存器), 操作數2(內存) ` .所以, expect作為output的時候是這樣聲明的: "=m"(expect) 

 如果不是操作的內存, 那么會報異常. 操作數2必須是內存...在這里卡了很久


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM