1.什么是可重鎖ReentrantLock?
就是支持重新進入的鎖,表示該鎖能夠支持一個線程對資源的重復加鎖。底層實現原理主要是利用通過繼承AQS來實現的,也是利用通過對volatile state的CAS操作+CLH隊列來實現;
支持公平鎖和非公平鎖。
CAS:Compare and Swap 比較並交換。CAS的思想很簡單:3個參數,一個當前內存值V、預期值A,即將更新的值B,當前僅當預期值A和內存值V相等的時候,將內存值V修改為B,否則什么都不做。該操作是一個原子操作被廣泛的用於java的底層實現中,在java中,CAS主要是有sun.misc.Unsafe這個類通過JNI調用CPU底層指令實現;更多底層的思想參考狼哥的文章cas的底層原理:https://www.jianshu.com/p/fb6e91b013cc
CLH隊列:也叫同步隊列,是帶頭結點的雙向非循環列表,是AQS的主要實現原理(結構如下圖所示)
2.ReentrantLock分為公平鎖和非公平鎖:區別是在於獲取鎖的機制上是否公平。
(1)公平鎖:公平的獲取鎖,也就是等待時間最長的線程最優獲取到鎖,ReentraantLock是基於同步隊列AQS來管理獲取鎖的線程。
在公平的機制下,線程依次排隊獲取鎖,先進入隊列排隊的線程,等到時間越長的線程最優獲取到鎖。
(2)非公平鎖:而在“非公平”的機制下,在鎖是可獲取狀態時,不管自己是否在對頭都會獲取鎖。
(3) 公平鎖和非公平鎖的對比:1、公平鎖用來解決“線程飢餓”的問題,即先進入CLH同步隊列等待的線程,即同步隊列中的頭結點總是先獲取到鎖。而非公平鎖 會出現 一個線程連續多次獲取鎖的情況,使得其他線程只能在同步隊列中等待。
2、經過測試,10個線程,每一個線程獲取100 000次鎖,通過vmstat統計運行時系統線程上下文切換的次數;公平鎖總耗時 為:5754ms,而費公平鎖總耗時為61ms。總耗時:公平鎖/費公平鎖=94.3倍,總切換次數 :公平鎖/費公平鎖=133倍。非 公平鎖的線程切換更少,保證了更大的吞吐量。
(4)可重入鎖的結構如下:
3.非公平鎖獲取的實現源碼如下:按照代碼的執行順序
(1)調用lock方法
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 1.Performs lock. Try immediate barge, backing up to normal * acquire on failure.調用lock方法 */ final void lock() {
//判斷當前state是否為0,即沒有被任何線程獲取的狀態,如果是,CAS更新為1,當前線程獲取到了非公平鎖 if (compareAndSetState(0, 1))
//設置當前鎖的持有者線程 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } }
(2)如果狀態不是0,CAS更新狀態放回false走acquire(1)方法:
在講aqs的時候說過這個方法,這里不做重復:
/**2 獲取非公平鎖 * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
(3).調用靜態內部類NonfairSync重寫AQS的tryAcquire(1)方法:
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; //3.調用靜態內部類NonfairSync重寫AQS的tryAcquire方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
(4) 走nonfairTryAcquire(1)非公平鎖的實現方法:重點分析下這個方法
/** * Performs non-fair tryLock. tryAcquire is * implemented in subclasses, but both need nonfair * try for trylock method. */ final boolean nonfairTryAcquire(int acquires) {
//獲取當前線程 final Thread current = Thread.currentThread();
//獲取鎖的狀態 int c = getState();
//c == 0 說明鎖沒有被任何線程所擁有,則CAS設置鎖的狀態為acquires if (c == 0) { if (compareAndSetState(0, acquires)) {
//設置當前線程為鎖的擁有者 setExclusiveOwnerThread(current); return true; } }
//如果鎖的持有者已經是當期線程,更新鎖的狀態,這個地方就是為什么可重入的原因,如果獲取鎖的線程再次請求,則將同步狀態的值增加,並返回true
//表明獲取同步狀態值成功 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
4.獲取公平鎖的過程
(1)公平鎖源碼獲取源碼如下:
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; //直接調用acquire(1)方法 final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();
//獲取獨占鎖的狀態 int c = getState(); if (c == 0) {
//c==0表示鎖沒有被任何線程鎖擁有,首先判斷當前線程是否為CLH同步隊列的第一個線程;是的話,獲取該鎖,設置鎖的狀態 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
//如果獨占鎖已經是為當前線程鎖擁有 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
從源碼可以看出,和nonfairTryAcquire(int acquires)比較唯一不同的是 ,判斷條件多了hasQueuePredecessors()方法,加入了當前節點是否有前驅節點的判斷。如果返回true,表示有線程比當前線程等待的時間長,需要等待前驅線程獲取並釋放鎖之后才能獲取鎖。返回false,表示當前的線程所在的節點為隊列(CLH隊列)的對頭,可以直接獲取鎖。
/** * Queries whether any threads have been waiting to acquire longer * than the current thread. * * <p>An invocation of this method is equivalent to (but may be * more efficient than): * <pre> {@code * getFirstQueuedThread() != Thread.currentThread() && * hasQueuedThreads()}</pre> * * <p>Note that because cancellations due to interrupts and * timeouts may occur at any time, a {@code true} return does not * guarantee that some other thread will acquire before the current * thread. Likewise, it is possible for another thread to win a * race to enqueue after this method has returned {@code false}, * due to the queue being empty. * * <p>This method is designed to be used by a fair synchronizer to * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>. * Such a synchronizer's {@link #tryAcquire} method should return * {@code false}, and its {@link #tryAcquireShared} method should * return a negative value, if this method returns {@code true} * (unless this is a reentrant acquire). For example, the {@code * tryAcquire} method for a fair, reentrant, exclusive mode * synchronizer might look like this: * * <pre> {@code * protected boolean tryAcquire(int arg) { * if (isHeldExclusively()) { * // A reentrant acquire; increment hold count * return true; * } else if (hasQueuedPredecessors()) { * return false; * } else { * // try to acquire normally * } * }}</pre> * * @return {@code true} if there is a queued thread preceding the * current thread, and {@code false} if the current thread * is at the head of the queue or the queue is empty * @since 1.7 */ public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
4.釋放鎖:
(1)首先調用ReetrantLock重寫父類AQS的unlock方法:
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link } is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
(2)調用AQS里面實現的釋放互斥鎖的方法:首先進入tryRelease()方法來嘗試釋放當前線程持有的鎖,如果成功的話,調用unparkSuccessor喚醒后繼線程。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
(3)進入tryRelease方法我們重點分析:重寫父類AQS里面的模板方法,進行鎖的釋放:
protected final boolean tryRelease(int releases) {
//c是本次釋放鎖之后的狀態 int c = getState() - releases;
// 如果當前線程不是鎖的持有者線程,則拋出異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;
//c==0表示鎖被當前線程已經徹底釋放,則將占有同步狀態的線程設置為Null,即鎖變為可獲取的狀態,這個時候才能返回true,否則返回false if (c == 0) { free = true; setExclusiveOwnerThread(null); }
//否則設置同步狀態 setState(c); return free; }
(4)釋放鎖成功后,即鎖變為可獲取的狀態后,調用unparkSuccessor喚醒后繼線程,進入unparkSuccessor的源碼:
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor.
獲取當前節點的有效的后繼節點,這的有效是指后繼節點s不為null並且waitStatus是<=0的,
既沒有被取消的狀態。無效的話,通過for循環遍歷,一直找到一個有效的節點
*/ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; }
//喚醒有效后繼節點對應的線程 if (s != null) LockSupport.unpark(s.thread); }
常用的方法:

1.創建一個 ReentrantLock ,默認是“非公平鎖”。 ReentrantLock() 2. 創建策略是fair的 ReentrantLock。fair為true表示是公平鎖,fair為false表示是非公平鎖。 ReentrantLock(boolean fair) 3. 查詢當前線程保持此鎖的次數。 int getHoldCount() 4. 返回目前擁有此鎖的線程,如果此鎖不被任何線程擁有,則返回 null。 protected Thread getOwner() 5.返回目前擁有此鎖的線程,如果此鎖不被任何線程擁有,則返回 null。 protected Thread getOwner() 6.返回一個 collection,它包含可能正等待獲取此鎖的線程。 protected Collection<Thread> getQueuedThreads() 7.返回正等待獲取此鎖的線程估計數。 int getQueueLength() 8.返回一個 collection,它包含可能正在等待與此鎖相關給定條件的那些線程。 protected Collection<Thread> getWaitingThreads(Condition condition) 9.返回等待與此鎖相關的給定條件的線程估計數。 int getWaitQueueLength(Condition condition) 10.查詢給定線程是否正在等待獲取此鎖。 boolean hasQueuedThread(Thread thread) 11.查詢是否有些線程正在等待獲取此鎖。 boolean hasQueuedThreads() 12.查詢是否有些線程正在等待與此鎖有關的給定條件。 boolean hasWaiters(Condition condition) 13.如果是“公平鎖”返回true,否則返回false。 boolean isFair() 14.查詢當前線程是否保持此鎖。 boolean isHeldByCurrentThread() 15.查詢此鎖是否由任意線程保持。 boolean isLocked() 16.獲取鎖。 void lock() 17.如果當前線程未被中斷,則獲取鎖。 void lockInterruptibly() 18.返回用來與此 Lock 實例一起使用的 Condition 實例。 Condition newCondition() 19.僅在調用時鎖未被另一個線程保持的情況下,才獲取該鎖。 boolean tryLock() 20.如果鎖在給定等待時間內沒有被另一個線程保持,且當前線程未被中斷,則獲取該鎖。 boolean tryLock(long timeout, TimeUnit unit) 21.試圖釋放此鎖。 void unlock()
應用多個線程的計數器:結果為 20000。
package concurrentMy.Volatiles; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * * (類型功能說明描述) * * <p> * 修改歷史: <br> * 修改日期 修改人員 版本 修改內容<br> * -------------------------------------------------<br> * 2016年4月8日 下午6:07:36 user 1.0 初始化創建<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */ public class VolatiteLock implements Runnable{ // 不能保證原子性,如果不加lock的話 private volatile int inc = 0; Lock lock = new ReentrantLock(); /** * * 理解:高速緩存 - 主存 * 通過ReentrantLock保證原子性:讀主存,在高速緩存中計算得到+1后的值,寫回主存 * (方法說明描述) * */ public void increase() { lock.lock(); try { inc++; } catch (Exception e) { e.printStackTrace(); }finally{ lock.unlock(); } } public void run() { for (int i = 0; i < 10000; i++) { increase(); } } public static void main(String[] args) throws InterruptedException { VolatiteLock v = new VolatiteLock(); // 線程1 Thread t1 = new Thread(v); // 線程2 Thread t2 = new Thread(v); t1.start(); t2.start(); // for(int i=0;i<100;i++){ // System.out.println(i); // } System.out.println(Thread.activeCount() + Thread.currentThread().getId() + Thread.currentThread().getName()); while (Thread.activeCount() > 1) // 保證前面的線程都執行完 Thread.yield(); //20000 System.out.println(v.inc); } }