AQS的原理在於,每當有新的線程請求資源時,該線程會進入一個等待隊列(Waiter Queue),只有當持有鎖的線程釋放資源后,該線程才能持有資源。該等待隊列的實現方式是雙向鏈表,線程會被包裹在鏈表節點Node中。Node即隊列的節點對象,它封裝了各種等待狀態(典型的狀態機模式),前驅和后繼節點信息,以及它對應的線程。
AQS定義兩種資源共享方式:Exclusive(獨占,在特定時間內,只有一個線程能夠執行,如ReentrantLock)和share(共享,多個線程可以同時執行,如ReadLock、Semaphore、CountDownLatch),可見不同的實現方式征用共享資源的方式不同,由此,自定義同步器在實現時要根據需求來實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經實現好了。
Node節點
1 //標記節點為共享模式
2 static final Node SHARED = new Node(); 3 //標記節點為獨占模式
4 static final Node EXCLUSIVE = null; 5 //等待狀態
6 volatile int waitStatus; 7 //前驅結點
8 volatile Node prev; 9 //后繼節點
10 volatile Node next; 11 //線程
12 volatile Thread thread;
自定義同步器時主要需要實現以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源,只有用到condition時才需要去使用它。
- tryAcquire(int):獨占方式,嘗試獲取資源,返回boolean。
- tryRelease(int):獨占方式,嘗試釋放資源,返回boolean。
- tryAcquireShared(int):共享方式,嘗試獲取資源,返回int。
- tryReleaseShared(int):共享方式,嘗試釋放資源,返回boolean。
等待隊列節點對象Node有四種不同的狀態:
- CANCELLED(1)已取消
- SIGNAL(-1)競爭獲勝需要喚醒
- CONDITION(-2)在condition隊列中等待
- PROPAGATE(-3)后續節點傳播喚醒操作,共享模式下使用
acquire方法執行流程
1 public final void acquire(int arg) { 2 if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 3 selfInterrupt(); 4 } 5 }
1) 嘗試獲取鎖tryAcquire,返回值表示當前線程是否獲取鎖。
2) 如果獲取成功,那么說明當前對象已經持有鎖,執行中斷操作,中斷操作會解除線程阻塞。
3) 如果獲取失敗,那么把當前線程封裝為Waiter節點,等待隊列沒有節點時初始化隊列,有則使用compareAndSetTail()添加進Waiter隊列尾端。
4) acquiredQueue自旋獲取資源,並且返回Waiter節點持有的線程的應當具備的中斷狀態。
5) 根據返回結果來確定是否需要執行線程中斷操作。
1 private Node addWaiter(Node mode) { 2 //封裝當前節點為node節點
3 Node node = new Node(Thread.currentThread(), mode); 4 Node pred = tail; 5 if(pred != null) { 6 //將node節點的前驅節點設置為tail
7 node.prev = pred; 8 //多線程環境下,tail可能已經被其它線程修改了,這里校驗pred是否依然是為節點 9 //如果是,那么將node設置為尾結點,原尾結點的后繼節點設置為node,返回node
10 if(compareAndSelfTail(pred, node)) { 11 pred.next = node; 12 return node; 13 } 14 } 15 //執行到這里,說明tail為null,或者tail已經發生了變動
16 enq(node); 17 return node; 18 }
1 private Node enq(final Node node) { 2 //下面這個死循環用於把node節點插入到隊尾,由於多線程環境下,tail節點可能 3 //隨時變動,必須不停的嘗試,讓下面兩個操作不會被其它線程干涉。 4 //1,node.prev必須為當前尾結點 5 //2,node設置為新的尾結點
6 for(;;) { 7 Node t = tail; 8 //tail為空,也說明head為空,此時初始化隊列
9 if(t == null) { 10 //CAS方式初始化隊頭
11 if(compareAndSetHead(new Node())) 12 tail = head; 13 } else { 14 //設置node.prev為當前尾結點
15 node.prev = t; 16 //多線程環境下,此時尾結點可能已經被其它訪問修改了,需要CAS來進行比較 17 //如果t依然是尾結點,那么node設置為尾結點、
18 if(compareAndSetTail(t, node)) { 19 t.next = node; 20 return t; 21 } 22 } 23 } 24 }
acquiredQueued(Node)方法會接收addWaiter封裝好的Node對象,該方法的本質在於以自旋的方式獲取資源,即自旋鎖。它做了兩件事,如果指定節點的前驅節點時頭結點,那么再次嘗試獲取鎖,反之,嘗試阻塞當前線程。自旋不能構成死循環,否則會浪費大量CPU資源,在AQS中如果p==head&&tryAcquire(arg)條件不足時不會一直循環下去。通常,在p==head之前,必然會有一個線程得到鎖,此時tryAcquire()通過,循環結束。如果發生了極端情況,那么node.predecessor()也會在node==head的情況下拋出空指針異常,循環結束。shouldParkAfterFailedAcquire(p,node)檢測前驅節點的等待狀態,需要阻塞則調用partAndCheckInterrupt()方法會阻塞當前線程,該循環也不會無限制的消耗資源。
1 final boolean acquireQueued(final Node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for(;;) { 6 //找到node的前驅節點,如果node已經為head,那么會拋出空指針異常 7 //空指針異常說明整個等待隊列都沒有能夠獲取鎖的線程。
8 final Node p = node.predecessor(); 9 //前驅節點為頭結點時,當前線程嘗試獲取鎖 10 //如果獲取成功,那么node會成為新的頭結點,這個過程會清空node的線程信息。
11 if(p == head && tryAcquire(arg)) { 12 setHead(node); 13 p.next = null; 14 failed = false; 15 return interrupted; 16 } 17 //當前線程不能獲取鎖,則說明該節點需要阻塞 18 //shouldParkAfterFailedAcquire()用於檢查和設置節點阻塞狀態 19 //如果為通過檢查,那么說明沒有阻塞,parkAndCheckInterrupt()用於阻塞當前線程。
20 if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 21 interrupted = true; 22 } 23 finally { 24 if(failed) cancelAcquire(node); 25 } 26 } 27 }
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 //前驅節點的等待狀態
3 int ws = pred.waitStatus; 4 if(ws == Node.SIGNAL) { 5 //SIGNAL表示前驅節點需要被喚醒,此時node是一定可以安全阻塞的,所以返回true
6 return true; 7 } 8 if(ws > 0) { 9 //大於0的等待狀態只有CANCELLED,從隊列里移除所有前置的CANCELLED節點。
10 do { 11 node.prev = pred = pred.prev; 12 } while (pred.waitStatus > 0); 13 pred.next = node; 14 } else { 15 //運行到這里,說明前驅節點處於0、CONDITION或者PROPAGATE狀態下 16 //此時該節點需要被置為SIGNAL狀態,等待被喚醒。
17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18 } 19 }
1 private final boolean parkAndCheckInterrupt() { 2 //LockSupport.park()用於阻塞當前線程
3 LockSupport.park(this); 4 return Thread.interruupted(); 5 }
由此可以得出結論,當一個新的線程節點入隊之后,會檢查它的前驅節點,只要有一個節點的狀態是SIGNAL,就表示當前節點之前的節點正在被等待喚醒,那么當前線程就需要被阻塞,以等待RentrantLock.unlock()喚醒之前的線程。
在過程2中,node1剛剛入隊,沒有爭搶到鎖,此時head狀態為初始化的0狀態,於是調用了compareAndSetWaitStatus(pred,ws,Node.SIGNAL),這個方法會把head的狀態改為SIGNAL。
在過程3中,acquired()方法里的for循環會在執行一次,此時,node1的前驅節點依然是head,如果它依然沒有競爭鎖,那么由於head的waitStatus屬性的值為SIGNAL,這會導致shouldParkAfterFailedAcquire()方法返回true,當前線程(node1持有的線程)被阻塞,代碼不在繼續往下執行。這樣就達到了讓等待隊列里的線程阻塞的目的,由此可以類推更多線程入隊的過程。由此可以類推更多線程入隊的過程:
SIGNAL狀態由release()方法進行修改,這個方法首先調用tryRelease()方法嘗試釋放鎖,它返回的是鎖是否處於可用狀態,如果鎖可用,那么該方法也不負責中斷等待線程的阻塞,它僅僅把鎖的線程持有者設為null;然后,如果成功的釋放鎖,那么判斷隊頭狀態,隊頭為空則說明隊列沒有等待線程,不再做其它操作,反之再判斷隊頭的狀態waitStatus,只要它不為0,就說明等待隊列中有被阻塞的節點。
1 public final boolean release(int arg) { 2 if(tryRelease(arg)) { 3 Node h = head; 4 if(h != null && h.waitStatus != 0) { 5 unparkSuccessor(h); 6 } 7 return true; 8 } 9 return false; 10 }
1 private void unparkSuccessor(Node node) { 2 int ws = node.waitStatus; 3 //小於0的狀態waitStatus只有SIGNAL和CONDITION
4 if(ws < 0) { 5 compareAndSetWaitStatus(node, ws, 0); 6 } 7 Node s = node.next; 8 //前驅查找需要喚醒的節點
9 if(s == null || s.waitStatus > 0) { 10 s = null; 11 for(Node t = tail; t != null && t != node; t = t.prev) { 12 if(t.waitStatus <= 0) s = t; 13 } 14 } 15 if(s != null) { 16 LockSupport.unpark(s.thread); 17 } 18 }
unparkSuccessor()負責確保中斷正確的線程阻塞。在ReentrantLock.unlock()的調用過程中,unparkSuccessor(Node node)的形參node始終為head節點,這個方法執行的主要操作為:
- 首先把head節點的waitStatus設置為0,表示隊列里沒有需要中斷阻塞的線程。
- 然后確定需要被喚醒的節點,該節點是隊列中第一個waitStatus小於等於0的節點。
- 最后,調用LockSupport.unlock()方法中斷指定線程的阻塞狀態。
需要注意的是,node1對應的線程此時已經中斷了阻塞,它會開始繼續執行AQS的AacquireQueued()方法中for循環的代碼final Node p = node.predecessor();顯然node1的前驅節點head由於鎖已經被釋放,隊列變化為
這部分代碼比較巧妙,可以注意到,在釋放的過程中,代碼里並沒有改變head的waitStatus為SIGNAL,而是直接使用node1替代了原先的head。換言之,原本需要修改head/node2的前驅和后置,並且把head的waitStatus修改為SIGNAL,使用當前的代碼,只需要釋放node1的持有線程,然后移除head節點,這樣可以更快的到達隊列規整的目的。
AQS如何阻塞線程和中斷阻塞
在acquired()方法中,當前線程嘗試獲取鎖,如果沒有獲得,那么會把線程加入等待隊列中,加入到隊列的線程會被阻塞。
線程阻塞有三種常見的實現方式:Object.wait()、Thread.join()、或者Thread.sleep()。
中斷阻塞則通過Thread.interrupt()方法來實現,這個方法會發出一個中斷信號量從而導致線程拋出中斷異常InterruptedException,已達到結束阻塞的目的。需要注意的是Interrupt不會中斷用戶循環體造成阻塞,它僅僅是拋出信號量,具體處理方式還是由用戶處理。Thread.isInterrupted可以得到中斷狀態。
對於wait、sleep、join等會造成線程阻塞的方法,由於它們都會拋出Interrupted Exception,處理方式如下
1 try { 2 Thread.currentThread().sleep(500); 3 } catch (InterruptedException e) { 4 //中斷后拋出異常,在異常捕獲里可以對中斷定制處理
5 }
對循環體處理方式如下表示:
1 //使用Thread.isInterrupted方法獲取中斷信號量
2 while(!Thread.currentThread().isInterrupted && 用戶自定義條件) { 3 }