AQS 詳解之共享鎖模式


概括

 

AQS框架數據結構是一個先進先出的雙向隊列,當多個線程進行競爭資源時,那些競爭失敗的線程會加入到隊列中。他向上層提供了很多接口,其中一個是acquireShared獲取共享模式的接口。本文將會根據這個接口一步步分析,獲取資源失敗的線程是怎么進入到隊列中的,進入到隊列中又是怎么出隊列再次競爭資源的,下面是acquireShared執行的一個大致流程:

  1. 多個線程通過調用tryAcquireShared方法獲取共享資源,返回值大於等於0則獲取資源成功,返回值小於0則獲取失敗。

  2. 當前線程獲取共享資源失敗后,通過調用addWaiter方法把該線程封裝為Node節點,並設置該節點為共享模式。然后把該節點添加到隊列的尾部。

  3. 添加到尾部后,判斷該節點的上一個節點是不是隊列的頭節點,如果是頭節點,那么該節點的上一個節點出隊列並獲取共享資源,同時調用setHeadAndPropagate方法把該節點設置為新的頭節點,同時喚醒隊列中所有共享類型的節點,去獲取共享資源。如果獲取失敗,則再次加入到隊列中。

  4. 如果該節點的前驅節點不是頭節點,那么通過for循環進行自旋轉等待,直到當前節點的前驅節點是頭節點,結束自旋。

這就是AQS共享模式競爭資源失敗的大致流程,這里先讓大家有一個大致的印象,下面通過源碼具體分析是怎么進行操作的。

 

AQS共享鎖模式

 

AQS獲取共享鎖是通過調用acquireShared() 這個頂層方法,我們看一下這個方法的源代碼:

public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

這個方法中有一個if判斷,當tryAcquireShared()這個返回值是小於0的時候獲取鎖失敗,進入doAcquireShared()方法。tryAcquireShared方法是用來獲取共享模式下的鎖,對於tryAcquireShared()這個方法我們重點看一下他的返回值。jdk1.8中是這樣寫的

* @return a negative value on failure; zero if acquisition in shared
*         mode succeeded but no subsequent shared-mode acquire can
*         succeed; and a positive value if acquisition in shared
*         mode succeeded and subsequent shared-mode acquires might
*         also succeed, in which case a subsequent waiting thread
*         must check availability. (Support for three different
*         return values enables this method to be used in contexts
*         where acquires only sometimes act exclusively.)  Upon
*         success, this object has been acquired.

當失敗的時候返回的是負值,如果返回的是0表示獲取共享模式成功但是它下一個節點的共享模式無法獲取成功。如果返回的是正數也就是大於0,表示當前線程獲取共享模式成功,並且它后面的線程也可以獲取共享模式。

當共享模式獲取失敗的時候,我們看一下doAcquireShared源代碼做了哪些操作

private void doAcquireShared(int arg) {
   final Node node = addWaiter(Node.SHARED);
   boolean failed = true;
   try {
       boolean interrupted = false;
       for (;;) {
           final Node p = node.predecessor();
           if (p == head) {
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null; // help GC
                   if (interrupted)
                       selfInterrupt();
                   failed = false;
                   return;
              }
          }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
      }
  } finally {
       if (failed)
           cancelAcquire(node);
  }
}

 

首先調用addWaiter()方法,它主要是封裝為Node節點,並且把該節點添加到隊列的尾部。此處傳入共享模式的參數,節點就變成了共享模式。

當前線程添加到隊列后,然后通過自旋(for(;;))獲取前驅節點,如果前驅節點是頭節點,那么調用tryAcquireShared()方法獲取當前節點的狀態,注意此方法的返回值在上面已經介紹過,等於0表示不用喚醒后繼節點,只有大於0才會喚醒后面的所有節點。

如果獲取共享資源成功,調用setHeadAndPropagate方法設置當前節點為頭節點,並讓原來的頭節點出隊列。如果在獲取鎖自旋的過程中中斷過,那么將當前線程中斷。

如果當前節點的前驅節點不是頭節點,通過shouldParkAfterFailedAcquire判斷當前線程的狀態,如果線程阻塞返回true,否則返回false. parkAndCheckInterrupt方法是指當前線程在獲取鎖的過程中是否被中斷喚醒,如果當前線程狀態阻塞並且被中斷過那么就把標志為interrupted更新為true。

如果發生異常調用cancelAcquire方法,此方法是把當前節點先更新為取消狀態,並清除該節點。

setHeadAndPropagate我們看一下這個方法的源代碼

private void setHeadAndPropagate(Node node, int propagate) {
   Node h = head; // Record old head for check below
   setHead(node);//設置當前節點為頭節點
   if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {//符合狀態的將全部喚醒
       Node s = node.next;
       if (s == null || s.isShared())
           doReleaseShared();
  }
}

此方法傳遞了2個參數,一個是當前節點,一個是tryAcquireShared方法的返回值。從源代碼中我們看到它首先記錄了當前頭節點,然后它通過setHead()方法把當前獲取到鎖的節點設置為頭節點。通過if語句把符合條件的繼續喚醒后繼節點,如果下一個節點為空那么調用doReleaseShared方法,doReleaseShared方法繼續喚醒后面的節點。此方法會在共享鎖釋放詳細講解。

 

共享鎖釋放

我們來看一下releaseShared的源代碼,此方法是共享模式釋放資源的頂層方法。

public final boolean releaseShared(int arg) {
   if (tryReleaseShared(arg)) {//
       doReleaseShared();
       return true;
  }
   return false;
}

tryReleaseShared方法獲取共享模式資源釋放,如果釋放成功那么會調用doReleaseShared繼續喚醒下一個節點.

我們繼續看一下具體的喚醒操作doReleaseShared() 這個方法

private void doReleaseShared() {
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               unparkSuccessor(h);
          }
           else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
      }
       if (h == head)                   // loop if head changed
           break;
  }
}

通過源代碼我們發現,當前線程狀態如果是Node.SIGNAL,Node.SIGNAL的值是-1,是一個靜態常量,此值表示當前線程被掛起。如果當前線程被掛起,那么更新當前線程的狀態值為0.如果更新失敗那么就繼續。更新成功后調用unparkSuccessor()此方法是喚醒共享鎖的第一個節點。如果本身頭節點屬於重置狀態waitStatus==0,並且把它設置為傳播狀態那么就向下一個節點傳播。

我們再看一下unparkSuccessor這個方法的源碼

private void unparkSuccessor(Node node) {

   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   Node s = node.next;
   if (s == null || s.waitStatus > 0) {
       s = null;
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
  }
   if (s != null)
       LockSupport.unpark(s.thread);
}

從這個方法中我們發現如果當先線程的狀態是小於0,那么就把當前線程重置為0.為什么是小於0呢,上篇文章已經講過,waitStatus<0為等待或掛起狀態。也就是如果當前線程是等待掛起狀態,那么把當前線程狀態重置為0。然后找到下一個節點,如果下一個節點是空或下一個線程已經被取消,那么就從頭部找下一個沒有被取消的節點。當下一個節點不為空的時候,調用LockSupport.unpark方法喚醒當前線程。LockSupport.unpark會調用Unsafe這個類調用native方法進行執行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM