鎖機制-AQS和CAS


一、AQS

  1、AQS原理

    AQS:AbstractQuenedSynchronizer抽象的隊列式同步器。是除了java自帶的synchronized關鍵字之外的鎖機制。
    AQS的全稱為(AbstractQueuedSynchronizer),這個類在java.util.concurrent.locks包

  2、AQS的核心思想

    如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的工作線程,並將共享資源設置為鎖定狀態,如果被請求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制AQS是用CLH隊列鎖實現的,即將暫時獲取不到鎖的線程加入到隊列中。
    CLH(Craig,Landin,and Hagersten)隊列是一個虛擬的雙向隊列,虛擬的雙向隊列即不存在隊列實例,僅存在節點之間的關聯關系。
AQS是將每一條請求共享資源的線程封裝成一個CLH鎖隊列的一個結點(Node),來實現鎖的分配。

    AQS就是基於CLH隊列,用volatile修飾共享變量state,線程通過CAS去改變狀態符,成功則獲取鎖成功,失敗則進入等待隊列,等待被喚醒。

    注意:AQS是自旋鎖:在等待喚醒的時候,經常會使用自旋(while(!cas()))的方式,不停地嘗試獲取鎖,直到被其他線程獲取成功

    實現了AQS的鎖有:自旋鎖、互斥鎖、讀鎖寫鎖、條件產量、信號量、柵欄都是AQS的衍生物
AQS實現的具體方式如下:

  

  3、AQS底層使用了模板方法模式

  同步器的設計是基於模板方法模式的,如果需要自定義同步器一般的方式是這樣(模板方法模式很經典的一個應用):

    使用者繼承AbstractQueuedSynchronizer並重寫指定的方法。(這些重寫方法很簡單,無非是對於共享資源state的獲取和釋放)

    將AQS組合在自定義同步組件的實現中,並調用其模板方法,而這些模板方法會調用使用者重寫的方法。這和我們以往通過實現接口的方式有很大區別,這是模板方法模式很經典的一個運用。

    自定義同步器在實現的時候只需要實現共享資源state的獲取和釋放方式即可,至於具體線程等待隊列的維護,AQS已經在頂層實現好了。自定義同步器實現的時候主要實現下面幾種方法:  
      (1)isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
      (2)tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
      (3)tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
      (4)tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
      (5)tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。

    ReentrantLock為例,(可重入獨占式鎖):state初始化為0,表示未鎖定狀態,A線程lock()時,會調用tryAcquire()獨占鎖並將state+1.之后其他線程再想tryAcquire的時候就會失敗,直到A線程unlock()到state=0為止,其他線程才有機會獲取該鎖。A釋放鎖之前,自己也是可以重復獲取此鎖(state累加),這就是可重入的概念。

    注意:獲取多少次鎖就要釋放多少次鎖,保證state是能回到零態的。

    以CountDownLatch為例,任務分N個子線程去執行,state就初始化 為N,N個線程並行執行,每個線程執行完之后countDown()一次,state就會CAS減一。當N子線程全部執行完畢,state=0,會unpark()主調用線程,主調用線程就會從await()函數返回,繼續之后的動作。

    一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。

    在acquire() acquireShared()兩種方式下,線程在等待隊列中都是忽略中斷的,acquireInterruptibly()/acquireSharedInterruptibly()是支持響應中斷的。

二、CAS

  1、什么是CAS 

    CAS:Compare and Swap,即比較交換;

    jdk1.5增加了並發包java.util.concurrent.*,其下面的類使用CAS算法實現了區別於synchronized同步鎖的一種樂觀鎖。jdk1.5之前java語言是靠synchronized關鍵字保證           同步的,這是一種獨占鎖,也是悲觀鎖;

  2、CAS算法理解

    2.1 與鎖相比

      使用比較交換會使程序看起來更加復雜一些。但由於其非阻塞性,它對死鎖問題天生免疫,並且,線程間的相互影響也遠遠比基於鎖的方式要小;更為要                     的是,使用無鎖的方式完全沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷;因此,它要比基於鎖的方式擁有更優越的性能;

    2.2 無鎖的好處:

      2.2.1 在高並發情況下,它比有鎖的程序擁有更好的性能;

      2.2.2 它天生就是死鎖免疫的;

    2.3 CAS算法的過程:

      它包含三個參數CAS(V,E,N):V表示更新的變量,E表示預期值,N表示新值;    

      2.3.1 線程訪問時,先會將主內存中的數據同步到線程的工作內存當中
      2.3.2 假設線程A和線程B都有對數據進行更改,那么假如線程A先獲取到執行權限
      2.3.3 線程A先會對比工作內存當中的數據和主內存當中的數據是否一致,如果一致(V==E)則進行更新,不一致則刷新數據,重新循環判斷
      2.3.4 這時更新完畢后,線程B也要進行數據更新,主內存數據和工作內存數據做對比,如果一致則進行更新,不一致則將主內存數據重新更新到工作內存,然                        環再次對比兩個內存中的數據直到一致為止

      

    2.4 CAS操作是抱着樂觀的態度進行的

      它總是認為自己可以成功完成操作;當多個線程同時使用CAS操作一個變量時,只有一個會勝出,並成功更新,其余均會失 敗;失敗的線程不會被掛起,僅是被                    告知失敗,並且允許再次嘗試,當然也允許失敗的線程放棄操作;基於這樣的原理,CAS操作即使沒有鎖,也可以發現其他線程對當前線程的干擾,並進行恰當的處                    理;

    2.5 簡單的說

      CAS需要你額外給出一個期望值,也就是你認為這個變量現在應該是什么樣子的;如果變量不是你想象的那樣,那說明它已經被別人修改過了;你就要重新讀取,                  再次嘗試修改就好了;

    2.6 在硬件層面

      大部分的現代處理器都已經支持原子性的CAS指令;在jdk1.5以后,虛擬機便可以使用這個指令來實現並發操作和並發數據結構,並且,這種操作在虛擬機中可以                  說是無處不在;

  3.CAS缺點

    CAS存在一個很明顯的問題,即ABA問題;

    問題:如果變量V初次讀取的時候是A,並且在准備賦值的時候檢查到它任然是A,那能說明它的值沒有被其他線程修改過了嗎?

    如果在這段時間曾經被改成B,然后有改回A,那CAS操作就會誤任務它從來沒有被修改過。正對這種情況,java並發包提供了一個帶有標記的原子應用類                                    AtomicStampedRefernce,它可以通過變量值的版本來保證CAS的正確性;

  4.原子類

    java中的原子類大致可以分為四個類:

      原子更新基本類型;

      原子更新數組類型;

      原子更新引用類型;

      原子更新屬性類型;

    這些原子類中都是用了無鎖的概念,有的地方直接使用CAS操作的線程安全的類型

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest implements Runnable {

    private static Integer count=1;
    private static AtomicInteger atomicInteger=new AtomicInteger();

    @Override
    public void run() {
        while (true){
            int count=getCountAtomic();
            System.out.println(count);
            if (count>=150){
                break;
            }
        }
    }

    public synchronized Integer getCount(){
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return count++;
    }

    public Integer getCountAtomic(){
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return atomicInteger.incrementAndGet();
    }

    public static void main(String[] args){
        AtomicIntegerTest test = new AtomicIntegerTest();
        Thread thread1 = new Thread(test);
        Thread thread2 = new Thread(test);
        thread1.start();
        thread2.start();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM