AbstractQueuedSynchronizer:抽象同步隊列,簡稱AQS主要依賴一個int成員變量來表示同步狀態state,以及一個CLH等待隊列
AQS的等待隊列是一個CLH(Craig, Landin, and Hagersten lock queue)隊列:競爭資源同一時間只能被一個線程訪問,CLH為管理等待鎖的線程的隊列
AQS的子類被推薦定義為自定義同步組件的靜態內部類
AQS自身不實現任何同步接口
AQS既支持獨占式獲取同步狀態,也可以支持共享室獲取同步狀態,這樣以方便的實現不同類型同步組件
AQS是實現鎖(或者CountDownLatch,Semaphore)等同步組件的關鍵,在這些同步組件中聚合AQS
AQS面向同步組件的實現者,簡化了同步組件的實現方式,屏蔽了同步狀態state的管理,線程排隊,等待、喚醒等底層操作
同步組件面向使用者,定義了使用者和同步組件交互的基本接口
AQS持有的成員變量有head和tail,分別指向隊列頭結點和尾結點,都為node類型,
即通過持有等待隊列的頭尾指針來管理等待隊列,如下圖所示,節點左側為prev指針,右側為next指針
頭結點無前驅節點
尾結點無后繼節點.
AQS的獨占和共享
對於 AQS來說,線程同步關鍵是對狀態值state進行操作,根據state判斷是否屬於同一個線程,操作state方式分為獨占方式,共享方式
獨占方式入隊
public final void acquire(int arg) { //獲取成功直接返回 //獲取失敗則將當前線程封裝為Node.EXCLUSIVE的Node節點插入AQS阻塞隊列的尾部 //,並調用LockSupport.park(this)方式阻塞自己 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法失敗,則進行入隊操作
//等待隊列添加節點 private Node addWaiter(Node mode) { //(1)構建當前線程為Node類型 Node node = new Node(Thread.currentThread(), mode); //(2)用臨時變量取的尾結點的引用 Node pred = tail; //(3)當前尾結點是否存在 if (pred != null) { //(3.1)將當前節點在等待隊列尾部插入 node.prev = pred; //(3.1.1)CAS更新尾結點 if (compareAndSetTail(pred, node)) { //(3.1.2)更新成功將舊的尾結點的next指針指向新尾結點 pred.next = node; return node; } } //(3.2)當前尾結點不存在,說明當前線程是第一個加入等待隊列進行等待的線程 enq(node); return node; }
分析上述代碼,注釋已經寫的很明確,有個問題就是,3.1.1過程如果失敗了,而尾結點不為空,則同樣會執行無尾結點走的enq方法,因此,猜測enq()有兩個作用
- 當前等待隊列尾結點不存在的情況下的入隊操作
- CAS尾結點后插入節點失敗后的自旋(死循環)重試
下面來看看enq方法
private Node enq(final Node node) { //(2.3)失敗自旋 for (;;) { //(1)用臨時變量取的尾結點的引用 Node t = tail; if (t == null) { //(2.1)如果尾結點不存在,則初始化頭結點 if (compareAndSetHead(new Node())) //(2.1.1)讓尾結點也指向頭結點的引用 tail = head; } else { //(2.2)尾結點存在,將當前節點在等待隊列尾部插入 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
從上述源碼可以看到,
- 如果尾結點不存在,入隊操作前會創建一個頭結點,然后將尾指針也指向這個節點,在下次嘗試的時候,發現有了尾結點,走2.2步驟,在2.1步驟產生的節點后面插
- 如果一開始尾結點存在,說明addWaiter這一步CAS失敗,然后進入enq方法自旋嘗試尾部插入直到成功
在AQS等待隊列上的線程獲取獨占鎖
//等待隊列中的線程獲取鎖 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //(1)自旋 for (;;) { //(2)獲得當前節點的前驅節點(node相當於當前線程,上一個節點可以理解為上一個等待鎖的線程) final Node p = node.predecessor(); //(2.1)如果當前節點的前驅節點是頭節點(說明當前節點是順位第一位需要獲取鎖的線程)並且成功獲取同步狀態,即獲取到獨占鎖 // (tryAcquire在具體的同步組件中實現,比如ReentrantLock中的tryAcquire) if (p == head && tryAcquire(arg)) { //(2.1.1)將當前節點設置為新的頭節點(清除prev和thread信息) setHead(node); //(2.1.2)釋放當前節點的前驅節點,方便GC將該內存回收 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
當前線程獲取到鎖后,會成為新的頭節點,舊的頭節點將被GC回收
shouldParkAfterFailedAcquire通過以下規則,判斷當前線程是否需要被阻塞
- 如果前驅節點狀態為SIGNAL,表示當前節點需要被unpack(喚醒),此時返回true
- 如果前驅節點狀態為CANCELLED(ws>0),說明前驅節點已經被取消,則通過先前回溯找到一個有效(非CANCELLED狀態)的節點,並返回false
- 如果前驅節點為非SIGNAL,非CANCELLED,則設置前驅節點狀態為SIGNAL,並返回false
默認狀況下,節點的狀態都是0,為INITIAL,此時通過上述CAS將pred前驅節點的等待狀態改為Node.SIGNAL,返回false,進入下次自旋,直到shouldParkAfterFailedAcquire返回true,開始執行parkAndCheckInterrupt().
private final boolean parkAndCheckInterrupt() { //掛起當前線程 LockSupport.park(this); //返回線程中斷標志 return Thread.interrupted(); }