JDK源碼之AQS源碼剖析


     除特別注明外,本站所有文章均為原創,轉載請注明地址   

 

     AbstractQueuedSynchronizer(AQS)是JDK中實現並發編程的核心,平時我們工作中經常用到的ReentrantLock,CountDownLatch等都是基於它來實現的。

     AQS類中維護了一個雙向鏈表(FIFO隊列), 如下圖所示:

    

    隊列中的每個元素都用一個Node表示,我們可以看到,Node類中有幾個靜態常量表示的狀態:

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;

        volatile Node next;
volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }

  此外,AQS中通過一個state的volatile變量表示同步狀態。

  那么AQS是如何通過隊列實現鎖操作的呢?

一.獲取鎖操作

   下面的是AQS中執行獲取鎖的代碼:

public final void acquire(int arg) {
/**通過tryAcquire獲取鎖,如果成功獲取到鎖直接終止(selfInterrupt),否則將當前線程插入隊列
* 這里的Node.EXCLUSIVE表示創建一個獨占模式的節點
*/
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

  然而實際上,AQS中並沒有實現上面的tryAcquire(arg)方法,具體獲取鎖的操作需要由其子類比如ReentrantLock中的Sync實現:

 

protected final boolean tryAcquire(int acquires) {
//取到當前線程
final Thread current = Thread.currentThread();
//獲取到state值(前文提到)
int c = getState();
//state為0標識當前沒有線程占有鎖
//如果隊列中前面沒有元素(因為是公平鎖的原因,非公平鎖中不進行判斷,如果state為0直接獲取到鎖),CAS修改當前值
if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
//標識當前線程成功獲取鎖 setExclusiveOwnerThread(current);
return true; } }
//state不為0,且占有鎖的線程是當前線程(這里涉及到一個可重入鎖的概念)
else if (current == getExclusiveOwnerThread()) {
//增加重入次數
int nextc = c + acquires;
//如果次數值溢出,拋出異常
if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; }
//如果鎖已經被其它線程占用,獲取鎖失敗
return false; }

       上面的代碼注釋中提到了可重入鎖的概念,可重入鎖又叫遞歸鎖,簡單來講就是已經獲取到鎖的線程還可以再次獲取到同一個鎖,我們通常使用的syschronized操作,ReentrantLock都屬於可重入鎖。自旋鎖則不屬於可重入鎖。

 下面我們再看一下如果tryAcquire失敗,AQS是如何處理的:

private Node addWaiter(Node mode) {
//創建一個隊列的Node Node node
= new Node(Thread.currentThread(), mode); //獲取當前隊列尾部 Node pred = tail; if (pred != null) {
//CAS操作嘗試插入Node到等待隊列,這里只嘗試一次 node.prev
= pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } }
//如果添加失敗,enq這里會做自旋操作,知道插入成功。 enq(node);
return node; }
//自旋操作添加元素到隊列尾部
private Node enq(final Node node) { for (;;) {
//獲取尾節點 Node t
= tail;
//如果尾節點為空,說明當前隊列是空,需要初始化隊列
if (t == null) {
//初始化當前隊列 if (compareAndSetHead(new Node())) tail = head; } else {
//通過CAS操作插入Node,設置Node為隊列的尾節點,並返回Node node.prev
= t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
/**
* 如果插入的節點前面是head,嘗試獲取鎖,
*/
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false;
//自旋操作
for (;;) {
//獲取當前插入節點的前置節點
final Node p = node.predecessor();
//前置節點是head,嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//設置head為當前節點,表示獲取鎖成功 setHead(node); p.next
= null; // help GC failed = false; return interrupted; }
//是否掛起當前線程,如果是,則掛起線程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

    上面的代碼有些復雜,這里解釋一下,之前的addWaiter代碼已經將node加入了等待隊列,所以這里需要讓節點隊列中掛起,等待喚醒。隊列的head節點代表的是當前占有鎖的節點,首先判斷插入的node的前置節點是否是head,如果是,嘗試獲取鎖(tryAcquire),如果獲取成功則將head設置為當前節點;如果獲取失敗需要判斷是否掛起當前線程。

 
/**
* 判斷是否可以掛起當前線程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//ws為node前置節點的狀態
int ws = pred.waitStatus; if (ws == Node.SIGNAL)        //如果前置節點狀態為SIGNAL,當前節點可以掛起 return true; if (ws > 0) {

//通過循環跳過所有的CANCELLED節點,找到一個正常的節點,將當前節點排在它后面

//GC會將這些CANCELLED節點回收 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {
//將前置節點的狀態修改為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

 

//通過LockSupport掛起線程,等待喚醒
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
}

 

 

二.釋放鎖操作

    有了獲取鎖的基礎,再來看釋放鎖的源碼就比較容易了,下面的代碼執行的是AQS中釋放鎖的操作:

//釋放鎖的操作
public
final boolean release(int arg)
//嘗試釋放鎖,這里tryRelease同樣由子類實現,如果失敗直接返回false
if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

 下面的代碼是嘗試釋放鎖的操作:

 protected final boolean tryRelease(int releases) {
//獲取state值,釋放一定值
int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;
//如果差是0,表示鎖已經完全釋放
if (c == 0) { free = true;
//下面設置為null表示當前沒有線程占用鎖 setExclusiveOwnerThread(
null); }
//如果c不是0表示鎖還沒有完全釋放,修改state值 setState(c);
return free; }

 釋放鎖后,還需要喚醒隊列中的一個后繼節點:

private void unparkSuccessor(Node node) {
        //將當前節點的狀態修改為0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //從隊列里找出下一個需要喚醒的節點
//首先是直接后繼 Node s
= node.next;
//如果直接后繼為空或者它的waitStatus大於0(已經放棄獲取鎖了),我們就遍歷整個隊列,
//獲取第一個需要喚醒的節點
if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null)
//將節點喚醒 LockSupport.unpark(s.thread); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM