原文出處:http://www.yund.tech/zdetail.html?type=1&id=ef94715a2838f06ab03b8621c23d1613
作者:jstarseven
ReentrantLock主要利用CAS+CLH隊列來實現。它支持公平鎖和非公平鎖,兩者的實現類似。
-
CAS:Compare and Swap,比較並交換。CAS有3個操作數:內存值V、預期值A、要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什么都不做。該操作是一個原子操作,被廣泛的應用在Java的底層實現中。在Java中,CAS主要是由sun.misc.Unsafe這個類通過JNI調用CPU底層指令實現。
-
CLH隊列:帶頭結點的雙向非循環鏈表(如下圖所示):
ReentrantLock的基本實現可以概括為:先通過CAS嘗試獲取鎖。如果此時已經有線程占據了鎖,那就加入CLH隊列並且被掛起。當鎖被釋放之后,排在CLH隊列隊首的線程會被喚醒,然后CAS再次嘗試獲取鎖。在這個時候,如果:
1.非公平鎖:如果同時還有另一個線程進來嘗試獲取,那么有可能會讓這個線程搶先獲取;
2. 公平鎖:如果同時還有另一個線程進來嘗試獲取,當它發現自己不是在隊首的話,就會排到隊尾,由隊首的線程獲取到鎖。
ReentrantLock是java concurrent包提供的一種鎖實現。不同於synchronized,ReentrantLock是從代碼層面實現同步的。
圖1 reentrantLock的類層次結構圖
Lock定義了鎖的接口規范。
ReentrantLock實現了Lock接口。
AbstractQueuedSynchronizer中以隊列的形式實現線程之間的同步。
ReentrantLock的方法都依賴於AbstractQueuedSynchronizer的實現。
Lock接口定義了如下方法:
圖2 lock接口規范
1、lock()方法的實現
進入lock()方法,發現其內部調用的是sync.lock();
public void lock() { sync.lock(); }
sync是在ReentrantLock的構造函數中實現的。其中fair參數的不同可實現公平鎖和非公平鎖。由於在鎖釋放的階段,鎖處於無線程占有的狀態,此時其他線程和在隊列中等待的線程都可以搶占該鎖,從而出現公平鎖和非公平鎖的區別。
非公平鎖:當鎖處於無線程占有的狀態,此時其他線程和在隊列中等待的線程都可以搶占該鎖。
公平鎖:當鎖處於無線程占有的狀態,在其他線程搶占該鎖的時候,都需要先進入隊列中等待。
本文以非公平鎖NonfairSync的sync實例進行分析。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); }
由圖1可知,NonfairSync繼承自Sync,因此也繼承了AbstractQueuedSynchronizer中的所有方法實現。接着進入NonfairSync的lock()方法。
final void lock() { // 利用cas置狀態位,如果成功,則表示占有鎖成功 if (compareAndSetState(0, 1)) // 記錄當前線程為鎖擁有者 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
在lock方法中,利用cas實現ReentrantLock的狀態置位(cas即compare and swap,它是CPU的指令,因此賦值操作都是原子性的)。如果成功,則表示占有鎖成功,並記錄當前線程為鎖擁有者。當占有鎖失敗,則調用acquire(1)方法繼續處理。
public final void acquire(int arg) { //嘗試獲得鎖,如果失敗,則加入到隊列中進行等待 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire()是AbstractQueuedSynchronizer的方法。它首先會調用tryAcquire()去嘗試獲得鎖,如果獲得鎖失敗,則將當前線程加入到CLH隊列中進行等待。tryAcquire()方法在NonfairSync中有實現,但最終調用的還是Sync中的nonfairTryAcquire()方法。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 獲得狀態 int c = getState(); // 如果狀態為0,則表示該鎖未被其他線程占有 if (c == 0) { // 此時要再次利用cas去嘗試占有鎖 if (compareAndSetState(0, acquires)) { // 標記當前線程為鎖擁有者 setExclusiveOwnerThread(current); return true; } } // 如果當前線程已經占有了,則state + 1,記錄占有次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); // 此時無需利用cas去賦值,因為該鎖肯定被當前線程占有 setState(nextc); return true; } return false; }
在nonfairTryAcquire()中,首先會去獲得鎖的狀態,如果為0,則表示鎖未被其他線程占有,此時會利用cas去嘗試將鎖的狀態置位,並標記當前線程為鎖擁有者;如果鎖的狀態大於0,則會判斷鎖是否被當前線程占有,如果是,則state + 1,這也是為什么lock()的次數要和unlock()次數對等;如果占有鎖失敗,則返回false。
在nonfairTryAcquire()返回false的情況下,會繼續調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,將當前線程加入到隊列中繼續嘗試獲得鎖。
private Node addWaiter(Node mode) { // 創建當前線程的節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果尾節點不為空 if (pred != null) { // 則將當前線程的節點加入到尾節點之后,成為新的尾節點 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { // CAS方法有可能失敗,因此要循環調用,直到當前線程的節點加入到隊列中 for (;;) { Node t = tail; if (t == null) { // Must initialize Node h = new Node(); // Dummy header,頭節點為虛擬節點 h.next = node; node.prev = h; if (compareAndSetHead(h)) { tail = node; return h; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter()是AbstactQueuedSynchronizer的方法,會以節點的形式來標記當前線程,並加入到尾節點中。enq()方法是在節點加入到尾節點失敗的情況下,通過for(;;)循環反復調用cas方法,直到節點加入成功。由於enq()方法是非線程安全的,所以在增加節點的時候,需要使用cas設置head節點和tail節點。此時添加成功的結點狀態為Node.EXCLUSIVE。
在節點加入到隊列成功之后,會接着調用acquireQueued()方法去嘗試獲得鎖。
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { // 獲得前一個節點 final Node p = node.predecessor(); // 如果前一個節點是頭結點,那么直接去嘗試獲得鎖 // 因為其他線程有可能隨時會釋放鎖,沒必要Park等待 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
在acquireQueued()方法中,會利用for (;;)一直去獲得鎖,如果前一個節點為head節點,則表示可以直接嘗試去獲得鎖了,因為占用鎖的線程隨時都有可能去釋放鎖並且該線程是被unpark喚醒的CLH隊列中的第一個節點,獲得鎖成功后返回。
如果該線程的節點在CLH隊列中比較靠后或者獲得鎖失敗,即其他線程依然占用着鎖,則會接着調用shouldParkAfterFailedAcquire()方法來阻塞當前線程,以讓出CPU資源。在阻塞線程之前,會執行一些額外的操作以提高CLH隊列的性能。由於隊列中前面的節點有可能在等待過程中被取消掉了,因此當前線程的節點需要提前,並將前一個節點置狀態位為SIGNAL,表示可以阻塞當前節點。因此該函數在判斷到前一個節點為SIGNAL時,直接返回true即可。此處雖然存在對CLH隊列的同步操作,但由於局部變量節點肯定是不一樣的,所以對CLH隊列操作是線程安全的。由於在compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行之前可能發生pred節點搶占鎖成功或pred節點被取消掉,因此此處需要返回false以允許該節點可以搶占鎖。
當shouldParkAfterFailedAcquire()返回true時,會進入parkAndCheckInterrupt()方法。parkAndCheckInterrupt()方法最終調用safe.park()阻塞該線程,以免該線程在等待過程中無線循環消耗cpu資源。至此,當前線程便被park了。那么線程何時被unpark,這將在unlock()方法中進行。
這里有一個小細節需要注意,在線程被喚醒之后,會調用Thread.interrupted()將線程中斷狀態置位為false,然后記錄下中斷狀態並返回上層函數去拋出異常。我想這樣設計的目的是為了可以讓該線程可以完成搶占鎖的操作,從而可以使當前節點稱為CLH的虛擬頭節點。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park */ return true; if (ws > 0) { // 如果前面的節點是CANCELLED狀態,則一直提前 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); }
2、unlock()方法的實現
同lock()方法,unlock()方法依然調用的是sync.release(1)。
public final boolean release(int arg) { // 釋放鎖 if (tryRelease(arg)) { Node h = head; // 此處有個疑問,為什么需要判斷h.waitStatus != 0 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
可以看到,tryRelease()方法實現了鎖的釋放,邏輯上即是將鎖的狀態置為0。當釋放鎖成功之后,通常情況下不需要喚醒隊列中線程,因此隊列中總是有一個線程處於活躍狀態。
總結:
ReentrantLock的鎖資源以state狀態描述,利用CAS則實現對鎖資源的搶占,並通過一個CLH隊列阻塞所有競爭線程,在后續則逐個喚醒等待中的競爭線程。ReentrantLock繼承AQS完全從代碼層面實現了java的同步機制,相對於synchronized,更容易實現對各類鎖的擴展。同時,AbstractQueuedSynchronizer中的Condition配合ReentrantLock使用,實現了wait/notify的功能。
-END-