StampedLock是JUC並發包里面JDK1.8版本新增的一個鎖,該鎖提供了三種模式的讀寫控制,當調用獲取鎖的系列函數的時候,會返回一個long 型的變量,該變量被稱為戳記(stamp),這個戳記代表了鎖的狀態。
try系列獲取鎖的函數,當獲取鎖失敗后會返回為0的stamp值。當調用釋放鎖和轉換鎖的方法時候需要傳入獲取鎖時候返回的stamp值。
StampedLockd的內部實現是基於CLH鎖的,CLH鎖原理:鎖維護着一個等待線程隊列,所有申請鎖且失敗的線程都記錄在隊列。一個節點代表一個線程,保存着一個標記位locked,用以判斷當前線程是否已經釋放鎖。當一個線程試圖獲取鎖時,從隊列尾節點作為前序節點,循環判斷所有的前序節點是否已經成功釋放鎖。
如下圖所示:
我們首先看Stampedlock有哪些屬性先,源碼如下:
private static final long serialVersionUID = -6001602636862214147L; /** 獲取服務器CPU核數 */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** 線程入隊列前自旋次數 */ private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0; /** 隊列頭結點自旋獲取鎖最大失敗次數后再次進入隊列 */ private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0; /** 重新阻塞前的最大重試次數 */ private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0; /** The period for yielding when waiting for overflow spinlock */ private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1 /** 溢出之前用於閱讀器計數的位數 */ private static final int LG_READERS = 7; // 鎖定狀態和stamp操作的值 private static final long RUNIT = 1L; private static final long WBIT = 1L << LG_READERS; private static final long RBITS = WBIT - 1L; private static final long RFULL = RBITS - 1L; private static final long ABITS = RBITS | WBIT; //前8位都為1 private static final long SBITS = ~RBITS; // 1 1000 0000 //鎖state初始值,第9位為1,避免算術時和0沖突 private static final long ORIGIN = WBIT << 1; // 來自取消獲取方法的特殊值,因此調用者可以拋出IE private static final long INTERRUPTED = 1L; // WNode節點的status值 private static final int WAITING = -1; private static final int CANCELLED = 1; // WNode節點的讀寫模式 private static final int RMODE = 0; private static final int WMODE = 1; /** Wait nodes */ static final class WNode { volatile WNode prev; volatile WNode next; volatile WNode cowait; // 讀模式使用該節點形成棧 volatile Thread thread; // non-null while possibly parked volatile int status; // 0, WAITING, or CANCELLED final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } } /** CLH隊頭節點 */ private transient volatile WNode whead; /** CLH隊尾節點 */ private transient volatile WNode wtail; // views transient ReadLockView readLockView; transient WriteLockView writeLockView; transient ReadWriteLockView readWriteLockView; /** 鎖隊列狀態, 當處於寫模式時第8位為1,讀模式時前7為為1-126(附加的readerOverflow用於當讀者超過126時) */ private transient volatile long state; /** 將state超過 RFULL=126的值放到readerOverflow字段中 */ private transient int readerOverflow;
StampedLockd源碼中的WNote就是等待鏈表隊列,每一個WNode標識一個等待線程,whead為CLH隊列頭,wtail為CLH隊列尾,state為鎖的狀態。long型即64位,倒數第八位標識寫鎖狀態,如果為1,標識寫鎖占用!下面圍繞這個state來講述鎖操作。
首先是常量標識:
WBIT=1000 0000(即-128)
RBIT =0111 1111(即127)
SBIT =1000 0000(后7位表示當前正在讀取的線程數量,清0)
StampedLock 給我們提供了3種讀寫模式的鎖,如下:
1.寫鎖writeLock是一個獨占鎖,同時只有一個線程可以獲取該鎖,當一個線程獲取該鎖后,其他請求讀鎖和寫鎖的線程必須等待,這跟ReentrantReadWriteLock 的寫鎖很相似,不過要注意的是StampedLock的寫鎖是不可重入鎖,
當目前沒有線程持有讀鎖或者寫鎖的時候才可以獲取到該鎖,請求該鎖成功后會返回一個stamp 票據變量來表示該鎖的版本,如下源碼所示:
/** * *獲取寫鎖,獲取失敗會一直阻塞,直到獲得鎖成功 * @return 可以用來解鎖或轉換模式的戳記(128的整數) */ public long writeLock() { long s, next; return ((((s = state) & ABITS) == 0L && // 完全沒有任何鎖(沒有讀鎖和寫鎖)的時候可以通過 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? //第8位置為1 next : acquireWrite(false, 0L)); }
writeLock():典型的cas操作,如果STATE等於s,設置寫鎖位為1(s+WBIT)。acquireWrite跟acquireRead邏輯類似,先自旋嘗試、加入等待隊列、直至最終Unsafe.park()掛起線程。
private long acquireWrite(boolean interruptible, long deadline) { WNode node = null, p; for (int spins = -1;;) { // 入隊時自旋 long m, s, ns; //無鎖 if ((m = (s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) return ns; } else if (spins < 0) //持有寫鎖,並且隊列為空 spins = (m == WBIT && wtail == whead) ? SPINS : 0; else if (spins > 0) { //恆成立 if (LockSupport.nextSecondarySeed() >= 0) --spins; } else if ((p = wtail) == null) { //初始化隊列,寫鎖入隊列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } else if (node == null) //不為空,寫鎖入隊列 node = new WNode(WMODE, p); else if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; break;//入隊列成功退出循環 } } for (int spins = -1;;) { WNode h, np, pp; int ps; //前驅節點為頭節點 if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long s, ns; //無鎖 if (((s = state) & ABITS) == 0L) { if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT)) { //當前節點設置為頭結點 whead = node; node.prev = null; return ns; } } else if (LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { // help release stale waiters WNode c; Thread w; //頭結點為讀鎖將棧中所有讀鎖線程喚醒 while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } // if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) //前驅節點置為等待狀態 U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; // 0 argument to park means no timeout if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p) U.park(false, time); // emulate LockSupport.park node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
並且StampedLock還提供了非阻塞tryWriteLock方法,源碼如下:
/** * 沒有任何鎖時則獲取寫鎖,否則返回0 * * @return 可以用來解鎖或轉換模式的戳記(128的整數),獲取失敗返回0 */ public long tryWriteLock() { long s, next; return ((((s = state) & ABITS) == 0L && U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ? next : 0L); }
/** * unit時間內獲得寫鎖成功返回狀態值,失敗返回0,或拋出InterruptedException * @return 0:獲得鎖失敗 * @throws InterruptedException 線程獲得鎖之前調用interrupt()方法拋出的異常 */ public long tryWriteLock(long time, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(time); if (!Thread.interrupted()) { long next, deadline; if ((next = tryWriteLock()) != 0L) //獲得鎖成功 return next; if (nanos <= 0L) //超時返回0 return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireWrite(true, deadline)) != INTERRUPTED) //規定時間內獲得鎖結果 return next; } throw new InterruptedException(); }
當釋放該鎖的時候需要調用unlockWrite方法並傳遞獲取鎖的時候的stamp參數。源碼如下:
/** * state匹配stamp則釋放寫鎖, * @throws IllegalMonitorStateException 不匹配則拋出異常 */ public void unlockWrite(long stamp) { WNode h; //state不匹配stamp 或者 沒有寫鎖 if (state != stamp || (stamp & WBIT) == 0L) throw new IllegalMonitorStateException(); //state += WBIT, 第8位置為0,但state & SBITS 會循環,一共有4個值 state = (stamp += WBIT) == 0L ? ORIGIN : stamp; if ((h = whead) != null && h.status != 0) //喚醒繼承者節點線程 release(h); }
unlockWrite():釋放鎖與加鎖動作相反。將寫標記位清零,如果state溢出,則退回到初始值;
2.悲觀鎖readLock,是個共享鎖,在沒有線程獲取獨占寫鎖的情況下,同時多個線程可以獲取該鎖;如果已經有線程持有寫鎖,其他線程請求獲取該鎖會被阻塞,這類似ReentrantReadWriteLock 的讀鎖(不同在於這里的讀鎖是不可重入鎖)。
這里說的悲觀是指在具體操作數據前,悲觀的認為其他線程可能要對自己操作的數據進行修改,所以需要先對數據加鎖,這是在讀少寫多的情況下的一種考慮,請求該鎖成功后會返回一個stamp票據變量來表示該鎖的版本,源碼如下:
/** * 悲觀讀鎖,非獨占鎖,為獲得鎖一直處於阻塞狀態,直到獲得鎖為止 */ public long readLock() { long s = state, next; // 隊列為空 && 沒有寫鎖同時讀鎖數小於126 && CAS修改狀態成功 則狀態加1並返回,否則自旋獲取讀鎖 return ((whead == wtail && (s & ABITS) < RFULL && U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ? next : acquireRead(false, 0L)); }
樂觀鎖失敗后鎖升級為readLock():嘗試state+1,用於統計讀線程的數量,如果失敗,進入acquireRead()進行自旋,通過CAS獲取鎖。
如果自旋失敗,入CLH隊列,然后再自旋,如果成功獲得讀鎖,則激活cowait隊列中的讀線程Unsafe.unpark(),如果最終依然失敗,則Unsafe().park()掛起當前線程。
/** * @param interruptible 是否允許中斷 * @param 標識超時限時(0代表不限時),然后進入循環。 * @return next state, or INTERRUPTED */ private long acquireRead(boolean interruptible, long deadline) { WNode node = null, p; //自旋 for (int spins = -1;;) { WNode h; //判斷隊列為空 if ((h = whead) == (p = wtail)) { //定義 long m,s,ns,並循環 for (long m, s, ns;;) { //將state超過 RFULL=126的值放到readerOverflow字段中 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) //獲取鎖成功返回 return ns; //state高8位大於0,那么說明當前鎖已經被寫鎖獨占,那么我們嘗試自旋 + 隨機的方式來探測狀態 else if (m >= WBIT) { if (spins > 0) { if (LockSupport.nextSecondarySeed() >= 0) --spins; } else { if (spins == 0) { WNode nh = whead, np = wtail; //一直獲取鎖失敗,或者有線程入隊列了退出內循環自旋,后續進入隊列 if ((nh == h && np == p) || (h = nh) != (p = np)) break; } //自旋 SPINS 次 spins = SPINS; } } } } if (p == null) { //初始隊列 WNode hd = new WNode(WMODE, null); if (U.compareAndSwapObject(this, WHEAD, null, hd)) wtail = hd; } //當前節點為空則構建當前節點,模式為RMODE,前驅節點為p即尾節點。 else if (node == null) node = new WNode(RMODE, p); //當前隊列為空即只有一個節點(whead=wtail)或者當前尾節點的模式不是RMODE,那么我們會嘗試在尾節點后面添加該節點作為尾節點,然后跳出外層循環 else if (h == p || p.mode != RMODE) { if (node.prev != p) node.prev = p; else if (U.compareAndSwapObject(this, WTAIL, p, node)) { p.next = node; //入隊列成功,退出自旋 break; } } //隊列不為空並且是RMODE模式, 添加該節點到尾節點的cowait鏈(實際上構成一個讀線程stack)中 else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) //失敗處理 node.cowait = null; else { //通過CAS方法將該節點node添加至尾節點的cowait鏈中,node成為cowait中的頂元素,cowait構成了一個LIFO隊列。 //循環 for (;;) { WNode pp, c; Thread w; //嘗試unpark頭元素(whead)的cowait中的第一個元素,假如是讀鎖會通過循環釋放cowait鏈 if ((h = whead) != null && (c = h.cowait) != null && U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); //node所在的根節點p的前驅就是whead或者p已經是whead或者p的前驅為null if (h == (pp = p.prev) || h == p || pp == null) { long m, s, ns; do { //根據state再次積極的嘗試獲取鎖 if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) return ns; } while (m < WBIT);//條件為讀模式 } if (whead == h && p.prev == pp) { long time; if (pp == null || h == p || p.status > 0) { //這樣做的原因是被其他線程闖入奪取了鎖,或者p已經被取消 node = null; // throw away break; } if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, p, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); //出現的中斷情況下取消當前節點的cancelWaiter操作 if (interruptible && Thread.interrupted()) return cancelWaiter(node, p, true); } } } } for (int spins = -1;;) { WNode h, np, pp; int ps; if ((h = whead) == p) { if (spins < 0) spins = HEAD_SPINS; else if (spins < MAX_HEAD_SPINS) spins <<= 1; for (int k = spins;;) { // spin at head long m, s, ns; if ((m = (s = state) & ABITS) < RFULL ? U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) : (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) { WNode c; Thread w; whead = node; node.prev = null; while ((c = node.cowait) != null) { if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } return ns; } else if (m >= WBIT && LockSupport.nextSecondarySeed() >= 0 && --k <= 0) break; } } else if (h != null) { WNode c; Thread w; while ((c = h.cowait) != null) { if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null) U.unpark(w); } } if (whead == h) { if ((np = node.prev) != p) { if (np != null) (p = np).next = node; // stale } else if ((ps = p.status) == 0) U.compareAndSwapInt(p, WSTATUS, 0, WAITING); else if (ps == CANCELLED) { if ((pp = p.prev) != null) { node.prev = pp; pp.next = node; } } else { long time; if (deadline == 0L) time = 0L; else if ((time = deadline - System.nanoTime()) <= 0L) return cancelWaiter(node, node, false); Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); node.thread = wt; if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) U.park(false, time); node.thread = null; U.putObject(wt, PARKBLOCKER, null); if (interruptible && Thread.interrupted()) return cancelWaiter(node, node, true); } } } }
並且StampedLock還提供了非阻塞tryReadLock方法,源碼如下:
/** * 可以立即獲得鎖,則獲取讀鎖,否則返回0 */ public long tryReadLock() { for (;;) { long s, m, next; //持有寫鎖返回0 if ((m = (s = state) & ABITS) == WBIT) return 0L; //讀線程數 < RFULL,CAS變更狀態 else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } //將state超過 RFULL的值放到readerOverflow字段 else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } }
/** * unit時間內獲得讀鎖成功返回狀態值,失敗返回0,或拋出InterruptedException */ public long tryReadLock(long time, TimeUnit unit) throws InterruptedException { long s, m, next, deadline; long nanos = unit.toNanos(time); if (!Thread.interrupted()) { if ((m = (s = state) & ABITS) != WBIT) { if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } if (nanos <= 0L) return 0L; if ((deadline = System.nanoTime() + nanos) == 0L) deadline = 1L; if ((next = acquireRead(true, deadline)) != INTERRUPTED) return next; } throw new InterruptedException(); }
StampedLock的悲觀讀鎖readLock 當釋放該鎖時候需要 unlockRead 並傳遞參數 stamp。源碼如下:
/** * state匹配stamp則釋放讀鎖, */ public void unlockRead(long stamp) { long s, m; WNode h; for (;;) { //不匹配拋出異常 if (((s = state) & SBITS) != (stamp & SBITS) || (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT) throw new IllegalMonitorStateException(); //小於最大記錄數值 if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) { if (m == RUNIT && (h = whead) != null && h.status != 0) release(h); break; } } //否則readerOverflow減一 else if (tryDecReaderOverflow(s) != 0L) break; } }
3.樂觀讀鎖 tryOptimisticRead,是相對於悲觀鎖來說的,在操作數據前並沒有通過 CAS 設置鎖的狀態,僅僅是通過位運算測試;如果當前沒有線程持有寫鎖,則簡單的返回一個非 0 的 stamp 版本信息,
獲取該 stamp 后在具體操作數據前還需要調用 validate 驗證下該 stamp 是否已經不可用,也就是看當調用 tryOptimisticRead 返回 stamp 后,到當前時間是否有其它線程持有了寫鎖,如果是那么 validate 會返回 0,
否者就可以使用該 stamp 版本的鎖對數據進行操作。由於 tryOptimisticRead 並沒有使用 CAS 設置鎖狀態,所以不需要顯示的釋放該鎖。
該鎖的一個特點是適用於讀多寫少的場景,因為獲取讀鎖只是使用位操作進行檢驗,不涉及 CAS 操作,所以效率會高很多,但是同時由於沒有使用真正的鎖,在保證數據一致性上需要拷貝一份要操作的變量到方法棧,並且在操作數據時候可能其它寫線程已經修改了數據,
而我們操作的是方法棧里面的數據,也就是一個快照,所以最多返回的不是最新的數據,但是一致性還是得到保障的。源碼如下:
/** * 獲取樂觀讀鎖,返回郵票stamp */ public long tryOptimisticRead() { long s; //有寫鎖返回0. 否則返回256 return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L; }
tryOptimisticRead():如果當前沒有寫鎖占用,返回state(后7位清0,即清0讀線程數),如果有寫鎖,返回0,即失敗。
/** * 驗證從調用tryOptimisticRead開始到現在這段時間內有無寫鎖占用過鎖資源,有寫鎖獲得過鎖資源則返回false. stamp為0返回false. * @return 從返回stamp開始,沒有寫鎖獲得過鎖資源返回true,否則返回false */ public boolean validate(long stamp) { //強制讀取操作和驗證操作在一些情況下的內存排序問題 U.loadFence(); //當持有寫鎖后再釋放寫鎖,該校驗也不成立,返回false return (stamp & SBITS) == (state & SBITS); }
StamedLock還支持這三種鎖在一定條件下進行相互轉換,例如long tryConvertToWriteLock(long stamp)期望把stamp標示的鎖升級為寫鎖,這個函數會在下面幾種情況下返回一個有效的 stamp(也就是晉升寫鎖成功):
1.當前鎖已經是寫鎖模式了。
2.當前鎖處於讀鎖模式,並且沒有其他線程是讀鎖模式
3.當前處於樂觀讀模式,並且當前寫鎖可用。
源碼如下:
/** * state匹配stamp時, 執行下列操作之一. * 1、stamp 已經持有寫鎖,直接返回. * 2、讀模式,但是沒有更多的讀取者,並返回一個寫鎖stamp. * 3、有一個樂觀讀鎖,只在即時可用的前提下返回一個寫鎖stamp * 4、其他情況都返回0 */ public long tryConvertToWriteLock(long stamp) { long a = stamp & ABITS, m, s, next; //state匹配stamp while (((s = state) & SBITS) == (stamp & SBITS)) { //沒有鎖 if ((m = s & ABITS) == 0L) { if (a != 0L) break; //CAS修改狀態為持有寫鎖,並返回 if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) return next; } //持有寫鎖 else if (m == WBIT) { if (a != m) //其他線程持有寫鎖 break; //當前線程已經持有寫鎖 return stamp; } //有一個讀鎖 else if (m == RUNIT && a != 0L) { //釋放讀鎖,並嘗試持有寫鎖 if (U.compareAndSwapLong(this, STATE, s, next = s - RUNIT + WBIT)) return next; } else break; } return 0L; }
/** * state匹配stamp時, 執行下列操作之一. 1、stamp 表示持有寫鎖,釋放寫鎖,並持有讀鎖 2 stamp 表示持有讀鎖 ,返回該讀鎖 3 有一個樂觀讀鎖,只在即時可用的前提下返回一個讀鎖stamp 4、其他情況都返回0,表示失敗 * */ public long tryConvertToReadLock(long stamp) { long a = stamp & ABITS, m, s, next; WNode h; //state匹配stamp while (((s = state) & SBITS) == (stamp & SBITS)) { //沒有鎖 if ((m = s & ABITS) == 0L) { if (a != 0L) break; else if (m < RFULL) { if (U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) return next; } else if ((next = tryIncReaderOverflow(s)) != 0L) return next; } //寫鎖 else if (m == WBIT) { //非當前線程持有寫鎖 if (a != m) break; //釋放寫鎖持有讀鎖 state = next = s + (WBIT + RUNIT); if ((h = whead) != null && h.status != 0) release(h); return next; } //持有讀鎖 else if (a != 0L && a < WBIT) return stamp; else break; } return 0L; }
校驗這個戳是否有效validate():比較當前stamp和發生樂觀鎖得到的stamp比較,不一致則失敗。
還有一個轉換成樂觀鎖tryConvertToOptimisticRead(long stamp) ,這里就不講了,道理都差不多。
另外 StampedLock 的讀寫鎖都是不可重入鎖,所以當獲取鎖后釋放鎖前,不應該再調用會獲取鎖的操作,以避免產生死鎖。
當多個線程同時嘗試獲取讀鎖和寫鎖的時候,誰先獲取鎖沒有一定的規則,完全都是盡力而為,是隨機的,並且該鎖不是直接實現 Lock 或 ReadWriteLock 接口,而是內部自己維護了一個雙向阻塞隊列。
下面通過 JDK8 里面提供的一個管理二維點的例子講解來加深對上面講解的理解。代碼如下所示:
package com.hjc; import java.util.concurrent.locks.StampedLock; /** * Created by cong on 2018/6/16. */ public class Point { // 成員變量 private double x, y; // 鎖實例 private final StampedLock sl = new StampedLock(); // 排它鎖-寫鎖(writeLock) void move(double deltaX, double deltaY) { long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { sl.unlockWrite(stamp); } } // 樂觀讀鎖(tryOptimisticRead) double distanceFromOrigin() { // 嘗試獲取樂觀讀鎖(1) long stamp = sl.tryOptimisticRead(); // 將全部變量拷貝到方法體棧內(2) double currentX = x, currentY = y; // 檢查在(1)獲取到讀鎖票據后,鎖有沒被其它寫線程排它性搶占(3) if (!sl.validate(stamp)) { // 如果被搶占則獲取一個共享讀鎖(悲觀獲取)(4) stamp = sl.readLock(); try { // 將全部變量拷貝到方法體棧內(5) currentX = x; currentY = y; } finally { // 釋放共享讀鎖(6) sl.unlockRead(stamp); } } // 返回計算結果(7) return Math.sqrt(currentX * currentX + currentY * currentY); } // 使用悲觀鎖獲取讀鎖,並嘗試轉換為寫鎖 void moveIfAtOrigin(double newX, double newY) { // 這里可以使用樂觀讀鎖替換(1) long stamp = sl.readLock(); try { // 如果當前點在原點則移動(2) while (x == 0.0 && y == 0.0) { // 嘗試將獲取的讀鎖升級為寫鎖(3) long ws = sl.tryConvertToWriteLock(stamp); // 升級成功,則更新票據,並設置坐標值,然后退出循環(4) if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { // 讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨占寫鎖,然后循環重試(5) sl.unlockRead(stamp); stamp = sl.writeLock(); } } } finally { // 釋放鎖(6) sl.unlock(stamp); } } }
如上代碼 Point 類里面有兩個成員變量(x,y) 來標示一個點的二維坐標,和三個操作坐標變量的方法,另外實例化了一個 StampedLock 對象用來保證操作的原子性。
首先分析下 move 方法,該函數作用是使用參數的增量值,改變當前 point 坐標的位置;代碼先獲取到了寫鎖,然后對 point 坐標進行修改,然后釋放鎖。該鎖是排它鎖,這保證了其它線程調用 move 函數時候會被阻塞,也保證了其它線程不能獲取讀鎖,讀取坐標的值,直到當前線程顯示釋放了寫鎖,
也就是保證了對變量 x,y 操作的原子性和數據一致性。
接下來再看 distanceFromOrigin 方法,該方法作用是計算當前位置到原點(坐標為 0,0)的距離,代碼(1)首先嘗試獲取樂觀讀鎖,如果當前沒有其它線程獲取到了寫鎖,那么(1)會返回一個非 0 的 stamp 用來表示版本信息,代碼(2)拷貝坐標變量到本地方法棧里面。
代碼(3)檢查在(1)獲取到的 stamp 值是否還有效,之所以還要在此校驗是因為代碼(1)獲取讀鎖時候並沒有通過 CAS 操作修改鎖的狀態,而是簡單的通過與或操作返回了一個版本信息,這里校驗是看在在獲取版本信息到現在的時間段里面是否有其它線程持有了寫鎖,如果有則之前獲取的版本信息就無效了。
這里如果校驗成功則執行(7)使用本地方法棧里面的值進行計算然后返回。需要注意的是在代碼(3) 校驗成功后,代碼(7)計算期間,其它線程可能獲取到了寫鎖並且修改了 x,y 的值,而當前線程執行代碼(7)進行計算時候采用的還是修改前值的拷貝,也就是操作的值是對之前值的一個拷貝,一個快照,並不是最新的值。
也許我們會想,代碼(2) 和(3)能否互換?。
答案是明顯不能的,如果位置換了,那么首先執行validate ,假設驗證通過了,要拷貝x,y 值到本地方法棧,而在拷貝的過程中很有可能其他線程已經修改過了 x,y 中的一個,這就造成了數據的不一致性了。
那么你可能還會這樣會想,即使不交換代碼 (2) 和(3),在拷貝 x,y 值到本地方法棧里面時,也會存在其他線程修改了x,y中的一個值,這不也會存在問題嗎?
這個確實會存在,但是別忘記了拷貝后還有一道validate,如果這時候有線程修改了x,y 中的值,那么肯定是有線程在調用 validate 前,調用 sl.tryOptimisticRead 后獲取了寫鎖,那么進行 validate 時候就會失敗。
好了知道這么多原理后,我們就會驚嘆這也是樂觀讀設計的精妙之處也是使用時候容易出問題的地方。下面繼續分析 validate 失敗后會執行代碼(4)獲取悲觀讀鎖,如果這時候其他線程持有寫鎖則代碼(4)會導致的當前線程阻塞直到其它線程釋放了寫鎖。
如果這時候沒有其他線程獲取到寫鎖,那么當前線程就可以獲取到讀鎖,然后執行代碼(5)重新拷貝新的坐標值到本地方法棧,然后就是代碼(6)釋放了鎖,拷貝的時候由於加了讀鎖,所以拷貝期間其它線程獲取寫鎖時候會被阻塞,
這保證了數據的一致性,另外這里 x,y 沒有被聲明為 volatie,會不會存在內存不可見性問題那?答案是不會,因為加鎖的語義保證了內存可見性,
最后代碼(7)使用方法棧里面數據計算返回,同理這里在計算時候使用的數據也可能不是最新的,其它寫線程可能已經修改過原來的 x,y 值了。
最后一個方法 moveIfAtOrigin 作用是如果當前坐標為原點則移動到指定的位置。代碼(1)獲取悲觀讀鎖,保證其它線程不能獲取寫鎖修改 x,y 值,然后代碼(2)判斷如果當前點在原點則更新坐標,
代碼(3) 嘗試升級讀鎖為寫鎖,這里升級不一定成功,因為多個線程都可以同時獲取悲觀讀鎖,當多個線程都執行到(3)時候只有一個可以升級成功,升級成功則返回非 0 的 stamp,否非返回 0。
這里假設當前線程升級成功,然后執行步驟(4)更新 stamp 值和坐標值,然后退出循環。如果升級失敗則執行步驟(5)首先釋放讀鎖然后申請寫鎖,獲取到寫鎖后在循環重新設置坐標值。最后步驟(6) 釋放鎖。
使用樂觀讀鎖還是很容易犯錯誤的,必須要嚴謹,必須要保證如下的使用順序,用偽代碼作為講解,如下:
long stamp = lock.tryOptimisticRead(); //非阻塞獲取版本信息 copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧 if(!lock.validate(stamp)){ // 校驗 long stamp = lock.readLock();//獲取讀鎖 try { copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧 } finally { lock.unlock(stamp);//釋放悲觀鎖 } } useThreadMemoryVarables();//使用線程本地堆棧里面的數據進行操作
總結:StampedLock 提供的讀寫鎖與 ReentrantReadWriteLock 類似,只是前者的都是不可重入鎖。但是前者通過提供樂觀讀鎖在多線程多讀的情況下提供更好的性能,這是因為獲取樂觀讀鎖時候不需要進行 CAS 操作設置鎖的狀態,而只是簡單的測試狀態。