AbstractQueuedSynchronized 以下簡稱AQS,是用來構建鎖或者其他同步組件的基礎框架。
在AQS中,為鎖的獲取和釋放提供了一些模板方法,而實現鎖的類(AQS的子類)需要實現這些模板方法中的同步方法。
這些方法包括:
·tryAcquire():嘗試以獨占模式獲取鎖
·tryRelease():嘗試釋放獨占模式的鎖
·tryAcquireShared():嘗試以共享模式獲取鎖
·tryReleaseShared():嘗試釋放共享模式的鎖
·isHeldExclusiverly():返回是否以獨占模式獲有鎖
在分析AQS的原理之前,我們先看看LockSupport、Lock,Condition、AQS、ReentrantLok等等之間的關系和使用方式。
關系圖:
使用方式有兩種:一種是不帶條件的普通的鎖,另一種是帶條件的鎖。
不帶條件:
/* * ReentrantLock是Lock接口的實現類之一 * 實現的是一種可重入的鎖 */ Lock lock = new ReentrantLock(); lock.lock(); try { //同步處理 }finally { lock.unlock(); }
帶條件的鎖:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();//創建和該鎖關聯的條件鎖 public void conditionWait() throws InterruptedException{ lock.lock(); try { condition.await(); }finally { lock.unlock(); } } public void ConditionSignal() throws InterruptedException{ lock.lock(); try { condition.signal(); }finally { lock.unlock(); } }
從使用方式可以看到,主要調用的方法就是:lock.lock()、lock.unlock()、lock.newCondition()、condition.await()、condition.signal()
下面分別來看看這幾個方法:
lock.lock():由於Lock是一個接口,所以需要通過其子類來實例化,所以lock.lock()其實調用的是子類的lock()方法,在上面的例子中自然調用的就是ReentrantLock.lock()方法。
public void lock() { sync.lock(); }
可以看到,ReentrantLock.lock()方法調用的是同步器內部類的lock()方法,而ReentrantLock的內部同步器類Sync又分為FairSync和NoFairSync。
源碼如下(省略了一部分內容):
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); } static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } } static final class FairSync extends Sync { final void lock() { acquire(1); } }
可以看到在非公平的獲取鎖的方法中,會首先嘗試直接去獲取鎖,而不會通過同步隊列去排隊獲取鎖。否則的話,就通過AQS同步器的acquire(1)去獲取鎖。
※RetrantLock在初始化的時候可以通過參數指定是公平的還是不公平的鎖。默認是非公平的鎖。
lock.unlock():調用的是同步器中的release(1)方法,也就是AQS中的release(1)方法。
public void unlock() { sync.release(1); }
lock.newCondition():該方法調用的也是內部同步器類中的newCondition()方法。
public Condition newCondition() { return sync.newCondition(); }
在ReentrantLock的內部同步器類中的newCondition()方法如下:
final ConditionObject newCondition() { return new ConditionObject(); }
可以看到,該方法直接返回了一個AQS中的內部類ConditonObject對象。所以,在每次生成一個條件鎖的時候,都會創建一個ConditionObject對象,
而每個ConditionObject對象都會在內部維護一個條件等待隊列。
conditon.await():通過調用LockSupport中的park()方法,將當前掛起。
源碼如下:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
condition.signal():通過調用LockSupport的unpark()方法,來喚醒線程。
源碼如下:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
了解完上面的概述之后,我們知道了,在不帶條件的鎖中,主要是通過調用AQS中的acquire(1)herelease(1)方法來獲取鎖和釋放鎖的。
下面就來具體看看AQS中的實現
在AQS中通過一個雙向FIFO同步隊列來維護獲取對象鎖的線程,當獲取鎖失敗的時候,會用當前線程構建一個Node節點,加入到同步隊列中去。
在AQS中維護了同步隊列的head和tail節點,和同步狀態。。
Node節點中維護了當前線程的status和前驅節點、后繼節點、下一個等待節點(條件等待的時候用)。
waitStatus包含如下狀態默認值為0:
CANCELLED = 1 : 表示當前線程在等待的時候已經超時了或者被取消了
SIGNAL = -1 :當前線程釋放了同步狀態或者被取消的時候會通知后繼節點,使后繼節點得以運行
CONDITINO = -2:節點在等待隊列中,等待Condition,當其他線程在Condition上調用signal的時候,該線程會從等待隊列轉移到同步隊列中去,加入到同步狀態的獲取。
PROPAGATE = -3:表示下一次共享式同步狀態會無條件的傳播下去
AQS中同步器的結構如下:
當有節點加入到同步隊列的時候,只需要對tail節點重新指向就可以了
同步隊列是一個FIFO的隊列,在獲取鎖的時候總是首節點是獲取同步狀態的節點,首節點的線程在釋放同步狀態時,將會喚醒后繼節點,而后繼節點在獲取同步狀態成功時,會將自己設置為首節點。
下面就來具體看看在獲取鎖的時候lock.lock()調用的同步器中的acquire(1)方法的具體實現。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
以獨占模式獲取鎖,並且不相應中斷。在AQS中還有以下acquire方法:
acquireInterruptibly(int arg) :以獨占模式獲取鎖,並且響應中斷
acquireShared(int arg):以共享模式獲取鎖,並且不響應中斷
acquireSharedInterruptibly(int arg):以共享模式獲取鎖,並且響應中斷
該方法首先會調用tryAcquire(arg)來嘗試獲取鎖。從源碼可以看到,在AQS中該方法只是單純的拋出一個UnsupportedOperationException異常,該方法需要實現AQS同步器的具體類中實現。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
我們看看ReentrantLock中的具體實現。
非公平鎖:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果當前的同步狀態為0,就嘗試直接設置同步狀態和設置獨占的線程為自己,來強制獲取鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //如果當前同步狀態不為0,就判斷是不是自己獲取了鎖,這里是實現可重入的 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平鎖:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //當前state為0 ,並且同步隊列中沒有前繼節點,就嘗試設置自己為獲得鎖的線程 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //實現可重入 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
可以看到,不論是在公平鎖還是非公平鎖的tryAcquire中,當獲取到鎖的時候返回的都是true,否則返回false。
所以,如果當前線程沒有獲取到鎖的時候,則會繼續執行后面的acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
我們先看看addWaiter(Node.EXCLUSIVE)的實現。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 首先為當前線程以指定模式構建一個Node節點,然后嘗試快速入隊的方式加入到同步隊列中去 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); //否則的話就調用enq(node)方法來加入到同步隊列中去 return node; }
再看看enq(node)的源碼實現
private Node enq(final Node node) { for (;;) { //通過一個無條件的循環,知道將構建一個空的head,然后將當前節點加入到空的head的后面,構成一個同步隊列 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
再看看acquireQueued(node,int)的源碼實現
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //判斷前一個節點是不是head節點,如果是的話,則會再次嘗試去獲取鎖 setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //如果前一個節點不是head節點,則設置前一個非取消節點的狀態是signal,以確保在前一個線程釋放鎖的時候能喚醒當前線程 parkAndCheckInterrupt()) //掛起當前線程,並且返回當前線程是否被中斷過(會清空中斷狀態,只有在被喚醒的時候才能從park()方法返回) interrupted = true; } } finally { if (failed) cancelAcquire(node); //如果被中斷過,則把該節點從同步隊列中刪除 } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
acquire(1)的總結:
①首先會去調用子類實現的具體tryAcquire(1)方法來獲取鎖,根據子類不同,實現的也不同,例如ReentrantLock中的實現就分為公平鎖和非公平鎖。非公平鎖就是不會去同步隊列中排隊,
而是直接去獲取鎖。如果獲取失敗的話,就會跟公平鎖一樣,進入FIFO的同步隊列排隊。
②加入同步隊列的時候,首先會判斷tail節點是否為空,如果不為空,則會嘗試快速入隊,如果為空的話,則會先創建一個空的head節點,然后在將當前線程的節點加入到同步隊列中去
③加入到同步隊列之后,會再次判斷前一個節點是不是head節點,如果是的話,則會再次嘗試去獲取鎖,如果獲取失敗的話,則會掛起當前線程。
④直到當前線程被喚醒的時候,會判斷當前線程是否被中斷過,如果被中斷過,則會從同步隊列中刪除當前線程節點。並且中斷當前線程。
下面在看看lock.unlock()方法調用的同步器中的sync.release(1)方法的具體實現
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先會調用tryRelease(arg)方法來釋放鎖。然后喚醒后面的線程。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
tryRelease()同樣也是子類需要實現的方法。ReentrantLock中的實現如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
release(1)的總結:
①首先會判斷當前線程是不是獨占的擁有鎖。如果是的話,則會去釋放鎖。如果當前線程都已經退出了獲取鎖(可重入的原因),則會設置當前線程的state為0,獨占線程為null
②在釋放鎖之后,會喚醒下一個等待中的線程。
AQS中條件隊列的實現方式參考ConditonObject實現分析點此參考