一、簡介
讀寫鎖是一種特殊的自旋鎖,它把對共享資源對訪問者划分成了讀者和寫者,讀者只對共享資源進行訪問,寫者則是對共享資源進行寫操作。讀寫鎖在ReentrantLock上進行了拓展使得該鎖更適合讀操作遠遠大於寫操作對場景。一個讀寫鎖同時只能存在一個寫鎖但是可以存在多個讀鎖,但不能同時存在寫鎖和讀鎖。
如果讀寫鎖當前沒有讀者,也沒有寫者,那么寫者可以立刻獲的讀寫鎖,否則必須自旋,直到沒有任何的寫鎖或者讀鎖存在。如果讀寫鎖沒有寫鎖,那么讀鎖可以立馬獲取,否則必須等待寫鎖釋放。(但是有一個例外,就是讀寫鎖中的鎖降級操作,當同一個線程獲取寫鎖后,在寫鎖沒有釋放的情況下可以獲取讀鎖再釋放讀鎖這就是鎖降級的一個過程)
二、簡單示例
1 package cn.memedai; 2
3 import java.util.Random; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.locks.ReadWriteLock; 7
8 /**
9 * 讀寫鎖Demo 10 */
11 public class ReentrantReadWriteLockDemo { 12
13 class MyObject { 14 private Object object; 15
16 private ReadWriteLock lock = new java.util.concurrent.locks.ReentrantReadWriteLock(); 17
18 public void get() throws InterruptedException { 19 lock.readLock().lock();//上讀鎖
20 try { 21 System.out.println(Thread.currentThread().getName() + "准備讀取數據"); 22 Thread.sleep(new Random().nextInt(1000)); 23 System.out.println(Thread.currentThread().getName() + "讀數據為:" + this.object); 24 } finally { 25 lock.readLock().unlock(); 26 } 27 } 28
29 public void put(Object object) throws InterruptedException { 30 lock.writeLock().lock(); 31 try { 32 System.out.println(Thread.currentThread().getName() + "准備寫數據"); 33 Thread.sleep(new Random().nextInt(1000)); 34 this.object = object; 35 System.out.println(Thread.currentThread().getName() + "寫數據為" + this.object); 36 } finally { 37 lock.writeLock().unlock(); 38 } 39 } 40 } 41
42 public static void main(String[] args) { 43 final MyObject myObject = new ReentrantReadWriteLockDemo().new MyObject(); 44 ExecutorService executorService = Executors.newCachedThreadPool(); 45 for (int i = 0; i < 3; i++) { 46 executorService.execute(new Runnable() { 47 @Override 48 public void run() { 49 for (int j = 0; j < 3; j++) { 50
51 try { 52 myObject.put(new Random().nextInt(1000));//寫操作 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } 56 } 57 } 58 }); 59 } 60
61 for (int i = 0; i < 3; i++) { 62 executorService.execute(new Runnable() { 63 @Override 64 public void run() { 65 for (int j = 0; j < 3; j++) { 66 try { 67 myObject.get();//多個線程讀取操作 68 } catch (InterruptedException e) { 69 e.printStackTrace(); 70 } 71 } 72 } 73 }); 74 } 75
76 executorService.shutdown(); 77 } 78 }
下面是代碼運行結果的一種:
pool-1-thread-1准備寫數據 pool-1-thread-1寫數據為513 pool-1-thread-1准備寫數據 pool-1-thread-1寫數據為173 pool-1-thread-1准備寫數據 pool-1-thread-1寫數據為487 pool-1-thread-2准備寫數據 pool-1-thread-2寫數據為89 pool-1-thread-2准備寫數據 pool-1-thread-2寫數據為814 pool-1-thread-2准備寫數據 pool-1-thread-2寫數據為1 pool-1-thread-3准備寫數據 pool-1-thread-3寫數據為701 pool-1-thread-3准備寫數據 pool-1-thread-3寫數據為503 pool-1-thread-3准備寫數據 pool-1-thread-3寫數據為694 pool-1-thread-4准備讀取數據 pool-1-thread-5准備讀取數據 pool-1-thread-6准備讀取數據 pool-1-thread-4讀數據為:694 pool-1-thread-4准備讀取數據 pool-1-thread-4讀數據為:694 pool-1-thread-4准備讀取數據 pool-1-thread-6讀數據為:694 pool-1-thread-6准備讀取數據 pool-1-thread-5讀數據為:694 pool-1-thread-5准備讀取數據 pool-1-thread-6讀數據為:694 pool-1-thread-6准備讀取數據 pool-1-thread-4讀數據為:694 pool-1-thread-5讀數據為:694 pool-1-thread-5准備讀取數據 pool-1-thread-6讀數據為:694 pool-1-thread-5讀數據為:694
從數據中也可以發現一開始讀取的數據可能不一樣,但是你會發現下面的時候線程4和線程5、線程6之間的讀取的數據都是一樣的,這就是共享讀的特性。
三、實現原理
ReentrantReadWriteLock的基本原理和ReentrantLock沒有很大的區別,只不過在ReentantLock的基礎上拓展了兩個不同類型的鎖,讀鎖和寫鎖。首先可以看一下ReentrantReadWriteLock的內部結構:
內部維護了一個ReadLock和一個WriteLock,整個類的附加功能也就是通過這兩個內部類實現的。
那么內部又是怎么實現這個讀鎖和寫鎖的呢。由於一個類既要維護讀鎖又要維護寫鎖,那么這兩個鎖的狀態又是如何區分的。在ReentrantReadWriteLock對象內部維護了一個讀寫狀態:
讀寫鎖依賴自定義同步器實現同步功能,讀寫狀態也就是同步器的同步狀態。讀寫鎖將整形變量切分成兩部分,高16位表示讀,低16位表示寫:
讀寫鎖的狀態低16位為寫鎖,高16位為讀鎖
讀寫鎖通過位運算計算各自的同步狀態。假設當前同步狀態的值為c,寫狀態就為c&0x0000FFFF,讀狀態為c >>> 16(無符號位補0右移16位)。當寫狀態增加1狀態變為c+1,當讀狀態增加1時,狀態編碼就是c+(1 <<< 16)。
怎么維護讀寫狀態的已經了解了,那么就可以開始了解具體怎么樣實現的多個線程可以讀,一個線程寫的情況。
首先介紹的是ReadLock獲取鎖的過程
lock():獲取讀鎖方法
1 public void lock() { 2 sync.acquireShared(1);//自定義實現的獲取鎖方式 3 }
acquireShared(int arg):這是一個獲取共享鎖的方法
1 protected final int tryAcquireShared(int unused) {
17 Thread current = Thread.currentThread();//獲取當前線程 18 int c = getState();//獲取鎖狀態 19 if (exclusiveCount(c) != 0 &&
20 getExclusiveOwnerThread() != current)//如果獲取鎖的不是當前線程,並且由獨占式鎖的存在就不去獲取,這里會發現必須同時滿足兩個條件才能判斷其不能獲取讀鎖這也會后面的鎖降級做了准備 21 return -1; 22 int r = sharedCount(c);//獲取當前共享資源的數量 23 if (!readerShouldBlock() &&
24 r < MAX_COUNT &&
25 compareAndSetState(c, c + SHARED_UNIT)) {//代表可以獲取讀鎖 26 if (r == 0) {//如果當前沒有線程獲取讀鎖 27 firstReader = current;//當前線程是第一個讀鎖獲取者 28 firstReaderHoldCount = 1;//在計數器上加1 29 } else if (firstReader == current) { 30 firstReaderHoldCount++;//代表重入鎖計數器累加 31 } else {
//內部定義的線程記錄緩存 32 HoldCounter rh = cachedHoldCounter;//HoldCounter主要是一個類用來記錄線程已經線程獲取鎖的數量 33 if (rh == null || rh.tid != current.getId())//如果不是當前線程 34 cachedHoldCounter = rh = readHolds.get();//從每個線程的本地變量ThreadLocal中獲取 35 else if (rh.count == 0)//如果記錄為0初始值設置 36 readHolds.set(rh);//設置記錄 37 rh.count++;//自增 38 } 39 return 1;//返回1代表獲取到了同步狀態 40 } 41 return fullTryAcquireShared(current);//用來處理CAS設置狀態失敗的和tryAcquireShared非阻塞獲取讀鎖失敗的 42 }
內部運用到了ThreadLocal線程本地對象,將每個線程獲取鎖的次數保存到每個線程內部,這樣釋放鎖的時候就不會影響到其它的線程。
fullTryAcquireShared(Thread current):此方法用於處理在獲取讀鎖過程中CAS設置狀態失敗的和非阻塞獲取讀鎖失敗的線程
1 final int fullTryAcquireShared(Thread current) { 2 //內部線程記錄器 8 HoldCounter rh = null; 9 for (;;) { 10 int c = getState();//同步狀態 11 if (exclusiveCount(c) != 0) {//代表存在獨占鎖 12 if (getExclusiveOwnerThread() != current)//獲取獨占鎖的線程不是當前線程返回失敗 13 return -1; 16 } else if (readerShouldBlock()) {//判斷讀鎖是否應該被阻塞 18 if (firstReader == current) { 20 } else { 21 if (rh == null) {//為null 22 rh = cachedHoldCounter;//從緩存中進行獲取 23 if (rh == null || rh.tid != current.getId()) { 24 rh = readHolds.get();//獲取線程內部計數狀態 25 if (rh.count == 0) 26 readHolds.remove();//移除 27 } 28 } 29 if (rh.count == 0)//如果內部計數為0代表獲取失敗 30 return -1; 31 } 32 } 33 if (sharedCount(c) == MAX_COUNT) 34 throw new Error("Maximum lock count exceeded"); 35 if (compareAndSetState(c, c + SHARED_UNIT)) {//CAS設置成功 36 if (sharedCount(c) == 0) { 37 firstReader = current;//代表為第一個獲取讀鎖 38 firstReaderHoldCount = 1; 39 } else if (firstReader == current) { 40 firstReaderHoldCount++;//重入鎖 41 } else { 42 if (rh == null) 43 rh = cachedHoldCounter; 44 if (rh == null || rh.tid != current.getId()) 45 rh = readHolds.get(); 46 else if (rh.count == 0) 47 readHolds.set(rh); 48 rh.count++; 49 cachedHoldCounter = rh; //將當前多少讀鎖記錄下來 50 } 51 return 1;//返回獲取同步狀態成功 52 } 53 } 54 }
分析完上面的方法可以總結一下獲取讀鎖的過程:首先讀寫鎖中讀狀態為所有線程獲取讀鎖的次數,由於是可重入鎖,又因為每個鎖獲取的讀鎖的次數由每個鎖的本地變量ThreadLocal對象去保存因此增加了讀取獲取的流程難度,在每次獲取讀鎖之前都會進行一次判斷是否存在獨占式寫鎖,如果存在獨占式寫鎖就直接返回獲取失敗,進入同步隊列中。如果當前沒有寫鎖被獲取,則線程可以獲取讀鎖,由於共享鎖的存在,每次獲取都會判斷線程的類型,以便每個線程獲取同步狀態的時候都在其對應的本地變量上進行自增操作。
lock(int arg):寫鎖的獲取
public void lock() {
sync.acquire(1);//AQS獨占式獲取鎖
}
tryAcquire(int arg):獨占式的獲取寫鎖
1 protected final boolean tryAcquire(int acquires) {
13 Thread current = Thread.currentThread();//獲取當前線程 14 int c = getState();//獲取同步狀態值 15 int w = exclusiveCount(c);//獲取獨占式資源值 16 if (c != 0) {//已經有線程獲取了 //代表已經存在讀鎖,或者當前線程不是獲取到寫鎖的線程
18 if (w == 0 || current != getExclusiveOwnerThread()) 19 return false;//獲取失敗 20 if (w + exclusiveCount(acquires) > MAX_COUNT) 21 throw new Error("Maximum lock count exceeded"); 22 //設置同步狀態
23 setState(c + acquires); 24 return true; 25 } 26 if (writerShouldBlock() ||
27 !compareAndSetState(c, c + acquires))//判斷當前寫鎖線程是否應該阻塞,這里會有公平鎖和非公平鎖之間的區分 28 return false; 29 setExclusiveOwnerThread(current);//設置為當前線程 30 return true; 31 }
獲取寫鎖相比獲取讀鎖就簡單了很多,在獲取讀鎖之前只需要判斷當前是否存在讀鎖,如果存在讀鎖那么獲取失敗,進而再判斷獲取寫鎖的線程是否為當前線程如果不是也就是失敗否則就是重入鎖在已有的狀態值上進行自增
unlock():讀鎖釋放
public void unlock() { sync.releaseShared(1);//AQS釋放共享鎖操作 }
tryReleaseShared(int arg):釋放共享鎖
1 protected final boolean tryReleaseShared(int unused) { 2 Thread current = Thread.currentThread();//獲取當前線程 3 if (firstReader == current) {//如果當前線程就是獲取讀鎖的線程
5 if (firstReaderHoldCount == 1)//如果此時獲取資源為1 6 firstReader = null;//直接賦值null 7 else
8 firstReaderHoldCount--;//否則計數器自減 9 } else {
//其他線程 10 HoldCounter rh = cachedHoldCounter;//獲取本地計數器 11 if (rh == null || rh.tid != current.getId()) 12 rh = readHolds.get(); 13 int count = rh.count; 14 if (count <= 1) {//代表只獲取了一次 15 readHolds.remove(); 16 if (count <= 0) 17 throw unmatchedUnlockException(); 18 } 19 --rh.count; 20 } 21 for (;;) { 22 int c = getState(); 23 int nextc = c - SHARED_UNIT; 24 if (compareAndSetState(c, nextc))
28 return nextc == 0;//代表已經全部釋放 29 } 30 }
釋放鎖的過程不難,但是有一個注意點,並不是釋放一次就已經代表可以獲取獨占式寫鎖了,只有當同步狀態的值為0的時候也就是代表既沒有讀鎖存在也沒有寫鎖存在才代表完全釋放了讀鎖。
unlock():釋放寫鎖
1 public void unlock() { 2 sync.release(1);//釋放獨占式同步狀態 3 }
tryRelease(int arg):釋放獨占式寫鎖
1 protected final boolean tryRelease(int releases) { 2 if (!isHeldExclusively())//判斷是否 3 throw new IllegalMonitorStateException(); 4 int nextc = getState() - releases;//同步狀態值自減 5 boolean free = exclusiveCount(nextc) == 0;//如果狀態值為0代表全部釋放 6 if (free) 7 setExclusiveOwnerThread(null); 8 setState(nextc); 9 return free; 10 }
寫鎖的釋放相比讀鎖的釋放簡單很多,只需要判斷當前的寫鎖是否全部釋放完畢即可
四、讀寫鎖之鎖降級操作
什么是鎖降級,鎖降級就是從寫鎖降級成為讀鎖。在當前線程擁有寫鎖的情況下,再次獲取到讀鎖,隨后釋放寫鎖的過程就是鎖降級。這里可以舉個例子:
1 public class CacheDemo {
3 private Map<String, Object> cache = new HashMap<String, Object>(); 4
5 private ReadWriteLock rwl = new ReentrantReadWriteLock(); 6 public ReadLock rdl = rwl.readLock(); 7 public WriteLock wl = rwl.writeLock(); 8
9 public volatile boolean update = false; 10 public void processData(){ 11 rdl.lock();//獲取讀鎖 12 if(!update){ 13 rdl.unlock();//釋放讀鎖 14 wl.lock();//獲取寫鎖 15 try{ 16 if(!update){ 17 update =true; 18 } 19 rdl.lock();//獲取讀鎖 20 finally{ 21 wl.unlock();//釋放寫鎖 22 } 23 } 24 try{ 25 }finally{ 26 rdl.unlock();//釋放讀鎖 27 }
29 }
五、總結
讀寫鎖是在重入鎖ReentrantLock基礎上的一大改造,其通過在重入鎖上維護一個讀鎖一個寫鎖實現的。對於ReentrantLock和ReentrantreadWriteLock的使用需要在開發者自己根據實際項目的情況而定。對於讀寫鎖當讀的操作遠遠大於寫操作的時候會增加程序很高的並發量和吞吐量。雖說在高並發的情況下,讀寫鎖的效率很高,但是同時又會存在一些問題,比如當讀並發很高時讀操作長時間占有鎖,導致寫鎖長時間無法被獲取而導致的線程飢餓問題,因此在JDK1.8中又在ReentrantReadWriteLock的基礎上新增了一個讀寫並發鎖StampLock。
==================================================================================
不管歲月里經歷多少辛酸和艱難,告訴自己風雨本身就是一種內涵,努力的面對,不過就是一場命運的漂流,既然在路上,那么目的地必然也就是前方。
==================================================================================