JUC並發編程基石AQS之主流程源碼解析


前言

由於AQS的源碼太過凝練,而且有很多分支比如取消排隊、等待條件等,如果把所有的分支在一篇文章的寫完可能會看懵,所以這篇文章主要是從正常流程先走一遍,重點不在取消排隊等分支,之后會專門寫一篇取消排隊和等待條件的分支邏輯。讀源碼千萬別在每個代碼分支中來回游走,先按一個正常的分支把流程看明白,之后再去重點關注其他分支,各個擊破。我相信看完正常流程,你再去分析其他分支會更加得心應手。本篇將主要方法名都做了目錄索引,查看時可通過目錄快速跳到指定方法的邏輯。

執行流程

AQS的執行流程大體為當線程獲取鎖失敗時,會加入到等待隊列中,在等待隊列中的線程會按照從頭至尾的順序依次再去嘗試獲取鎖執行。

當線程獲取鎖后如果還需要等待特定的條件才能執行,那么線程就加入到條件隊列排隊,當等待的條件到來時再從條件隊列中按照從頭至尾的順序加入到等待隊列中,然后再按照等待隊列的執行流程去獲取鎖。所以AQS最核心的數據結構其實就兩個隊列,等待隊列和條件隊列,然后再加上一個獲取鎖的同步狀態。

AQS數據結構

AQS最核心的數據結構就三個

  • 等待隊列

    源碼中head和tail為等待隊列的頭尾節點,在通過前后指向則構成了等待隊列,為雙向鏈表,學名為CLH隊列。

  • 條件隊列

    ConditionObject中的firstWaiter和lastWaiter為等待隊列的頭尾節點,然后通過next指向構成了條件隊列,是個單向鏈表。

  • 同步狀態

    state為同步狀態,通過CAS操作來實現獲取鎖的操作。

public abstract class AbstractQueuedSynchronizer{
  
  /**
     * 等待隊列的頭節點
     */
    private transient volatile Node head;

    /**
     * 等待隊列的尾節點
     */
    private transient volatile Node tail;
  
    /**
     * 同步狀態
     */
    private volatile int state;
  
    public class ConditionObject implements Condition, java.io.Serializable {

          /** 條件隊列的頭節點 */
          private transient Node firstWaiter;
      
          /** 條件隊列的尾節點 */
          private transient Node lastWaiter;
    }
}

Node節點

兩個隊列中的節點都是通過AQS中內部類Node來實現的。主要字段:

  • waitStatus

    當前節點的狀態,具體看源碼列出的注釋。很重要,之后會在源碼中講解。

  • Node prev

    等待隊列節點指向的前置節點

  • Node next

    待隊列節點指向的后置節點

  • Node nextWaiter

    條件隊列中節點指向的后置節點

  • Thread thread

    當前節點持有的線程

static final class Node {
    /**  */
    static final Node SHARED = new Node();
    /**  */
    static final Node EXCLUSIVE = null;

    /** 標明當前節點線程取消排隊 */
    static final int CANCELLED =  1;
  
    /** 標明該節點的后置節點需要自己去喚醒 */
    static final int SIGNAL    = -1;
  
    /** 標明當前節點在等待某個條件,此時節點在條件隊列中 */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * 等待狀態,值對於上面的四個常量
     */
    volatile int waitStatus;

    /**
     * 等待隊列節點指向的前置節點
     */
    volatile Node prev;

    /**
     * 等待隊列節點指向的后置節點
     */
    volatile Node next;

    /**
     * 當前節點持有的線程
     */
    volatile Thread thread;

    /**
     * 條件隊列中節點指向的后置節點
     */
    Node nextWaiter;

加鎖

上面說明的數據結構我們先大致有個印象,現在通過加鎖來一步步說明下具體的流程,上篇文章JUC並發編程基石AQS之結構篇,我們知道了AQS加鎖代碼執行的是acquire方法,那么我們從這個方法說起,從源碼中看出執行流程為:tryAcquire——>addWaiter——>acquireQueued

tryAcquire為自己實現的具體加鎖邏輯,當加鎖失敗時返回false,則會執行addWaiter,將線程加入到等待隊列中,Node.EXCLUSIVE為獨占鎖的模式,即同時只能有一個線程獲取鎖去執行。

例子說明

首先假設有四個線程t0-t4調用tryAcquire獲取鎖,t0線程為天選之子獲取到了鎖,則t1-t4線程接着去執行addWaiter。

acquire

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

addWaiter分支1

addWaiter方法,首先會初始化一個node節點,將當前線程設置到node節點中。然后判斷head和tail節點是否為空,head和tail節點是懶加載的,當AQS初始化時為null,則第一次進來時if (pred != null) 條件不成立,執行enq方法。

例子說明

假如t1和t2線程同時執行到該方法,head節點未初始化則執行enq。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq

此時可能多個線程會同時調用enq方法,所以該方法中也使用CAS操作。for (;;)是個死循環,首先會CAS操作初始化head節點,且head節點是個空節點,沒有設置線程。然后第二次循環時通過CAS操作將該節點設置我尾部節點,並將前置節點指向head,之后會跳出循環,返回生成的Node節點到addWaiter,從源碼可以看到addWaiter方法后面沒有邏輯,之后會調用acquireQueued。

例子說明

t1和t2線程同時執行,t1線程上天眷顧CAS成功,則流程為

  • 初始化head

  • t1線程的node節點加入等待隊列

  • t2線程執行,node節點加入等待隊列

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter分支2

現在在來說t3和t4,t3和t4線程這時終於獲取到了cpu的執行權,此時head節點已經初始化,則進入條件中的代碼,其實也是通過CAS操作將節點加入到等待隊列尾部,之后會調用acquireQueued。

例子說明

假如t3線程先CAS成功,之后t4成功,此時的數據結構為

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

acquireQueued

這個方法有兩個邏輯,首先如果該節點的前置節點是head會走第一個if,再次去嘗試獲取鎖???

獲取鎖成功,則將頭節點設置為自己,並返回到acquire方法,此時acquire方法執行完,代表獲取鎖成功,線程可以執行自己的邏輯了。這里有下面幾個注意點

  • p.next = null; // help GC 設置舊的head節點的后置節點為null
  • setHead方法 將t1節點設置為頭節點,因為頭節點是個空節點,所以設置t1線程節點線程為null,設置t1前置節點為null,此時舊的head節點已經沒有任何指向和關聯,可以被gc回收,所以上面那一步會寫個help GC 的注釋。

例子說明

現在t1線程的前置節點為頭結點,如果t1執行tryAcquire成功則結果為

當獲取鎖失敗或者前置節點不是頭節點都會走第二個if邏輯,首先會判斷當前線程是否需要掛起,如果需要則執行線程掛起。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

shouldParkAfterFailedAcquire

判斷線程是否需要掛起,首先需要注意的是這個方法的參數是當前節點的前置節點。當線程需要掛起的時候,它需要把身后事安排明白,掛起后讓誰來把我喚醒。這個方法就主要做這個操作。我們再來看Node節點中的waitStatus狀態,這個狀態有一個Node.SIGNAL=-1,代表了當前節點需要將后置節點喚醒。這個理解可能有點繞。首先我們要理解一點,如果我需要被喚醒,那么我就要設置我們的前置節點的狀態為Node.SIGNAL,這樣當我的前置節點發現waitStatus=Node.SIGNAL時,它才知道,我執行完后需要去喚醒后置節點讓后置節點去執行。所以這個方法是當前節點去設置自己的前置節點的狀態為Node.SIGNAL

waitStatus初始化后是0,

第一次進入該方法,發現自己的前置節點不是Node.SIGNAL,需要先設置為Node.SIGNAL狀態

第二次進入時發現前置節點已經是Node.SIGNAL狀態,那么我就可以安心的掛起了,有人會喚醒我的。

所以這個方法其實是兩個邏輯,先設置前置節點狀態,再判斷是否可以掛起。因為前面acquireQueued方法中for (;😉 也是個循環,所以會重復進入。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt

將自己的前置節點設置為可喚醒的狀態后進入該方法,線程掛起。

例子說明

此時t2-t4線程都執行到了此方法,則t2-t4線程都已經掛起不再執行,並且head-t3節點的waitStatus都為Node.SIGNAL,因為t4沒有后置節點。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

解鎖

release

解鎖方法的入口是AQS的release方法,首先會調用tryRelease方法,這個是AQS實現類自己實現的方法,去CAS改變state狀態,如果解鎖成功,則會進入if里的代碼,獲取head節點,判斷waitStatus!=0,如果等於0代表沒有后置節點需要去喚醒。之后調用unparkSuccessor方法。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

waitStatus>0時,代表為CANCELLED = 1狀態,即線程取消排隊,這個以后會細講。先將頭結點的waitStatus狀態設為初始值0,之后查看后置節點的狀態,如果>0代表后置節點取消了排隊,不需要喚醒。但是當前節點需要去喚醒后續的節點讓后續節點再去執行,所以會從尾結點開始尋找找到離當前線程最近的一個且waitStatus<0的去喚醒。之后會調用LockSupport.unpark(s.thread);取消后續節點的掛起,讓后續節點繼續執行。

unparkSuccessor

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

例子說明

此時等待隊列的數據,當t0線程執行完成后執行解鎖操作,此時所有等待的線程都沒有取消等待。

則t0線程會喚醒t1線程

如果t1和t3線程取消的排隊時,t0線程會喚醒t2,從后往前找離head最近的一個沒有取消派對的節點

線程執行到parkAndCheckInterrupt方法時被掛起,當被頭節點喚醒后會繼續執行,設置interrupted=true,表示被中斷,會繼續執行for循環邏輯,到現在一個正常的獲取鎖失敗——>加入等待隊列——>掛起——>被喚醒繼續執行的流程已經整體走了一遍。

本篇文章都是自己根據源碼寫出的閱讀心得,可能有的地方沒有揣摩到Doug Lea大神的意圖,如果有理解不對的地方歡迎一起探討。

如有不實,還望指正


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM