AQS與重入鎖ReetrantLock原理


一、AQS原理

AQS(AbstractQueuedSynchronizer)隊列同步器是用來構建鎖、同步組件的基礎框架。

AQS內部通過一個volatile int類型的成員變量state控制同步狀態【0代表鎖未被占用,1表示已占用】,通過內部類Node構成FIFO的同步隊列實現等待獲取鎖的線程排隊工作,通過內部類ConditionObject構建條件等待隊列,來完成等待條件線程的排隊工作。當線程調用Condition對象的wait方法后會被加入等待隊列中,當有線程調用Condition的signal方法后,線程將從等待隊列移動到同步隊列進行鎖競爭。AQS內部會有一個同步隊列和可能多個等待隊列,前者存放等待獲取鎖的線程,后者分別存放等待不同條件的線程。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
//指向同步隊列隊頭
private transient volatile Node head;

//指向同步的隊尾
private transient volatile Node tail;

//同步狀態,0代表鎖未被占用,1代表鎖已被占用
private volatile int state;

static final class Node {
    //共享模式
    static final Node SHARED = new Node();
    //獨占模式
    static final Node EXCLUSIVE = null;

    //標識線程已處於結束狀態
    static final int CANCELLED =  1;
    //等待被喚醒狀態
    static final int SIGNAL    = -1;
    //條件狀態,
    static final int CONDITION = -2;
    //在共享模式中使用表示獲得的同步狀態會被傳播
    static final int PROPAGATE = -3;

    //等待狀態,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4種
    volatile int waitStatus;

    //同步隊列中前驅結點
    volatile Node prev;

    //同步隊列中后繼結點
    volatile Node next;

    //請求鎖的線程
    volatile Thread thread;

    //等待隊列中的后繼結點,這個與Condition有關,稍后會分析
    Node nextWaiter;

    //判斷是否為共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //獲取前驅結點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}
//上面Node head、tail對象構建同步隊列,這里用ConditionObject類的對象創建條件等待隊列
class ConditionObject implements Condition, java.io.Serializable { //等待隊列第一個等待結點 private transient Node firstWaiter; //等待隊列最后一個等待結點 private transient Node lastWaiter; //省略其他代碼....... } //AQS中的模板方法,由其子類實現 //獨占模式下獲取鎖的方法 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //獨占模式下解鎖的方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享模式下獲取鎖的方法 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享模式下解鎖的方法 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //判斷是否為持有獨占鎖 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }

二、AQS的應用

AQS通過state狀態管理、同步隊列、等待隊列實現了多線程同步鎖獲取與釋放,多線程並發排隊、條件等待等復雜功能。

作為基礎組件,它對鎖的兩種模式【獨占模式和共享模式】都提供支持。設計上AQS采用模板方法模式構建,其內部提供了並發操作的核心方法、而將一些實現不同模式下實現可能有差異的操作定義為模板方法,讓其子類實現。如ReentrantLock通過內部類Sync及其子類繼承AQS實現tryAcuire()和tryRelease()方法來實現獨占鎖,而SemaPhore則通過內部類繼承AQS實現tryAcquireShared()方法和tryReleaseShared()方法實現共享模式鎖。AQS的繼承關系圖如下:

三、ReetrantLock非公平鎖實現分析AQS的用法

ReetrantLock中非公平鎖
//加鎖操作
public void lock() {
     sync.lock();
}

/**
 * 非公平鎖實現sync.lock()
 */
static final class NonfairSync extends Sync {
    //加鎖
    final void lock() {
        //執行CAS操作,獲取同步狀態
        if (compareAndSetState(0, 1))
       //成功則將獨占鎖線程設置為當前線程  
          setExclusiveOwnerThread(Thread.currentThread());
        else
            //否則再次請求同步狀態
            acquire(1);
    }
}

//acquire為AQS本身實現的方法,其實現如下:
public final void acquire(int arg) {
    //再次嘗試獲取同步狀態
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


//AQS子類Sync的子類NonfairSync中實現的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }

  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //判斷同步狀態是否為0,並嘗試再次獲取同步狀態
      if (c == 0) {
          //執行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果當前線程已獲取鎖,屬於重入鎖,再次獲取鎖后將status值加1
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          //設置當前同步狀態,當前只有一個線程持有鎖,因為不會發生線程安全問題,可以直接執行 setState(nextc);
          setState(nextc);
          return true;
      }
      return false;
  }
 
//AQS本身實現的方法,將當前獲取鎖失敗的線程構造成node結點加入的同步隊列尾部,若隊列為空或者並發入隊失敗,則調用enq方法重試。
private Node addWaiter(Node mode) {
    //將請求同步狀態失敗的線程封裝成結點
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    //如果是第一個結點加入肯定為空,跳過。
    //如果非第一個結點則直接執行CAS入隊操作,嘗試在尾部快速添加
    if (pred != null) {
        node.prev = pred;
        //使用CAS執行尾部結點替換,嘗試在尾部快速添加
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果第一次加入或者CAS操作沒有成功執行enq入隊操作
    enq(node);
    return node;
}
//AQS本身實現方法,通過循環CAS操作將當前線程構造的node結點入隊,解決上面隊列為空或者是並發入隊失敗的情況;
private Node enq(final Node node) {
    //死循環
    for (;;) {
         Node t = tail;
         //如果隊列為null,即沒有頭結點
         if (t == null) { // Must initialize
             //創建並使用CAS設置頭結點
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//隊尾添加新結點
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
    }
//AQS本身方法,隊列中結點循環觀察,當自己的前驅是head結點執行的結點時嘗試獲取鎖,若成功將其設置為頭結點,否則循環嘗試;若當前結點前驅結點不是頭結點,則在設置其前驅結點狀態后將自己掛起
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,死循環
        for (;;) {
            //獲取前驅結點
            final Node p = node.predecessor();
            當且僅當p為頭結點才嘗試獲取同步狀態
            if (p == head && tryAcquire(arg)) {
                //將node設置為頭結點
                setHead(node);
                //清空原來頭結點的引用便於GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果前驅結點不是head,判斷是否掛起線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //最終都沒能獲取同步狀態,結束該線程的請求
            cancelAcquire(node);
    }
}
//設置為頭結點
private void setHead(Node node) {
        head = node;
        //清空結點數據
        node.thread = null;
        node.prev = null;
}
//如果前驅結點不是head,判斷是否掛起線程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

      interrupted = true;
}

//AQS本身的方法,若前驅結點狀態為Node.SIGNAL則返回true,表示可以掛起當前結點,否則找到非結束狀態的前驅結點,並設置其狀態后,返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取當前結點的等待狀態
        int ws = pred.waitStatus;
        //如果為等待喚醒(SIGNAL)狀態則返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 則說明是結束狀態,
        //遍歷前驅結點直到找到沒有結束狀態的結點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小於0又不是SIGNAL狀態,
            //則將其設置為SIGNAL狀態,代表該結點的線程正在等待喚醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//AQS本身方法,掛起當前線程並檢查其中斷狀態
private final boolean parkAndCheckInterrupt() {
        //將當前線程掛起
        LockSupport.park(this);
        //獲取線程中斷狀態,interrupted()是判斷當前中斷狀態,
        //並非中斷線程,因此可能true也可能false,並返回
        return Thread.interrupted();
}

//ReentrantLock類的unlock
public void unlock() {
    sync.release(1);
}

//AQS類的release()方法
public final boolean release(int arg) {
    //嘗試釋放鎖
    if (tryRelease(arg)) {

        Node h = head;
        if (h != null && h.waitStatus != 0)
            //喚醒后繼結點的線程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//ReentrantLock類中的內部類Sync實現的tryRelease(int releases) 
protected final boolean tryRelease(int releases) {

      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
          throw new IllegalMonitorStateException();
      boolean free = false;
      //判斷狀態是否為0,如果是則說明已釋放同步狀態
      if (c == 0) {
          free = true;
          //設置Owner為null
          setExclusiveOwnerThread(null);
      }
      //設置更新同步狀態
      setState(c);
      return free;
  }
//AQS本身方法,喚醒后續掛起的結點
private void unparkSuccessor(Node node) {
    //這里,node一般為當前線程所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個需要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//如果為空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

 

 剖析基於並發AQS的重入鎖(ReetrantLock)及其Condition實現原理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM