AQS同步隊列器之二:等待通知機制


一、簡介

    Condition是在java 1.5中才出現的,它用來替代傳統的Object的wait()、notify()實現線程間的協作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實現線程間協作更加安全和高效。簡單說,他的作用是使得某些線程一起等待某個條件(Condition),只有當該條件具備(signal 或者 signalAll方法被調用)時,這些等待線程才會被喚醒,從而重新爭奪鎖。wait()、notify()這些都更傾向於底層的實現開發,而Condition接口更傾向於代碼實現的等待通知效果。兩者之間的區別與共通點也可以了解一下:

對比項 Object監視器 Condition
前置條件 獲取對象的鎖

調用Lock.lock()獲取鎖

Lock.newCondition獲取Condition對象

調用方式 直接調用Object.notify() 直接調用condition.await()
等待隊列的個數 一個 多個
當前線程釋放鎖進入等待狀態 支持 支持
當前線程釋放鎖進入等待狀態在等待狀態中不斷響應中斷 不支持 支持
當前線程釋放鎖並進入等待超時狀態 支持 支持
當前線程釋放鎖並進入等待狀態直到將來的某個時間 不支持 支持
喚醒等待隊列中的一個線程 支持notify() 支持condition.signal()
喚醒等待隊列中的全部線程 支持notifyAll() 支持condition.signalAll()

相比之下Condition提供了比Object監視器更方便更全面的處理方式,而且使用起來也依舊很簡單。

二、簡單使用示例

 1 package cn.memedai;  2 
 3 import java.util.concurrent.locks.Condition;  4 import java.util.concurrent.locks.Lock;  5 import java.util.concurrent.locks.ReentrantLock;  6 
 7 /**
 8  * Lock與Condition接口示例  9  */
 10 public class LockConditionDemo {  11 
 12     //存儲地方
 13     class Depot {  14         private int capacity;  15         private int size;  16         private Lock lock;  17         private Condition fullCondition;  18         private Condition emptyCondition;  19 
 20         public Depot(int capacity) {  21             this.capacity = capacity;  22             this.size = 0;  23             this.lock = new ReentrantLock();  24             this.fullCondition = lock.newCondition();  25             this.emptyCondition = lock.newCondition();  26  }  27 
 28         //生產操作
 29         public void produce(int newSize) throws InterruptedException {  30  lock.lock();  31             int left = newSize;  32             try {  33                 while (left > 0) {  34                     //代表超過了容量就不能再生產了
 35                     while (size >= capacity) {  36                         fullCondition.await();//進行等待處理
 37  }  38                     //獲取實際生產的數量(及庫存中新增的數量)  39                     //如果庫存+要生產的大於了總的容量那么新增的就是總容量的數量相減
 40                     int inc = (size + left) > capacity ? (capacity - size) : left;  41                     size += inc;  42                     left -= inc;  43                     System.out.println(Thread.currentThread().getName() + "------left剩余" + left + "------size容量" + size + "-------inc增長" + inc);  44  emptyCondition.signal();  45  }  46             } finally {  47                 lock.unlock();//解鎖
 48  }  49  }  50 
 51         //消費操作
 52         public void consume(int newSize) throws InterruptedException {  53  lock.lock();  54             try {  55                 int left = newSize;  56                 while (left > 0) {  57                     //庫存為0等待生產者進行生產的操作
 58                     while (size <= 0) {  59  emptyCondition.await();  60  }  61                     int dec = (size < left) ? size : left;  62                     size -= dec;  63                     left -= dec;  64                     System.out.println(Thread.currentThread().getName() + "-------left剩余" + left + "-------size容量" + size + "--------減少量dec" + dec);  65  fullCondition.signal();  66  }  67             } finally {  68  lock.unlock();  69  }  70  }  71  }  72 
 73     //生產者
 74     class Producer{  75         private Depot depot;  76 
 77         public Producer(Depot depot) {  78             this.depot = depot;  79  }  80 
 81         //往存儲地方生產
 82         public void produce(final int newSize){  83             new Thread(){  84  @Override  85                 public void run() {  86                     try {  87  depot.produce(newSize);  88                     } catch (InterruptedException e) {  89  e.printStackTrace();  90  }  91  }  92  }.start();  93  }  94  }  95    //消費者
 96     class Customer{  97         private Depot depot;  98 
 99         public Customer(Depot depot) { 100             this.depot = depot; 101  } 102      //進行消費
103         public void consume(final int newSize){ 104             new Thread(){ 105  @Override 106                 public void run() { 107                     try { 108  depot.consume(newSize); 109                     } catch (InterruptedException e) { 110  e.printStackTrace(); 111  } 112  } 113  }.start(); 114  } 115  } 116 
117     public static void main(String[] args) { 118         Depot depot = new LockConditionDemo().new Depot(100); 119         Producer producer = new LockConditionDemo().new Producer(depot); 120         Customer customer = new LockConditionDemo().new Customer(depot); 121         producer.produce(60); 122         producer.produce(120); 123         customer.consume(90); 124         customer.consume(150); 125         producer.produce(110); 126  } 127 }

下面是這段代碼運行的一種結果:

Thread-1------left剩余20------size容量100-------inc增長100 Thread-2-------left剩余0-------size容量10--------減少量dec90 Thread-3-------left剩余140-------size容量0--------減少量dec10 Thread-4------left剩余10------size容量100-------inc增長100 Thread-3-------left剩余40-------size容量0--------減少量dec100 Thread-4------left剩余0------size容量10-------inc增長10 Thread-3-------left剩余30-------size容量0--------減少量dec10 Thread-1------left剩余0------size容量20-------inc增長20 Thread-3-------left剩余10-------size容量0--------減少量dec20 Thread-0------left剩余0------size容量60-------inc增長60 Thread-3-------left剩余0-------size容量50--------減少量dec10 

通過簡單的示例,使用Condition具備兩個條件,首先線程一定需要獲取到當前的同步狀態,其次必須從鎖中獲取到Condition對象,而condition.await()方法就對應了Object.wait()方法使得當前線程在滿足某種條件的時候就進行等待,condition.signal()就是在某種條件下喚醒當前線程。其配合lock接口的使用非常方便。

三、Condition等待/通知機制的實現原理

     首先可以看一下Condition接口的定義的相關方法:

    

    await():使當前線程進入等待狀態直到被signal()、signalAll()方法喚醒或者被中斷

    signal():喚醒等待中的一個線程

    signalAll():喚醒等待中的全部線程

Condition接口只是定義了相關的處理等待通知的方法,真正實現其等待通知效果的在AQS中的ConditionObject類,在了解源碼之前先講一下同步隊列和等待隊列:

    前面的文章講過當線程未獲取到同步狀態的時候,會創建一個Node節點並把這個節點放入同步隊列的尾部,進入同步隊列的中的線程都是阻塞的。

    在AQS中同步隊列和等待隊列都復用了Node這個節點類,一個同步狀態可以含有多個等待隊列,同時等待隊列只是一個單向的隊列。

 

一個AQS可以存在一個同步對列多個等待隊列

接下來可以看一些重點方法的源碼了解一下原理

await():使當前線程進入等待狀態

 1       public final void await() throws InterruptedException {  2 if (Thread.interrupted())//響應中斷  3 throw new InterruptedException();  4 Node node = addConditionWaiter();//放入到等待隊列中  5 int savedState = fullyRelease(node);//釋放同步狀態(同步隊列頭節點釋放狀態喚醒后繼節點獲取同步狀態)  6 int interruptMode = 0;
         //判斷是否在同步隊列中
7 while (!isOnSyncQueue(node)) { 8 LockSupport.park(this);//存在等待隊列中就阻塞該線程 9 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//判斷等待過程中是否被中斷過 10 break; 11 }
         //自旋去獲取同步狀態【在AQS中了解】獲取成功並且在退出等待時不拋出中斷異常(拋出了異常就會立馬被中斷)
12 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 13 interruptMode = REINTERRUPT;//在退出等待時重新中斷 14 if (node.nextWaiter != null) //如果存在其他節點 15 unlinkCancelledWaiters();//移除所有不是等待狀態的節點 16 if (interruptMode != 0) 17 reportInterruptAfterWait(interruptMode);//如果在等待過程中發現被中斷,就執行中斷的操作 18 }

 

addConditionWaiter():往等待隊列中添加元素

 1       private Node addConditionWaiter() {  2 Node t = lastWaiter;//等待隊列中的最后一個元素  4 if (t != null && t.waitStatus != Node.CONDITION) {//如果尾節點部位null,並且尾節點不是等待狀態中說明這個節點不應該待在等待隊列中  5  unlinkCancelledWaiters();//從等待隊列中移除  6 t = lastWaiter;  7  }  8 Node node = new Node(Thread.currentThread(), Node.CONDITION);//創建一個等待狀態的節點  9 if (t == null) 10 firstWaiter = node; 11 else 12 t.nextWaiter = node; 13 lastWaiter = node;//加入等待隊列的尾部 14 return node; 15 }

 

unlinkCancelledWaiters():將不是等待狀態的節點從等待隊列中移除

 1     private void unlinkCancelledWaiters() {  2             Node t = firstWaiter;//頭節點  3             Node trail = null;  4             while (t != null) {//存在節點  5                 Node next = t.nextWaiter;//下一個節點  6                 if (t.waitStatus != Node.CONDITION) {//如果不是出於等待中的狀態  7                     t.nextWaiter = null;//t的后指針引用清除  8                     if (trail == null)//前面是否存在節點  9                         firstWaiter = next;//下一個節點就是頭節點 10                     else
11                         trail.nextWaiter = next;//賦值給前節點的后指針引用 12                     if (next == null)//代表不存在元素了 13                         lastWaiter = trail; 14  } 15                 else
16                     trail = t;//將t賦值給trail 17                 t = next;//next賦值給t 18  } 19         }

 

fullyRelease(Node node):釋放當前狀態值,返回同步狀態

 1    final int fullyRelease(Node node) {  2         boolean failed = true;//失敗狀態  3         try {  4             int savedState = getState();//獲取當前同步狀態值  5             if (release(savedState)) {//獨占模式下釋放同步狀態,AQS獨占式釋放鎖、前面文章講過  6                 failed = false;//失敗狀態為false  7                 return savedState;//返回同步狀態  8             } else {  9                 throw new IllegalMonitorStateException(); 10  } 11         } finally { 12             if (failed) 13                 node.waitStatus = Node.CANCELLED;//取消等待狀態 14  } 15     }

 

isOnSyncQueue:判斷線程是否在同步隊列中

    final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null)//如果等待狀態為等待中,或者前繼節點為null代表第一種情況該節點出於等待狀態,第二種情況可能已經被喚醒不在等待隊列中了 return false; if (node.next != null) //如果后繼節點不為null代表肯定在等待隊列中
            return true;
        return findNodeFromTail(node);//從后往前找判斷是否在等待隊列中 }

總結一下等待操作:

    首先等待操作沒有進行CAS或者任何的同步操作,因為調用await()方法的是獲取當前lock鎖對象的線程,也就是同步隊列中的首節點,當調用await()方法后,將同步隊列的首節點創建一個等待節點放入等待隊列的尾部,然后釋放出同步狀態(不釋放同步狀態就會造成死鎖),喚醒同步隊列中的后繼節點,然后當前線程進入等待的狀態

 

調用await()方法過程   

signal():喚醒等待隊列中的一個線程

1     public final void signal() { 2             if (!isHeldExclusively())//判斷當前線程是否已經獲取同步狀態
3                 throw new IllegalMonitorStateException(); 4             Node first = firstWaiter;//等待隊列頭節點
5             if (first != null) 6                 doSignal(first);//具體實現方法喚醒第一個node
7         }

 

doSignal(Node node):具體處理喚醒節點的操作

     private void doSignal(Node first) { do { if((firstWaiter = first.nextWaiter) == null)//執行移除頭節點的操作 lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }

 

transferForSignal(Node node):喚醒的具體實現方式

   final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//將節點的等待狀態設置更改為初始狀態如果改變失敗就會被取消 return false; Node p = enq(node);//往同步隊列中添加節點【死循環方式】 int ws = p.waitStatus;//獲取節點的等待狀態 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//如果該結點的狀態為cancel 或者修改waitStatus失敗,則直接喚醒(這一步判斷是為了不立刻喚醒脫離等待中的線程,因為他要等同步隊列中的頭節點釋放同步狀態再去競爭) LockSupport.unpark(node.thread);//具體的喚醒操作 return true; }

 總結一下喚醒操作的流程:當調用signal()方法時,將等待隊列中的首節點拿出來,加入到同步隊列中,此時該節點不會立刻被喚醒因為就算被喚醒也是需要重新去獲取同步狀態的,而是在調用lock.unlock()方法釋放鎖以后將其喚醒獲取同步狀態。

 調用signal()喚醒方法過程

 

到現在為止,基本的Condition的等待通知機制已經講解完畢,至於附加功能的比如超時等待或者喚醒全部的功能在源碼上都差不了多少稍微新增一些功能需要,在原有的await()方法上增加了一些處理邏輯,真正的原理還是相差無幾的。

 

 

 

================================================================================== 

不管歲月里經歷多少辛酸和艱難,告訴自己風雨本身就是一種內涵,努力的面對,不過就是一場命運的漂流,既然在路上,那么目的地必然也就是前方。


==================================================================================


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM