多線程與高並發(六) Lock


之前學習了如何使用synchronized關鍵字來實現同步訪問,Java SE 5之后,並發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式地獲取和釋放鎖。雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字所不具備的同步特性。

不同於synchronized是Java語言的關鍵字,是內置特性,Lock不是Java語言內置的,Lock是一個類,通過這個類可以實現同步訪問。而且synchronized同步塊執行完成或者遇到異常是鎖會自動釋放,而lock必須調用unlock()方法釋放鎖,因此在finally塊中釋放鎖。

一、 Lock 接口

先看看lock接口定義了哪些方法:

void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();

這里面lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用來獲取鎖的。這四個方法都是用來獲取鎖的,那有什么區別呢?

lock()方法是平常使用得最多的一個方法,就是用來獲取鎖。如果鎖已被其他線程獲取,則進行等待。

tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待。

tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。

lockInterruptibly()方法,當通過這個方法去獲取鎖時,如果線程正在等待獲取鎖,則這個線程能夠響應中斷,即中斷線程的等待狀態。也就使說,當兩個線程同時通過lock.lockInterruptibly()想獲取某個鎖時,假若此時線程A獲取到了鎖,而線程B只有在等待,那么對線程B調用threadB.interrupt()方法能夠中斷線程B的等待過程。

unLock()方法是用來釋放鎖的,這沒什么特別需要講的。

Condition newCondition() 是用於獲取與lock綁定的等待通知組件,當前線程必須獲得了鎖才能進行等待,進行等待時會先釋放鎖,當再次獲取鎖時才能從等待中返回。

Lock接口里面的方法我們已經知道,接下來實現Lock的類ReentrantLock開始學起,發現ReentrantLock並沒有多少代碼,另外有一個很明顯的特點是:基本上所有的方法的實現實際上都是調用了其靜態內存類Sync中的方法,而Sync類繼承了AbstractQueuedSynchronizer(AQS)。

我們先學AQS相關的知識

二、AQS

AQS(以下簡稱同步器)是用來構建鎖和其他同步組件的基礎框架,它的實現主要依賴一個int成員變量來表示同步狀態,通過內置的FIFO隊列來完成排隊工作。

子類通過繼承並實現它的抽象方法來管理同步狀態,通過使用getState,setState以及compareAndSetState這三個方法對同步狀態進行更改。子類推薦被定義為自定義同步組件的靜態內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態的獲取和釋放方法來供自定義同步組件的使用,同步器既支持獨占式獲取同步狀態,也可以支持共享式獲取同步狀態,這樣就可以方便的實現不同類型的同步組件。

同步器是實現鎖的關鍵,要實現鎖功能,子類繼承Lock,它定義了使用者與鎖交互的接口,就像上面那幾個接口,但是實現卻是通過同步器,同步器簡化了鎖的實現方式,實現了底層操作,如同步狀態管理,線程的排隊,等待和喚醒,而外面使用者去不用關心這些細節。

2.1 同步器的接口

同步器的設計模式是基於模板方法,也就是說,使用者要繼承同步器並重寫指定的方法,隨后將同步器組合在自定義同步器組合定義在自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。總結就是同步器將一些方法開放給子類進行重寫,而同步器給同步組件所提供模板方法又會重新調用被子類所重寫的方法

如在AQS中有此方法:

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

而ReentrantLock中重寫了方法:

那在AQS中的acquire調用了這個方法,這就相當於在父類定義了一套模板,這些模板會調用一些可重寫的方法,這些可重寫的方法具體的實現放在了子類。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這就是模板方法方法的設計思路,如還有疑惑,可以去學習這種設計模式。

下面就是一些可以被重寫的方法:

 

方法名稱 描述
protected boolean tryAcquire(int arg) 獨占式獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然后再進行CAS設置同步狀態
protected boolean tryRelease(int arg) 獨占式釋放同步狀態,等待獲取同步狀態的線程將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之,獲取失敗
protected boolean tryReleaseShared(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively() 當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程獨占

實現自定義同步組件時,將會調用同步器提供的模板方法,這些(部分)模板方法與描述

方法名稱 描述
void acquire(int arg) 獨占式獲取同步狀態,如果當前線程獲取同步狀態成功,則由該方法返回,否則,將會進入同步隊列等待,該方法將會調用重寫的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 與acquire(int arg)相同,但是該方法響應中斷,當前線程未獲取到同步狀態而進入同步隊列中,如果當前線程被中斷,則該方法會拋出InterruptedException並返回
boolean tryAcquireNanos(int arg, long nanosTimeout) 在void acquireInterruptibly(int arg)的基礎上增加了超時限制,如果當前線程在超時時間內沒有獲取到同步狀態,那么將會返回false,如果獲取到了返回true
void acquireShared(int arg) 共享式的獲取同步狀態,如果當前線程未獲取到同步狀態,將會進入同步隊列等待,與獨占式獲取的主要區別是在同一時刻可以有多個線程獲取到同步狀態
void acquireSharedInterruptibly(int arg) 與acquireShared(int arg)相同,該方法響應中斷
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 在acquireSharedInterruptibly(int arg)基礎上增加了超時限制
boolean release(int arg) 獨占式的釋放同步狀態,該方法會在釋放同步狀態之后,將同步隊列中第一個節點包含的線程喚醒
boolean releaseShared(int arg) 共享式的釋放同步狀態
Collection<Thread> getQueuedThreads() 獲取等待在同步隊列上的線程集合

同步器提供的模板方法基本上分為3類:

  1. 獨占式獲取與釋放同步狀態

  2. 共享式獲取與釋放同步狀態

  3. 查詢同步隊列中的等待線程情況。

下面看一個例子:

public class Mutex implements Lock {
 private static class Sync extends AbstractQueuedSynchronizer {
    // Reports whether in locked state
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    // Acquires the lock if state is zero
    public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Otherwise unused
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // Releases the lock by setting state to zero
    protected boolean tryRelease(int releases) {
        assert releases == 1; // Otherwise unused
        if (getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // Provides a Condition
    Condition newCondition() {
        return new ConditionObject();
    }

    // Deserializes properly
    private void readObject(ObjectInputStream s)
            throws IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

private final Sync sync = new Sync();

@Override
public void lock() {
    sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
    return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
    sync.release(1);
}

@Override
public Condition newCondition() {
    return sync.newCondition();
}
}

這個例子中,獨占鎖Mutex是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨占式獲取和釋放同步狀態。在tryAcquire(int acquires)方法中,如果經過CAS設置成功(同步狀態設置為1),則代表獲取了同步狀態,而在tryRelease(int releases)方法中只是將同步狀態重置為0。用戶使用Mutex時並不會直接和內部同步器的實現打交道,而是調用Mutex提供的方法,在Mutex的實現中,以獲取鎖的lock()方法為例,只需要在方法實現中調用同步器的模板方法acquire(int args)即可,當前線程調用該方法獲取同步狀態失敗后會被加入到同步隊列中等待,這樣就大大降低了實現一個可靠自定義同步組件的門檻。

2.2 同步隊列

同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構造成為一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。

同步隊列中的節點(Node)用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和后繼節點。

volatile int waitStatus //節點狀態
volatile Node prev //當前節點/線程的前驅節點
volatile Node next; //當前節點/線程的后繼節點
volatile Thread thread;//加入同步隊列的線程引用
Node nextWaiter;//等待隊列中的下一個節點

看到節點的數據結構,知道這是一個雙向隊列,而在AQS中還存在兩個成員變量:

private transient volatile Node head;
private transient volatile Node tail;

AQS實際上通過頭尾指針來管理同步隊列,同時實現包括獲取鎖失敗的線程進行入隊,釋放鎖時對同步隊列中的線程進行通知等核心方法。其示意圖如下:

通過對源碼的理解以及做實驗的方式,現在我們可以清楚的知道這樣幾點:

  1. 節點的數據結構,即AQS的靜態內部類Node,節點的等待狀態等信息

  2. 同步隊列是一個雙向隊列,AQS通過持有頭尾指針管理同步隊列

三、 ReentrantLock

重入鎖ReentrantLock,顧名思義,就是支持重進入的鎖,它表示該鎖能夠支持一個線程對資源的重復加鎖。除此之外,該鎖的還支持獲取鎖時的公平和非公平性選擇。如果一個鎖不支持可重入,那當一個線程調用它的lock()方法獲取鎖之后,如果再次調用lock()方法,則該線程將會被自己所阻塞。

synchronized關鍵字隱式的支持重進入,比如一個synchronized修飾的遞歸方法,在方法執行時,執行線程在獲取了鎖之后仍能連續多次地獲得該鎖。ReentrantLock雖然沒能像synchronized關鍵字一樣支持隱式的重進入,但是在調用lock()方法時,已經獲取到鎖的線程,能夠再次調用lock()方法獲取鎖而不被阻塞。

3.1 實現可重入性

重進入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞,該特性的實現需要解決以下兩個問題。

  1. 線程再次獲取鎖。鎖需要去識別獲取鎖的線程是否為當前占據鎖的線程,如果是,則再次成功獲取。

  2. 鎖的最終釋放。線程重復n次獲取了鎖,隨后在第n次釋放該鎖后,其他線程能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重復獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。

ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放,以非公平性(默認的)實現為例

核心方法為nonfairTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果該鎖未被任何線程占有,該鎖能被當前線程獲取
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //2.若被占有,檢查占有線程是否是當前線程
    else if (current == getExclusiveOwnerThread()) {
        // 3. 再次獲取,計數加一
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

該方法增加了再次獲取同步狀態的處理邏輯:通過判斷當前線程是否為獲取鎖的線程來決定獲取操作是否成功,如果是獲取鎖的線程再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。成功獲取鎖的線程再次獲取鎖,只是增加了同步狀態值,這也就要求ReentrantLock在釋放同步狀態時減少同步狀態值。

protected final boolean tryRelease(int releases) {
    //1. 同步狀態減1
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //2. 只有當同步狀態為0時,鎖成功被釋放,返回true
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 3. 鎖未被完全釋放,返回false
    setState(c);
    return free;
}

如果該鎖被獲取了n次,那么前(n-1)次tryRelease(int releases)方法必須返回false,而只有同步狀態完全釋放了,才能返回true。可以看到,該方法將同步狀態是否為0作為最終釋放的條件,當同步狀態為0時,將占有線程設置為null,並返回true,表示釋放成功。

3.2 公平與非公平獲取鎖的區別

公平鎖非公平鎖何謂公平性,是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應該符合請求上的絕對時間順序,滿足FIFO,ReentrantLock的構造方法無參時是構造非公平鎖

public ReentrantLock() {
    sync = new NonfairSync();
}

另外還提供了另外一種方式,可傳入一個boolean值,true時為公平鎖,false時為非公平鎖

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

在上面非公平鎖獲取時(nonfairTryAcquire方法)只是簡單的獲取了一下當前狀態做了一些邏輯處理,並沒有考慮到當前同步隊列中線程等待的情況。我們來看看公平鎖的處理邏輯是怎樣的,核心方法為:

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
  }
}

這段代碼的邏輯與nonfairTryAcquire基本上一直,唯一的不同在於增加了hasQueuedPredecessors的邏輯判斷,方法名就可知道該方法用來判斷當前節點在同步隊列中是否有前驅節點的判斷,如果有前驅節點說明有線程比當前線程更早的請求資源,根據公平性,當前線程請求資源失敗。如果當前節點沒有前驅節點的話,再才有做后面的邏輯判斷的必要性。公平鎖每次都是從同步隊列中的第一個節點獲取到鎖,而非公平性鎖則不一定,有可能剛釋放鎖的線程能再次獲取到鎖

公平鎖 VS 非公平鎖

  1. 公平鎖每次獲取到鎖為同步隊列中的第一個節點,保證請求資源時間上的絕對順序,而非公平鎖有可能剛釋放鎖的線程下次繼續獲取該鎖,則有可能導致其他線程永遠無法獲取到鎖,造成“飢餓”現象

  2. 公平鎖為了保證時間上的絕對順序,需要頻繁的上下文切換,而非公平鎖會降低一定的上下文切換,降低性能開銷。因此,ReentrantLock默認選擇的是非公平鎖,則是為了減少一部分上下文切換,保證了系統更大的吞吐量

四、 ReentrantReadWriteLock

之前學到的鎖都是獨占鎖,這些鎖在同一時刻只允許一個線程進行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得並發性相比一般的排他鎖有了很大提升。

除了保證寫操作對讀操作的可見性以及並發性的提升之外,讀寫鎖能夠簡化讀寫交互場景的編程方式。假設在程序中定義一個共享的用作緩存數據結構,它大部分時間提供讀服務(例如查詢和搜索),而寫操作占有的時間很少,但是寫操作完成之后的更新需要對后續的讀服務可見。

一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量。Java並發包提供讀寫鎖的實現是ReentrantReadWriteLock。

讀寫鎖主要有以下三個特性:

  1. 公平性選擇:支持非公平性(默認)和公平的鎖獲取方式,吞吐量還是非公平優於公平;

  2. 重入性:支持重入,讀鎖獲取后能再次獲取,寫鎖獲取之后能夠再次獲取寫鎖,同時也能夠獲取讀鎖;

  3. 鎖降級:遵循獲取寫鎖,獲取讀鎖再釋放寫鎖的次序,寫鎖能夠降級成為讀鎖

4.1 讀寫鎖的使用

ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個方法,即readLock()方法和writeLock()方法,而其實現——ReentrantReadWriteLock,除了接口方法之外,還提供了一些便於外界監控其內部工作狀態的方法,主要有:

int getReadLockCount()//返回當前讀鎖被獲取的次數。該次數不等於獲取讀鎖的線程數,如果一個線程連續獲取n次,那么返回的就是n
int getReadHoldCount()//返回當前線程獲取讀鎖的次數
boolean isWriteLocked()//判斷寫鎖是否被獲取
int getWriteHoldCount()//返回當前寫鎖被獲取的次數

讀寫鎖使用:

public class Cache {
    static Map<String, Object> map = new HashMap<>();
    static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    static Lock r = reentrantReadWriteLock.readLock();
    static Lock w = reentrantReadWriteLock.writeLock();
    // 獲取一個key對應的value
    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    // 設置key對應的value,並返回舊的value
    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    // 清空所有的內容
    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

Cache組合一個非線程安全的HashMap作為緩存的實現,同時使用讀寫鎖的讀鎖和寫鎖來保證Cache是線程安全的。在讀操作get(String key)方法中,需要獲取讀鎖,這使得並發訪問該方法時不會被阻塞。寫操作put(String key,Object value)方法和clear()方法,在更新HashMap時必須提前獲取寫鎖,當獲取寫鎖后,其他線程對於讀鎖和寫鎖的獲取均被阻塞,而只有寫鎖被釋放之后,其他讀寫操作才能繼續。Cache使用讀寫鎖提升讀操作的並發性,也保證每次寫操作對所有的讀寫操作的可見性,同時簡化了編程方式。

4.2 實現原理

再分析下讀寫鎖的實現原理,主要的內容包括:讀寫狀態的設計,寫鎖的獲取與釋放,讀鎖的獲取與釋放以及鎖降級。

讀寫狀態的設計

讀寫鎖同樣依賴自定義同步器來實現同步功能,而讀寫狀態就是其同步器的同步狀態。回想ReentrantLock中自定義同步器的實現,同步狀態表示鎖被一個線程重復獲取的次數,而讀寫鎖的自定義同步器需要在同步狀態(一個整型變量)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成為讀寫鎖實現的關鍵。

如果在一個整型變量上維護多種狀態,就一定需要“按位切割使用”這個變量,讀寫鎖將變量切分成了兩個部分,高16位表示讀,低16位表示寫,如圖:

寫鎖的獲取與釋放

寫鎖是一個支持重進入的排它鎖。如果當前線程已經獲取了寫鎖,則增加寫狀態。如果當前線程在獲取寫鎖時,讀鎖已經被獲取(讀狀態不為0)或者該線程不是已經獲取寫鎖的線程,則當前線程進入等待狀態:

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    // 1. 獲取寫鎖當前的同步狀態
    int c = getState();
    // 2. 獲取寫鎖獲取的次數
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // 3.1 當讀鎖已被讀線程獲取或者當前線程不是已經獲取寫鎖的線程的話
        // 當前線程獲取寫鎖失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 3.2 當前線程獲取寫鎖,支持可重復加鎖
        setState(c + acquires);
        return true;
    }
    // 3.3 寫鎖未被任何線程獲取,當前線程可獲取寫鎖
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

寫鎖的釋放與ReentrantLock的釋放過程基本類似,每次釋放均減少寫狀態,當寫狀態為0時表示寫鎖已被釋放,從而等待的讀寫線程能夠繼續訪問讀寫鎖,同時前次寫線程的修改對后續讀寫線程可見。

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //1. 同步狀態減去寫狀態
    int nextc = getState() - releases;
    //2. 當前寫狀態是否為0,為0則釋放寫鎖
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    //3. 不為0則更新同步狀態
    setState(nextc);
    return free;
}

讀鎖的獲取與釋放

讀鎖是一個支持重進入的共享鎖,它能夠被多個線程同時獲取,在沒有其他寫線程訪問(或者寫狀態為0)時,讀鎖總會被成功地獲取,而所做的也只是(線程安全的)增加讀狀態。如果當前線程已經獲取了讀鎖,則增加讀狀態。如果當前線程在獲取讀鎖時,寫鎖已被其他線程獲取,則進入等待狀態。另外由於要增加一些外部功能,比如getReadHoldCount()方法,作用是返回當前線程獲取讀鎖的次數。讀狀態是所有線程獲取讀鎖次數的總和,而每個線程各自獲取讀鎖的次數只能選擇保存在ThreadLocal中,由線程自身維護,這使獲取讀鎖的實現變得復雜。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //1. 如果寫鎖已經被獲取並且獲取寫鎖的線程不是當前線程的話,當前
    // 線程獲取讀鎖失敗返回-1
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        //2. 當前線程獲取讀鎖
        compareAndSetState(c, c + SHARED_UNIT)) {
        //3. 下面的代碼主要是新增的一些功能,比如getReadHoldCount()方法
        //返回當前獲取讀鎖的次數
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //4. 處理在第二步中CAS操作失敗的自旋已經實現重入性
    return fullTryAcquireShared(current);
}

讀鎖的每次釋放(線程安全的,可能有多個讀線程同時釋放讀鎖)均減少讀狀態,減少的 值是(1<<16)。

鎖降級

鎖降級指的是寫鎖降級成為讀鎖。如果當前線程擁有寫鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。接下來看一個鎖降級的示例。因為數據不常變化,所以多個線程可以並發地進行數據處理,當數據變更后,如果當前線程感知到數據變化,則進行數據的准備工作,同時其他處理線程被阻塞,直到當前線程完成數據的准備工作:

public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級從寫鎖獲取到開始
writeLock.lock();
try {
if (!update) {
// 准備數據的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級完成,寫鎖降級為讀鎖
}
try {
// 使用數據的流程(略)
} finally {
readLock.unlock();
}
}

當數據發生變更后,update變量(布爾類型且volatile修飾)被設置為false,此時所有訪問processData()方法的線程都能夠感知到變化,但只有一個線程能夠獲取到寫鎖,其他線程會被阻塞在讀鎖和寫鎖的lock()方法上。當前線程獲取寫鎖完成數據准備之后,再獲取讀鎖,隨后釋放寫鎖,完成鎖降級。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM