看看AQS阻塞隊列和條件隊列


  上一篇簡單介紹了AQS,我們大概知道AQS就是一個框架,把很多功能都給實現了(比如入隊規則,喚醒節點中的線程等),我們如果要使用的話只需要實現其中的一些方法(比如tryAcquire等)就行了!這次主要說說AQS中阻塞隊列的的入隊規則還有條件變量;

 

一.AQS入隊規則

  我們仔細分析一下AQS是如何維護阻塞隊列的,在獨占方式獲取資源的時候,是怎么將競爭鎖失敗的線程丟到阻塞隊列中的呢?

  我們看看acquire方法,這里首先會調用子類實現的tryAcquire方法嘗試修改state,修改失敗的話,說明線程競爭鎖失敗,於是會走到后面的這個條件;

  這個addWaiter方法就是將當前線程封裝成一個Node.EXCLUSIVE類型的節點,然后丟到阻塞隊列中;

 

  第一次還沒有阻塞隊列的時候,會到enq方法里面,我們仔細看看enq方法

 

  enq()方法中,我們在第一次進入這個方法的時候,下面圖一所示,tail和head都指向null;

  第一次循環,到首先會到圖二,然后判斷t所指向的節點是不是null,如果是的話,就用CAS更新節點,這個CAS我們可以看作:頭節點head為null,我們把head節點更新為一個哨兵節點(哨兵節點就是new Node()),再將tail也指向head,就是圖三了

 

  第二次for循環:走到上面的else語句,將新節點的前一個節點設置為哨兵節點;

 

  然后就是CAS更新節點,這里CAS的意思:如果最后的節點tail指向的和t是一樣的,那么就將tail指向node節點

 

  最后再將t的下一個節點設置為node,下圖所示,就ok了

 

二.AQS條件變量的使用

  什么是條件變量呢?我們在開始介紹AQS的時候,還有一個內部類沒有說,就是ConditionObject,還記得前面說過的Unsafe中的park和unpark方法嗎?而這個ConditionObject就對這兩個方法進行了一次封裝,await()和signal()方法,但是更靈活,可以創建多個條件變量,每個條件變量維護一個條件隊列(就是一個單向鏈表,可以看到Node這個內部類中個屬性是nextWaiter);

  注意:每一個條件變量里面都維護了一個條件隊列

  舉個例子,如下所示;

package com.example.demo.study;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Study0201 { public static void main(String[] args) throws InterruptedException { // 創建鎖對象 ReentrantLock lock = new ReentrantLock(); // 創建條件變量 Condition condition = lock.newCondition(); // 以下創建兩個線程,里面都會獲取鎖和釋放鎖 Thread thread1 = new Thread(() -> { lock.lock(); try { System.out.println("await begin"); // 注意,這里調用條件變量的await方法,當前線程就會丟到condition條件變量中的條件隊列中阻塞  condition.await(); System.out.println("await end"); } catch (InterruptedException e) { // } finally { lock.unlock(); } }); Thread thread2 = new Thread(() -> { lock.lock(); try { System.out.println("signal begin"); // 喚醒被condition變量內部隊列中的某個線程  condition.signal(); System.out.println("signal end"); } finally { lock.unlock(); } }); thread1.start(); Thread.sleep(500); thread2.start(); } }

 

  還可以創建多個條件變量,如下所示,每一個條件變量都維護了一個條件隊列:

package com.example.demo.study;

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class Study0201 { public static void main(String[] args) throws InterruptedException { // 創建鎖對象 ReentrantLock lock = new ReentrantLock(); // 創建條件變量1 Condition condition1 = lock.newCondition(); //條件變量2 Condition condition2 = lock.newCondition(); // 以下創建兩個線程,里面都會獲取鎖和釋放鎖 Thread thread1 = new Thread(() -> { lock.lock(); try { System.out.println("await begin");//1  condition1.await(); System.out.println("await end");//5  System.out.println("condition2---signal---start");//6  condition2.signal(); System.out.println("condition2---signal---endend");//7 } catch (InterruptedException e) { // } finally { lock.unlock(); } }); Thread thread2 = new Thread(() -> { lock.lock(); try { System.out.println("signal begin");//2  condition1.signal(); System.out.println("signal end");//3  System.out.println("condition2---await---start");//4  condition2.await(); System.out.println("condition2---await---end");//8 } catch (InterruptedException e) { // } finally { lock.unlock(); } }); thread1.start(); Thread.sleep(500); thread2.start(); } }

 

 三.走進條件變量

  我們看看上面的獲取條件變量的方式Condition condition1 = lock.newCondition(),我們打開newCondition方法,最后就是創建一個ConditionObject實例;這個類是AQS的內部類,通過這個類可以訪問AQS內部的屬性和方法;

  注意:在調用await方法和signal方法之前,必須要先獲取鎖

 

  然后我們再看看條件變量的await方法,下圖所示,我們可以進入到addConditionWaiter()方法內部看看:

public final void await() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException(); //新建一個Node.CONDITION節點放到條件隊列最后面 Node node = addConditionWaiter(); //釋放當前線程獲取的鎖 int savedState = fullyRelease(node); int interruptMode = 0; //調用park()方法阻塞掛起當前線程 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled  unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

 

private Node addConditionWaiter() {
    Node t = lastWaiter; //第一次進來,這個lastWaiter是null,即t = null,不會進入到這個if語句 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //創建一個Node.CONDITION類型的節點,然后下面這個if中就是將第一個節點firstWaiter和最后一個節點都指向這個新創建的節點 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

 

  順便在看看signal方法:

public final void signal() {
    if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //條件隊列移除第一個節點,然后把這個節點丟到阻塞隊列中,然后激活這個線程 Node first = firstWaiter; if (first != null) doSignal(first); }

  

  我們想一想在AQS中阻塞隊列和條件隊列有什么關系啊?

  1.當多個線程調用lock.lock()方法的時候,只有一個線程獲取到可鎖,其他的線程都會被轉為Node節點丟到AQS的阻塞隊列中,並做CAS自旋獲取鎖;

  2.當獲取到鎖的線程對應的條件變量的await()方法被調用的時候,該線程就會釋放鎖,並把當前線程轉為Node節點放到條件變量對應的條件隊列中;

  3.這個時候AQS的阻塞隊列中又會有一個節點中的線程能得到鎖了,如果這個線程又恰巧調用了對應條件變量的await()方法時,又會重復2的步驟,然后阻塞隊列中又會有一個節點中的線程獲得鎖

  4.然后,又有一個線程調用了條件變量的signal()或者signalAll()方法,就會把條件隊列中一個或者所有的節點都移動到AQS阻塞隊列中,然后調用unpark方法進行授權,就等着獲得鎖了;

  一個鎖對應一個阻塞隊列,但是對應多個條件變量,每一個條件變量對應一個條件隊列;其中,這兩種隊列中存放的都是Node節點,Node節點中封裝了線程及其狀態,下圖所示:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM