ReentrantLock源碼


ReentrantLock源碼

  • JUC 指java.util.concurrent包下,一系列關於並發的類,JUC就是包名的首字母

  • CAS 比較並交換,可以看另一篇文章

  • AQS 指主要利用CAS來實現的輕量級多線程同步機制,並且不會在CPU上出現上下文切換和調度的情況

自定義鎖

如何在自己實現一個鎖?

可以定義一個屬性來判斷當前是否有其線程在運行,如果正在運行那么其他線程需要等待

如何實現? 例如有兩個線程T1和T2,都執行同一段代碼

自定義兩個方法

public void lock();
public void unlock();
public void addI(){
    i++;
}
將上面的addI方法更改為下面的
public void addI(){
    lock();
    i++;
    unlock();
}   

這里忽略程序出錯導致死鎖的情況,正常解鎖需要放在finally代碼塊中

當T1進入代碼,將鎖的改為被持有的狀態

/**
*  0為未持有
*  1為被持有
*/
private volatile int i=0;

public void lock(){
    //CAS修改成功返回true
    while(CAS(i,1)){
        return
    }
}

public void unlock(){
   i=0;
}

上面的偽代碼當T1進入lock方法后,因為是第一個進入的,鎖的狀態還是0,通過cas可以改為1,修改成功返回true,進入循環return到addI方法,執行i++操作,然后進入unLock方法,將狀態改為0,方法結束

假設當T1進入方法將狀態改為1,那么T2進入會一直循環CAS修改,線程一直在自旋不會走下面的代碼,直到鎖的狀態改為0,才會繼續業務代碼

那么我們就實現了一個簡單的鎖,但是這個鎖有什么缺點呢? 沒有獲取到鎖的線程會一直自旋,消耗系統資源,這個是我們不想看到的

在java中還有一個類LockSupport,其中有一個park方法

public static void park() {
    UNSAFE.park(false, 0L);
}

public native void park(boolean var1, long var2);

里面繼續調用UNSAFE類,這個類里的方法是使用C/C++實現,park方法的作用是將當前線程立即休眠,讓出CPU,直到被喚醒,還有一個喚醒的方法

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

public native void unpark(Object var1);

這個同樣也是其他語言實現,傳入需要被喚醒的線程,那么我們上面的代碼可以改造為

/**
*  0為未持有
*  1為被持有
*/
private volatile int i=0;

//存放等待獲取鎖的線程
private Thread t;

public void lock(){
    //CAS修改成功返回true
    if(CAS(i,1)){
        return
    }
    //將沒有獲取到鎖的線程存放
    t=Thread.currentThread()
    //如果沒有獲取到鎖則進行休眠
    LockSupport.park();
    
}

public void unlock(){
   i=0;
   if(t!=null){ 
       LockSupport.unpark(t);
   }
}

我們修改完后即使沒有獲取到鎖的線程也不會占用CPU的資源,但是如果出現2個以上的線程同時進行操作,那么會出現丟失線程的情況,可以再進行優化,將等待的線程存放到隊列中,就不再演示了,而ReentrantLock就是主要使用CAS,park,自旋來實現的,接下來看ReentrantLock的源碼

ReentrantLock

當初始化一個ReentrantLock使用默認構造時創建的是一個非公平鎖

public ReentrantLock() {
    sync = new NonfairSync();
}

如果想創建一個公平鎖則使用有參構造

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

這篇文章先來看公平鎖的實現

public void service() {
    //創建一個公平鎖
    ReentrantLock reentrantLock = new ReentrantLock(true);
    reentrantLock.lock();
    try {
        System.out.println("==這里有一堆的業務===");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        reentrantLock.unlock();
    }
}

加鎖

沒有競爭情況

public void lock() {
    sync.lock();
}
調用的sync是一個ReentrantLock的內部抽象類
abstract static class Sync extends AbstractQueuedSynchronizer{
    ......
}

它的公平鎖的實現方法,是FairSync類中的,也是一個內部類,在ReentrantLock中,繼承了Sync類,實現lock方法

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

點進tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    //獲取當前執行的線程
    final Thread current = Thread.currentThread();
    //得到鎖的狀態
    int c = getState();
    //如果鎖狀態為0說明當前鎖沒有被其他線程持有
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

繼續點進hasQueuedPredecessors方法,該方法定義在AbstractQueuedSynchronizer抽象類中的

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

其中tail和head這兩個變量是在AbstractQueuedSynchronizer抽象類中定義的,用來存放等待線程頭和尾部

因為當前線程執行前鎖的狀態是未被持有的,所以還沒有初始化過隊列,那么等待隊列的頭和尾部都為null,return的第一個判斷h!=t為false,后面的&&運算符,所以直接返回

那么回到tryAcquire方法,hasQueuedPredecessors返回false,而前面有一個取反!符號,則繼續執行compareAndSetState(0, acquires)方法,通過cas改變當前鎖的狀態為1,然后執行setExclusiveOwnerThread方法,該方法就是簡單的賦值

protected final void setExclusiveOwnerThread(Thread thread) {
    //當前持有鎖的線程
    exclusiveOwnerThread = thread;
}

繼續返回到acquire方法,為true,取反false,使用了&&阻斷符,則不會執行后面的acquireQueued方法,直接結束lock()方法,執行自定義的業務代碼

tryAcquire方法什么時候走到 else if (current == getExclusiveOwnerThread()) 判斷呢

ReentrantLock的特性之一就是體現在這里-重入鎖

啥叫重入鎖?簡單講就是在加鎖后又加鎖

public void addI(){
    ReentrantLock rLock =new ReentrantLock(true);
    rLock.lock();
    
    //執行業務==
    
    rLock.lock();
    
    //執行業務==
    
    //解鎖最后加鎖的
    rLock.unlock();
    //解鎖最先加鎖的
    rLock.unlock();
}

當線程和該鎖已經持有的線程相同時則會進入這個判斷,將鎖的狀態加1,賦值給state,下面的判斷state小於0可能是判斷溢出的問題,即數值超出int類型最大容量則為負數,一般這種情況很少見吧

存在競爭情況

那么上面是沒有其他線程競爭的情況,如果在T1加鎖后,T2,T3..來嘗試獲取鎖改怎么辦呢?->進等待隊列

這個還是tryAcquire方法的代碼,拿下來方便查看

protected final boolean tryAcquire(int acquires) {
   //獲取當前執行的線程
   final Thread current = Thread.currentThread();
   //得到鎖的狀態
   int c = getState();
   //如果鎖狀態為0說明當前鎖沒有被其他線程持有
   if (c == 0) {
       if (!hasQueuedPredecessors() &&
           compareAndSetState(0, acquires)) {
           setExclusiveOwnerThread(current);
           return true;
       }
   }
   else if (current == getExclusiveOwnerThread()) {
       int nextc = c + acquires;
       if (nextc < 0)
           throw new Error("Maximum lock count exceeded");
       setState(nextc);
       return true;
   }
   return false;
}

如果在T1進行完加鎖后T2來嘗試獲取鎖,因為state狀態不為0,而當前線程和鎖持有的線程又不同,則直接返回false

那么返回acquire方法中

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Node.EXCLUSIVE 返回一個Node節點

取反為true,則執行acquireQueued方法,而acquireQueued方法中有執行了addWaiter方法,先來看addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;	
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

使用鏈表的形式來存儲阻塞排隊的線程,來看node的內部結構

主要的三個屬性

//存放上一個節點
volatile Node prev;
//存放下一個節點
volatile Node next;
//存放當前等待的線程
volatile Thread thread;
Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
}

當進入這個方法后,首先將AbstractQueuedSynchronizer類中的尾部節點賦值給一個臨時變量,判斷尾部是否為空,假設現在線程為T2,隊列還沒有被初始化,尾部為空,則進入enq方法,繼續點進

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            //CAS設置頭節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            //CAS設置尾巴節點
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

還是將AbstractQueuedSynchronizer類中尾部節點賦值給臨時變量t 然后判斷t是否為空,因為隊列還沒有初始化,所以尾巴節點為空,則使用cas來設置 AbstractQueuedSynchronizer類中的頭節點,之后將設置的頭節點賦值給尾部

當執行完節點的關系如下

這時候有個疑問,怎么沒有設置傳入的Node節點呢?而是設置新new出來的Node,和參數傳入的Node節點沒有一點關系?

注意看上面的代碼for(;;) 死循環,當下次循環的時候t已經不為空了,因為上次循環給加了一個空節點,然后將傳入的Node節點的上一個賦值為t,然后通過CAS獲取AbstractQueuedSynchronizer類中的尾部節點,如果尾部節點還是為t,則更改為傳入的node對象,如果CAS失敗,即在CAS設置前被其他線程對AbstractQueuedSynchronizer類中的尾部節點進行了修改,則進行下一次for循環,直至設置成功,當操作完成后,節點結構如下圖

之后代碼返回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

還是一個for死循環,首先獲取上一個節點和AbstractQueuedSynchronizer類中的頭節點進行判斷,如果相同則調用tryAcquire()方法嘗試獲取鎖,因為在初始化隊列過程中可能獲取鎖執行的線程已經執行完了,並且釋放了鎖,所以這里嘗試一下獲取鎖,假設沒有獲取到鎖,則不會進入if (p == head && tryAcquire(arg)) {}代碼塊,繼續下面的判斷,進入shouldParkAfterFailedAcquire()方法,從名稱可以看到[在獲取鎖失敗后應該睡眠]

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

判斷上一個node節點的狀態,將上一個節點的Node.SIGNAL狀態的值為-1,而上面的代碼中並沒有對waitStatus的值進行更改,默認初始化為0,則進入最后的else代碼塊,通過CAS將waitStatus的值改為-1,方法返回false結束,回到acquireQueued方法中,繼續進行for循環,假設還是沒有獲取到鎖,則再次進入shouldParkAfterFailedAcquire方法中,因為上次for循環將waitStatus的值改為了-1,則這次進入了if (ws == Node.SIGNAL)的代碼塊,返回true,返回到 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())判斷中,因為shouldParkAfterFailedAcquire方法返回了true,則繼續執行parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

當執行完parkAndCheckInterrupt方法后,T2線程就在這里進行休眠

為什么不開始就把waitStatus設置為-1呢?還要多自旋一次,有一個原因是盡量不使用park,能嘗試獲取到鎖最好

那么假設現在又來一個線程T3

public final void acquire(int arg) {
    //嘗試獲取鎖肯定不會成功,則進入acquireQueued,addWaiter方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //這時tail已經是t2節點了
    Node pred = tail;
    //不為空進入
    if (pred != null) {
        //將當前節點上一個節點設置為t2
        node.prev = pred;
        //通過CAS來設置AQS類中的尾節點
        if (compareAndSetTail(pred, node)) {
            //然后設置T2的下一個節點
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

完成操作后節點關系如下

之后繼續執行acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //獲取上一個節點:T2
            final Node p = node.predecessor();
            //T2不是頭節點,則不進入下面的代碼塊
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
           	//之后調用shouldParkAfterFailedAcquire方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //同樣的代碼,第一次獲取T3的前一個節點T2,判斷T2的ws值為0,
    //CAS修改后返回,外層循環再次進入這時T2的ws值為-1,返回true,方法結束
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

解鎖

假設現在T1執行unlock方法,T2,T3在隊列中

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

進入tryRelease方法

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //把當前持有鎖的線程清空
        setExclusiveOwnerThread(null);
    }
    //設置鎖的狀態
    setState(c);
    return free;
}

首先將狀態數值-1,判斷如果當前線程和持有鎖的線程不是同一個則拋出異常,即解鎖的線程和加鎖的不是同一個線程

判斷如果c==0,也就是沒有重入鎖的情況,將free改為true,然后進入setExclusiveOwnerThread方法

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

protected final void setState(int newState) {
    state = newState;
}

方法返回,沒有重入鎖的情況,則free為true,獲取AQS類中的頭節點,假設不為空,ws=-1,則進入unparkSuccessor(h)方法

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

首先獲取頭結點的狀態,小於0進入代碼塊,將頭結點的鎖狀態改為0,獲取下一個節點,那么s就是t2,而t2的ws也是-1,所以直接進入最下面的代碼塊,if(s!=null),unpark(t2)線程

那么回到t2線程休眠的地方

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    //在這里醒來
    return Thread.interrupted();
}

下面的是判斷線程是否被中斷過,native方法,無法看到實現了,那么假設沒有被中斷過則返回false,那么返回上一個方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //返回代碼后繼續執行這里
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

因為parkAndCheckInterrupt方法返回false,所以進不去代碼塊,那么繼續執行for,當執行if (p == head && tryAcquire(arg))時p==head成立,而調用tryAcquire方法嘗試獲取鎖成功,因為t1已經釋放了,那么進入下面的代碼塊

if (p == head && tryAcquire(arg)) {
    setHead(node);
    //設置t2上一個節點,也就是空節點的下一個節點設置為null
    p.next = null; // help GC p節點沒有任何引用指向了,幫助垃圾回收
    failed = false;
    return interrupted;
}

private void setHead(Node node) {
    //將t2節點設置為頭部
    head = node;
    //然后將t2節點的thread設置為null
    node.thread = null;
    //節點的上一個節點設置為null
    node.prev = null;
}

經過上面的操作后節點關系如下

如果這個節點在頭說明它正在執行代碼,而不是排隊,即使初始化時T1沒有進隊列,但是給它添加了一個空node,來代替它正在執行

例如有T2,T3在排隊,T1線程unpark后T2線程執行,上面的代碼也能說明T2會先把當前節點的線程,上下節點都設置為null,而T2線程去執行代碼去了,已經在運行過程中了

看別的博客有一段解釋:比如你去買車票,你如果是第一個這個時候售票員已經在給你服務了,你不算排隊,你后面的才算排隊

注意一點:隊列頭始終為空Node

如何保證公平

情況1

T1執行完unpark后,釋放完鎖,還沒來的及喚醒隊列中的T2,這時T3線程來嘗試獲取到鎖

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t && 
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

這種情況隊列中肯定有節點排隊,如果沒有節點直接獲取到鎖也是公平的,那么有節點排隊h就不等於t,true,&&運算符繼續判斷,h的next節點也不為null,返回false

s.thread != Thread.currentThread() 如果當前來嘗試獲取鎖的對象不是在排隊的第一個(也就是頭結點的下一個節點,頭結點正在運行,不算在排隊的隊列中)也就是其他線程插隊的情況,則返回true,結果就是(true&&(false||true)) 整體返回true,外層代碼取反為false,不會嘗試CAS獲取鎖,則T3去排隊

情況2

T2嘗試獲取鎖時發現T1持有鎖,於是去初始化隊列,在初始化過程中T1執行完釋放鎖,T2執行初始化隊列代碼時間片用完,這時T3來嘗試獲取鎖

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))<------假設T2初始化隊列執行到這里CPU時間片用完
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

此時節點關系如下

那么回到hasQueuedPredecessors方法,看最后的return

return h != t && 
    ((s = h.next) == null || s.thread != Thread.currentThread());

h頭節點為一個空node,而t為節點為null,不等於true繼續判斷,h頭結點下一個為null,整體返回true,外層代碼取反為false,則去排隊

if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
}

遺留問題

1 初始化隊列以及后面的入隊為什么要設置空的頭節點

2 在parkAndCheckInterrupt()方法中最后調用的Thread.interrupted();一系列方法最后不改變任何東西,不明白它這個的作用,也有說是為了復用lockInterruptibly()方法,但是感覺有點牽強

太笨了看不明白,希望不吝賜教,也可以加qq群一起探討:737698533


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM