AQS的數據結構及實現原理


  接下來從實現角度來分析同步器是如何完成線程同步的。主要包括:同步隊列、獨占式同步狀態獲取與釋放、共享式同步狀態獲取與釋放以及超時獲取同步狀態等。
1、同步隊列
  同步器依賴內部的一個同步隊列來完成同步狀態的管理。當線程獲取同步狀態失敗時,會被加入到隊列中,並同時阻塞線程。當同步狀態釋放時,會把首節點中的線程喚醒,使其在冊嘗試獲取同步狀態。(疑問: 1、確定只喚醒首節點么,這不就是公平方式獲取了么?2、首節點是否能一定獲取到鎖,為什么?
  a、確定只喚醒了首節點,的確這個獲取方式相對比較公平,雖然新節點也可能優先獲取到鎖。待稍后對比重入鎖獲取過程;
    [對比過后]:重入鎖獲取邏輯,就是先判斷是否有人在用鎖,如果有,判斷是不是當前線程,如果不是,則當前線程排隊,一旦進入排隊隊列,其實對這個隊列來說,真的就是公平鎖了,不公平的地方在於后來新來的線程可能會由於首節點剛好釋放而獲取到鎖,造成“插隊”;
  b、首節點不一定能獲取到,因為可能有新線程正好進入,然后獲取了鎖;
  同步隊列中的數據類型為Node,其中各個屬性描述如下:
int waitStatus
等待狀態:
1 ,在隊列中等待的線程等待超時或者被中斷,從隊列中取消等待;
-1,后繼節點處於等待;
-2,節點在等待隊列中,當condition被signal()后,會從等待隊列轉到同步隊列;
-3,表示下一次共享式同步狀態獲取將會被無條件傳播下去;
0,初始狀態
Node prev
前驅節點
Node next
后繼節點
Node nextWaiter
等待隊列中的后繼節點,如果當前節點是共享的,則這個字段將是一個SHARED常量,也就是說節點類型(獨占或共享)和等待隊列中的后繼節點共用同一字段
Thread thread
獲取同步狀態的線程
   看到這里有些懵b,疑問如下:
   1、狀態1是被中斷的,那compareAndSetState(0,1)不是設置為1就獲取到鎖了么,在這里貌似還是等待的?
  2、狀態-1,后繼節點處於等待,當前節點在干啥,不是等待么?
  3、狀態-3,說了個啥意思?
    就是覺得,這幾個狀態究竟怎么變化的,沒理清楚。
  4、節點next跟nextWaiter有啥區別,完全沒搞懂。
  這里,自己測試了一下,結論如下:
  a、狀態1是被中斷的,注意這里是說的waitStatus;compareAndSetState(0,1),這里說的是state,這在AQS里邊是兩個變量,不是一個。
    waitStatus用於記錄節點的狀態,state用於描述AQS的狀態(標記是否處於同步中,以及記錄重入次數)。
  b、狀態-1,后繼節點等待,當前節點不一定。
    如果當前節點為head節點,可能在等待(正好被新來的節點搶走了),也可能在執行;
    如果當前節點不是head節點,肯定在等待;
    也就是head節點執行時的狀態一樣也是-1,我們看一個debug的截圖:
  可以看到除了尾節點狀態是0,其它的都是-1,包括正在執行的head節點。
   c、狀態-3,說了個啥,,,還沒搞懂
  d、沒搞懂,,,,(后續來補充)
  同步器結構如下:
  看到這個圖,我想都知道是啥意思,只是有一點注意的,就是往隊尾添加元素的時候,因為可能多個線程都在獲取失敗后把自己往后邊加,所以這個操作應該保證線程安全。因此同步器提供了一個  compareAndSetTail(Node expect, Node update),它需要傳遞當前線程“認為”的尾節點和當前節點,只有設置成功后,當前節點才正式與之前的尾節點建立關聯。
  同步隊列遵循FIFO, 首節點是獲取同步狀態成功的節點,首節點的線程在釋放同步狀態時將會喚醒后繼節點,而后繼節點將會在獲取同步狀態成功時將自己設置為首節點。
2、獨占式同步狀態的獲取與釋放
  通過調用同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,也就是由於線程獲取同步狀態失敗后進入同步隊列中,后續對線程進行中斷操作時,線程不會從同步隊列移出。該方法代碼:
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  tryAcquire(int arg) 是獲取同步狀態,addWaiter(Node node)是構建node節點並添加到隊列尾部,acquireQuired(Node node,int arg)使得該節點以“死循環”的方式獲取同步狀態,selfInterrupt()是喚醒當前線程。如果獲取不到則阻塞節點中的線程,而被阻塞線程的喚醒主要依靠前驅節點的出隊或者阻塞線程被中斷來實現。
  這里,實際不會是死循環,因為其中的shouldParkAfterFailedAcquire()方法會把線程掛起而阻塞住。
  死循環獲取同步狀態的這個操作有點超出意外,看一下代碼:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();   //獲取當前節點的前節點
            if (p == head && tryAcquire(arg)) {   // 前節點為head,並且自己獲取同步成功,說明前節點線程已經執行結束
                setHead(node);      //既然前節點的線程結束了,那就把自己設為head節點
                p.next = null; // help GC      //斷開前節點,便於回收
                failed = false;    
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  //這里會將線程掛起
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  shouldParkAfterFailedAcquire(p, node)會掛起線程,該方法主要作用是根據前節點判斷當前線程是否應該被阻塞,如果前節點處於CANCELLED狀態,則順便刪除這些節點。阻塞的方法在parkAndCheckInterrupt中的LockSupport.park(this),這里最終調用了UNSAFE.park(false,0l)這個本地方法。關於LockSupport跟UNSAFE,可以參考: https://blog.csdn.net/secsf/article/details/78560013
  同步狀態的釋放是通過同步器的release(int arg)進行的,該方法在釋放了同步狀態后會喚醒其后繼節點(進而使后繼節點重新嘗試獲取同步狀態)。代碼如下:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);   //喚醒后繼節點的操作在這里
        return true;
    }
    return false;
}
3、共享式同步狀態獲取與釋放
  共享式獲取與獨占式的主要區別就是是否允許同時多個線程獲取同步狀態,當然,這些線程必須也是共享式獲取。比如有一個線程在讀,那么再來幾個線程讀也是沒問題的,但如果來個線程要寫(獨占式)那就是不可以的。其主要代碼如下:
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)   //小於0,說明沒獲取成功
        doAcquireShared(arg);  //繼續獲取
}
  繼續獲取的邏輯:
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);   //節點類型為共享類型
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  //尾節點的前一個節點(當前節點就是尾節點)
            if (p == head) { //前節點是head,則再次獲取鎖
                int r = tryAcquireShared(arg);   // 這個tryAcquireShared()的返回值是共享資源的剩余量,就是還可以允許訪問的線程數
                if (r >= 0) { //如果獲取到了鎖,進行相關設置
                    setHeadAndPropagate(node, r);  // 進行head節點替換,並且如果剩余量有剩余,則繼續往下傳遞
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt()) //沒獲取到鎖,則將線程掛起
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  感覺跟獨占式並沒有多大差別,這里無非要注意釋放鎖必須是同步的,因為可能同時有多個線程進行釋放操作。
4、獨占式超時獲取同步狀態
  doAcquireNanos(int arg, long nanosTimeout)方法可以超時獲取同步狀態,即在指定時間段內獲取同步狀態,成功則返回true,否則返回false。該方法可以響應中斷。
  上代碼:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);   //節點類型是獨占式的
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();  
            if (p == head && tryAcquire(arg)) {  //前節點為head,則嘗試獲取鎖
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();   // 計算剩余時間
            if (nanosTimeout <= 0L) // 剩余時間已經到了
                return false;
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)  //沒獲取到鎖,時間也沒到,則掛起一小段時間。注意如果時間剩余非常小了,比spinForTimeoutThreshold還小,則不掛起了,直接死循環一小會兒,進行獲取鎖
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  感覺邏輯比較清楚,沒啥好說。
  做一個共享鎖的例子試一下:
public class TwinsLock implements Lock {
    private final Sync sync ;
    private static final class Sync extends AbstractQueuedSynchronizer{
        Sync(int count){
            if(count <= 0){
                throw new IllegalArgumentException("count must large than zero .");
            }
            setState(count); // 這里可以看到,count就是可重入的線程數
        }
        public int tryAcquireShared(int reduceCount){
            for(;;){
                int current = getState();
                int newCount = current - reduceCount;
                if(newCount < 0 || compareAndSetState(current, newCount)){//能把數量減掉並設置,就相當於獲取鎖成功
                    return newCount;
                }
            }
        }
        public boolean tryReleaseShared(int returnCount){
            for(;;){
                int current = getState();
                int newCount = current + returnCount;
                if(compareAndSetState(current, newCount)){
                    return true;
                }
            }
        }
    }
    public TwinsLock (int count){
        this.sync = new Sync(count);
    }
    @Override
    public void lock() {
        sync.acquireShared(1);
    }
    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
// 其它方法
}
  測試:如果我們count設為1,則每次只允許一個線程進入,看上去應該跟排它鎖類似,測試代碼:
public class SharedTest {
    private int count = 0;
    private final TwinsLock twinsLock = new TwinsLock(1);
    @Test
    public void test(){
        MyThread mt1 = new MyThread();
        MyThread mt2 = new MyThread();
        MyThread mt3 = new MyThread();
        MyThread mt4 = new MyThread();
        MyThread mt5 = new MyThread();

        mt1.start();
        mt2.start();
        mt3.start();
        mt4.start();
        mt5.start();

        try {
            mt1.join();
            mt2.join();
            mt3.join();
            mt4.join();
            mt5.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("最終結果:" + count);
    }
    class MyThread extends Thread{
        @Override
        public void run() {
            for(int i=0;i<1000;i++){
                twinsLock.lock();
                try {
                    count = count + 1;
                }catch (Exception e){
                    System.out.println("異常啦 ~ ~ " +e.getMessage());
                }finally {
                    twinsLock.unlock();
                }
            }
        }
    }
}
  多次運行結果:
  測試代碼第3行改為new TwinsLock(2)后,執行結果就經常是一個小於5000的數,這是由於兩個線程相互覆蓋的原因。當然,這並不能證明每次就2個線程進入了,更好的測試代碼應該參考《java並發編程藝術》中的相關代碼;只是我讀的時候臨時想到了這個,就用這個測試了。
   比較好奇的是,共享跟獨占是怎么具體實現的,除了一個節點類型,具體判斷邏輯是咋寫的,兩個使用的是同一個同步隊列么?這個稍后結合讀寫鎖來理一下。
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM