AQS共享鎖應用之Semaphore原理


我們調用Semaphore方法時,其實是在間接調用其內部類或AQS方法執行的。Semaphore類結構與ReetrantLock類相似,內部類Sync繼承自AQS,然后其子類FairSync和NoFairSync分別實現公平鎖和非公平鎖的獲取鎖方法tryAcquireShared(int arg),而釋放鎖的tryReleaseShared(int arg)方法則有Sync類實現,因為非公平或公平鎖的釋放過程都是相同的。

AQS通過state值來控制對共享資源訪問的線程數,有線程請求同步狀態成功state值減1,若超過共享資源數量獲取同步狀態失敗,則將線程封裝共享模式的Node結點加入到同步隊列等待。有線程執行完任務釋放同步狀態后,state值會增加1,同步隊列中的線程才有機會獲得執行權。公平鎖與非公平鎖不同在於公平鎖申請獲取同步狀態前都會先判斷同步隊列中釋放存在Node,若有則將當前線程封裝成Node結點入隊,從而保證按FIFO的方式獲取同步狀態,而非公平鎖則可以直接通過競爭獲取線程執行權。

//Semaphore的acquire()
public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
  }

/**
*  注意Sync類繼承自AQS
*  AQS的acquireSharedInterruptibly()方法
*/ 
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //判斷是否中斷請求
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果tryAcquireShared(arg)不小於0,則線程獲取同步狀態成功
    if (tryAcquireShared(arg) < 0)
        //未獲取成功加入同步隊列等待
        doAcquireSharedInterruptibly(arg);
}
//Semaphore中非公平鎖NonfairSync的tryAcquireShared()
protected int tryAcquireShared(int acquires) {
    //調用了父類Sync中的實現方法
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
         //使用死循環
         for (;;) {
             int available = getState();
             int remaining = available - acquires;
             //判斷信號量是否已小於0或者CAS執行是否成功
             if (remaining < 0 ||
                 compareAndSetState(available, remaining))
                 return remaining;
         }
     }
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
     //創建共享模式的結點Node.SHARED,並加入同步隊列
   final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         //進入自旋操作
         for (;;) {
             final Node p = node.predecessor();
             //判斷前驅結點是否為head
             if (p == head) {
                 //嘗試獲取同步狀態
                 int r = tryAcquireShared(arg);
                 //如果r>0 說明獲取同步狀態成功
                 if (r >= 0) {
                     //將當前線程結點設置為頭結點並傳播               
                     setHeadAndPropagate(node, r);
                     p.next = null; // help GC
                     failed = false;
                     return;
                 }
             }
           //調整同步隊列中node結點的狀態並判斷是否應該被掛起
           //並判斷是否需要被中斷,如果中斷直接拋出異常,當前結點請求也就結束
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 throw new InterruptedException();
         }
     } finally {
         if (failed)
             //結束該結點線程的請求
             cancelAcquire(node);
     }
    }

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取當前結點的等待狀態
        int ws = pred.waitStatus;
        //如果為等待喚醒(SIGNAL)狀態則返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 則說明是結束狀態,
        //遍歷前驅結點直到找到沒有結束狀態的結點
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小於0又不是SIGNAL狀態,
            //則將其設置為SIGNAL狀態,代表該結點的線程正在等待喚醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        //將當前線程掛起
        LockSupport.park(this);
        //獲取線程中斷狀態,interrupted()是判斷當前中斷狀態,
        //並非中斷線程,因此可能true也可能false,並返回
        return Thread.interrupted();
}

//不可中的acquireShared()
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //沒有拋出異常中的。。。。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//設置為頭結點
        /* 
         * 嘗試去喚醒隊列中的下一個節點,如果滿足如下條件: 
         * 調用者明確表示"傳遞"(propagate > 0), 
         * 或者h.waitStatus為PROPAGATE(被上一個操作設置) 
         * 並且 
         *   下一個節點處於共享模式或者為null。 
         * 
         * 這兩項檢查中的保守主義可能會導致不必要的喚醒,但只有在有
         * 有在多個線程爭取獲得/釋放同步狀態時才會發生,所以大多
         * 數情況下會立馬獲得需要的信號
         */  
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
            //喚醒后繼節點,因為是共享模式,所以允許多個線程同時獲取同步狀態
                doReleaseShared();
        }
    }

//Semaphore的release()
public void release() {
       sync.releaseShared(1);
}

//調用到AQS中的releaseShared(int arg) 
public final boolean releaseShared(int arg) {
       //調用子類Semaphore實現的tryReleaseShared方法嘗試釋放同步狀態
      if (tryReleaseShared(arg)) {
          doReleaseShared();
          return true;
      }
      return false;
  }

//在Semaphore的內部類Sync中實現的
protected final boolean tryReleaseShared(int releases) {
       for (;;) {
              //獲取當前state
             int current = getState();
             //釋放狀態state增加releases
             int next = current + releases;
             if (next < current) // overflow
                 throw new Error("Maximum permit count exceeded");
              //通過CAS更新state的值
             if (compareAndSetState(current, next))
                 return true;
         }
        }

private void doReleaseShared() {
    /* 
     * 保證釋放動作(向同步等待隊列尾部)傳遞,即使沒有其他正在進行的  
     * 請求或釋放動作。如果頭節點的后繼節點需要喚醒,那么執行喚醒  
     * 動作;如果不需要,將頭結點的等待狀態設置為PROPAGATE保證   
     * 喚醒傳遞。另外,為了防止過程中有新節點進入(隊列),這里必  
     * 需做循環,所以,和其他unparkSuccessor方法使用方式不一樣  
     * 的是,如果(頭結點)等待狀態設置失敗,重新檢測。 
     */  
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // 獲取頭節點對應的線程的狀態
            int ws = h.waitStatus;
            // 如果頭節點對應的線程是SIGNAL狀態,則意味着頭
            //結點的后繼結點所對應的線程需要被unpark喚醒。
            if (ws == Node.SIGNAL) {
                // 修改頭結點對應的線程狀態設置為0。失敗的話,則繼續循環。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 喚醒頭結點h的后繼結點所對應的線程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果頭結點發生變化,則繼續循環。否則,退出循環。
        if (h == head)                   // loop if head changed
            break;
    }
}


//喚醒傳入結點的后繼結點對應的線程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
      if (ws < 0)
          compareAndSetWaitStatus(node, ws, 0);
       //拿到后繼結點
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
          s = null;
          for (Node t = tail; t != null && t != node; t = t.prev)
              if (t.waitStatus <= 0)
                  s = t;
      }
      if (s != null)
          //喚醒該線程
          LockSupport.unpark(s.thread);
    }

 剖析基於並發AQS的共享鎖的實現(基於信號量Semaphore)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM