J.U.C 簡介
Lock

Lock接口
void lock() // 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放 void lockInterruptibly() // 和lock()方法相似, 但阻塞的線程 可 中 斷 , 拋 出java.lang.InterruptedException 異常 boolean tryLock() // 非阻塞獲取鎖;嘗試獲取鎖,如果成功返回 true boolean tryLock(long timeout, TimeUnit timeUnit) //帶有超時時間的獲取鎖方法 void unlock() // 釋放鎖
public class ReentrantDemo{ public synchronized void demo(){ System.out.println("begin:demo"); demo2(); } public void demo2(){ System.out.println("begin:demo1"); synchronized (this){ } } public static void main(String[] args) { ReentrantDemo rd=new ReentrantDemo(); new Thread(rd::demo).start(); } }
ReentrantLock 的使用案例
public class AtomicDemo { private static int count=0; static Lock lock=new ReentrantLock(); public static void inc(){ lock.lock(); try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } count++; lock.unlock(); } public static void main(String[] args) throws InterruptedException { for(int i=0;i<1000;i++){ new Thread(()->{AtomicDemo.inc();}).start();; } Thread.sleep(3000); System.out.println("result:"+count); } }
package com.lf.threaddemo; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class LockDemo { static Map<String, Object> cacheMap = new HashMap<>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock read = rwl.readLock(); static Lock write = rwl.writeLock(); public static final Object get(String key) { System.out.println("開始讀取數據"); read.lock(); //讀鎖 try { return cacheMap.get(key); } finally { read.unlock(); } } public static final Object put(String key, Object value) { write.lock(); System.out.println("開始寫數據"); try { return cacheMap.put(key, value); } finally { write.unlock(); } } }
StampedLock
StampedLock 支持三種模式,分別是:寫鎖、悲觀讀鎖和樂觀讀。其中,寫鎖、悲觀讀鎖的語義和 ReadWriteLock 的寫鎖、讀鎖的語義非常類似,
允許多個線程同時獲取悲觀讀鎖,但是只允許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。不同的是:StampedLock 里的寫鎖和悲觀讀鎖加鎖成功之后,
都會返回一個 stamp;然后解鎖的時候,需要傳入這個 stamp。相關的示例代碼如下。
final StampedLock sl = new StampedLock(); // 獲取/釋放悲觀讀鎖示意代碼 long stamp = sl.readLock(); try { //省略業務相關代碼 } finally { sl.unlockRead(stamp); } // 獲取/釋放寫鎖示意代碼 long stamp = sl.writeLock(); try { //省略業務相關代碼 } finally { sl.unlockWrite(stamp); }
StampedLock 的性能之所以比 ReadWriteLock 還要好,其關鍵是 StampedLock 支持樂觀讀的方式。
ReadWriteLock 支持多個線程同時讀,但是當多個線程同時讀的時候,
所有的寫操作會被阻塞;而 StampedLock 提供的樂觀讀,是允許一個線程獲取寫鎖的,也就是說不是所有的寫操作都被阻塞。
注意這里,我們用的是“樂觀讀”這個詞,而不是“樂觀讀鎖”,是要提醒你,樂觀讀這個操作是無鎖的,所以相比較 ReadWriteLock 的讀鎖,樂觀讀的性能更好一些。
StampedLock 使用注意事項對於讀多寫少的場景 StampedLock 性能很好,簡單的應用場景基本上可以替代 ReadWriteLock,
但是 StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,還是有幾個地方需要注意一下。
StampedLock 在命名上並沒有增加 Reentrant,想必你已經猜測到 StampedLock 應該是不可重入的。
事實上,的確是這樣的,StampedLock 不支持重入。這個是在使用中必須要特別注意的。
另外,StampedLock 的悲觀讀鎖、寫鎖都不支持條件變量,這個也需要你注意。還有一點需要特別注意,
那就是:如果線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。
例如下面的代碼中,線程 T1 獲取寫鎖之后將自己阻塞,線程 T2 嘗試獲取悲觀讀鎖,也會阻塞;
如果此時調用線程 T2 的 interrupt() 方法來中斷線程 T2 的話,你會發現線程 T2 所在 CPU 會飆升到 100%。
final StampedLock lock = new StampedLock(); Thread T1 = new Thread(()->{ // 獲取寫鎖 lock.writeLock(); // 永遠阻塞在此處,不釋放寫鎖 LockSupport.park(); }); T1.start(); // 保證T1獲取寫鎖 Thread.sleep(100); Thread T2 = new Thread(()-> //阻塞在悲觀讀鎖 lock.readLock() ); T2.start(); // 保證T2阻塞在讀鎖 Thread.sleep(100); //中斷線程T2 //會導致線程T2所在CPU飆升 T2.interrupt(); T2.join();
所以,使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()。這個規則一定要記清楚
StampedLock 的使用看上去有點復雜,但是如果你能理解樂觀鎖背后的原理,使用起來還是比較流暢的。建議你認真揣摩 Java 的官方示例,這個示例基本上就是一個最佳實踐。
我們把 Java 官方示例精簡后,形成下面的代碼模板,建議你在實際工作中盡量按照這個模板來使用 StampedLock。
StampedLock 讀模板:
final StampedLock sl = new StampedLock(); // 樂觀讀 long stamp = sl.tryOptimisticRead(); // 讀入方法局部變量 ...... // 校驗stamp if (!sl.validate(stamp)){ // 升級為悲觀讀鎖 stamp = sl.readLock(); try { // 讀入方法局部變量 ..... } finally { //釋放悲觀讀鎖 sl.unlockRead(stamp); } } //使用方法局部變量執行業務操作 ......
StampedLock 寫模板:
long stamp = sl.writeLock(); try { // 寫共享變量 ...... } finally { sl.unlockWrite(stamp); }
ReentrantLock 的實現原理





public void lock() { sync.lock(); }
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
CAS 的實現原理
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
在 unsafe.cpp 文件中,可以找到 compareAndSwarpInt 的實現 UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); //將 Java 對象解析成 JVM 的 oop(普通對象指針), jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //根據對象 p和地址偏移量找到地址 return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //基於 cas 比較並替換, x 表示需要更新的值,addr 表示 state 在內存中的地址,e 表示預期值 UNSAFE_END
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前執行的線程 int c = getState();//獲得 state 的值 if (c == 0) {//表示無鎖狀態 if (compareAndSetState(0, acquires)) {//cas 替換 state 的值,cas 成功表示獲取鎖成功 setExclusiveOwnerThread(current);//保存當前獲得鎖的線程,下次再來的時候不要再嘗試競爭鎖 return true; } } else if (current == getExclusiveOwnerThread()) {//如果同一個線程來獲得鎖,直接增加重入次數 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;
}
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//把當前線程封裝為 Node Node pred = tail; //tail 是 AQS 中表示同比隊列隊尾的屬性,默認是 null if (pred != null) {//tail 不為空的情況下,說明隊列中存在節點 node.prev = pred;//把當前線程的 Node 的 prev 指向 tail if (compareAndSetTail(pred, node)) {//通過 cas 把 node加入到 AQS 隊列,也就是設置為 tail pred.next = node;//設置成功以后,把原 tail 節點的 next指向當前 node return node; } } enq(node);//tail=null,把 node 添加到同步隊列 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
}

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//獲取當前節點的 prev 節點 if (p == head && tryAcquire(arg)) {//如果是 head 節點,說明有資格去爭搶鎖 setHead(node);//獲取鎖成功,也就是ThreadA 已經釋放了鎖,然后設置 head 為 ThreadB 獲得執行權限 p.next = null; //把原 head 節點從鏈表中移除 failed = false; return interrupted; }//ThreadA 可能還沒釋放鎖,使得 ThreadB 在執行 tryAcquire 時會返回 false if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; //並且返回當前線程在等待過程中有沒有中斷過。 } } finally { if (failed) cancelAcquire(node); }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//前置節點的waitStatus if (ws == Node.SIGNAL)//如果前置節點為 SIGNAL,意味着只需要等待其他前置節點的線程被釋放, return true;//返回 true,意味着可以直接放心的掛起了 if (ws > 0) {//ws 大於 0,意味着 prev 節點取消了排隊,直接移除這個節點就行 do { node.prev = pred = pred.prev; //相當於: pred=pred.prev; node.prev=pred; } while (pred.waitStatus > 0); //這里采用循環,從雙向列表中移除 CANCELLED 的節點 pred.next = node; } else {//利用 cas 設置 prev 節點的狀態為 SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;
}
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
static void selfInterrupt() { Thread.currentThread().interrupt(); }


鎖的釋放流程
public final boolean release(int arg) { if (tryRelease(arg)) { //釋放鎖成功 Node h = head; //得到 aqs 中 head 節點 if (h != null && h.waitStatus != 0)//如果 head 節點不為空並且狀態!=0.調用 unparkSuccessor(h)喚醒后續節點 unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { int ws = node.waitStatus;//獲得 head 節點的狀態 if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// 設置 head 節點狀態為 0 Node s = node.next;//得到 head 節點的下一個節點 if (s == null || s.waitStatus > 0) { //如果下一個節點為 null 或者 status>0 表示 cancelled 狀態. //通過從尾部節點開始掃描,找到距離 head 最近的一個waitStatus<=0 的節點 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //next 節點不為空,直接喚醒這個線程即可 LockSupport.unpark(s.thread); }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }
}

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }
}

公平鎖和非公平鎖的區別
final void lock() { acquire(1); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;
}
Condition
public class ConditionDemoWait implements Runnable{ private Lock lock; private Condition condition; public ConditionDemoWait(Lock lock, Condition condition){ this.lock=lock; this.condition=condition; } @Override public void run() { System.out.println("begin -ConditionDemoWait"); try { lock.lock(); condition.await(); System.out.println("end - ConditionDemoWait"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } }
}
public class ConditionDemoSignal implements Runnable{ private Lock lock; private Condition condition; public ConditionDemoSignal(Lock lock, Condition condition){ this.lock=lock; this.condition=condition; } @Override public void run() { System.out.println("begin -ConditionDemoSignal"); try { lock.lock(); condition.signal(); System.out.println("end - ConditionDemoSignal"); }finally { lock.unlock(); } }
}
Condition 源碼分析
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //創建一個新的節點,節點狀態為 condition,采用的數據結構仍然是鏈表 int savedState = fullyRelease(node); //釋放當前的鎖,得到鎖的狀態,並喚醒 AQS 隊列中的一個線程 int interruptMode = 0; //如果當前節點沒有在同步隊列上,即還沒有被 signal,則將當前線程阻塞 while (!isOnSyncQueue(node)) {//判斷這個節點是否在 AQS 隊列上,第一次判斷的是 false,因為前面已經釋放鎖了 LockSupport.park(this); // 第一次總是 park 自己,開始阻塞等待 // 線程判斷自己在等待過程中是否被中斷了,如果沒有中斷,則再次循環,會在 isOnSyncQueue 中判斷自己是否在隊列上. // isOnSyncQueue 判斷當前 node 狀態,如果是 CONDITION 狀態,或者不在隊列上了,就繼續阻塞. // isOnSyncQueue 判斷當前 node 還在隊列上且不是 CONDITION 狀態了,就結束循環和阻塞. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 當這個線程醒來,會嘗試拿鎖, 當 acquireQueued 返回 false 就是拿到鎖了. // interruptMode != THROW_IE -> 表示這個線程沒有成功將 node 入隊,但 signal 執行了 enq 方法讓其入隊了. // 將這個變量設置成 REINTERRUPT. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 如果 node 的下一個等待者不是 null, 則進行清理,清理 Condition 隊列上的節點. // 如果是 null ,就沒有什么好清理的了. if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); // 如果線程被中斷了,需要拋出異常.或者什么都不做 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
public final void signal() { if (!isHeldExclusively()) //先判斷當前線程是否獲得了鎖 throw new IllegalMonitorStateException(); Node first = firstWaiter; // 拿到 Condition 隊列上第一個節點 if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null)// 如果第一個節點的下一個節點是 null, 那么, 最后一個節點也是 null. lastWaiter = null; // 將 next 節點設置成 null first.nextWaiter = null; } while (!transferForSignal(first) &&(first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; // 如果上一個節點的狀態被取消了, 或者嘗試設置上一個節點的狀態為 SIGNAL 失敗了(SIGNAL 表示: 他的 next 節點需要停止阻塞), if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 喚醒輸入節點上的線程. return true;
}