大家好,我是冰河~~
在實際工作中,有一種非常普遍的並發場景:那就是讀多寫少的場景。在這種場景下,為了優化程序的性能,我們經常使用緩存來提高應用的訪問性能。因為緩存非常適合使用在讀多寫少的場景中。而在並發場景中,Java SDK中提供了ReadWriteLock來滿足讀多寫少的場景。本文我們就來說說使用ReadWriteLock如何實現一個通用的緩存中心。
本文涉及的知識點有:
文章已收錄到:
https://github.com/sunshinelyz/technology-binghe
https://gitee.com/binghe001/technology-binghe
讀寫鎖
說起讀寫鎖,相信小伙伴們並不陌生。總體來說,讀寫鎖需要遵循以下原則:
- 一個共享變量允許同時被多個讀線程讀取到。
- 一個共享變量在同一時刻只能被一個寫線程進行寫操作。
- 一個共享變量在被寫線程執行寫操作時,此時這個共享變量不能被讀線程執行讀操作。
這里,需要小伙伴們注意的是:讀寫鎖和互斥鎖的一個重要的區別就是:讀寫鎖允許多個線程同時讀共享變量,而互斥鎖不允許。所以,在高並發場景下,讀寫鎖的性能要高於互斥鎖。但是,讀寫鎖的寫操作是互斥的,也就是說,使用讀寫鎖時,一個共享變量在被寫線程執行寫操作時,此時這個共享變量不能被讀線程執行讀操作。
讀寫鎖支持公平模式和非公平模式,具體是在ReentrantReadWriteLock
的構造方法中傳遞一個boolean類型的變量來控制。
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
另外,需要注意的一點是:在讀寫鎖中,讀鎖調用newCondition()會拋出UnsupportedOperationException異常,也就是說:讀鎖不支持條件變量。
緩存實現
這里,我們使用ReadWriteLock快速實現一個緩存的通用工具類,總體代碼如下所示。
public class ReadWriteLockCache<K,V> {
private final Map<K, V> m = new HashMap<>();
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 讀鎖
private final Lock r = rwl.readLock();
// 寫鎖
private final Lock w = rwl.writeLock();
// 讀緩存
public V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 寫緩存
public V put(K key, V value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
}
可以看到,在ReadWriteLockCache中,我們定義了兩個泛型類型,K代表緩存的Key,V代表緩存的value。在ReadWriteLockCache類的內部,我們使用Map來緩存相應的數據,小伙伴都都知道HashMap並不是線程安全的類,所以,這里使用了讀寫鎖來保證線程的安全性,例如,我們在get()方法中使用了讀鎖,get()方法可以被多個線程同時執行讀操作;put()方法內部使用寫鎖,也就是說,put()方法在同一時刻只能有一個線程對緩存進行寫操作。
這里需要注意的是:無論是讀鎖還是寫鎖,鎖的釋放操作都需要放到finally{}
代碼塊中。
在以往的經驗中,有兩種向緩存中加載數據的方式,一種是:項目啟動時,將數據全量加載到緩存中,一種是在項目運行期間,按需加載所需要的緩存數據。
接下來,我們就分別來看看全量加載緩存和按需加載緩存的方式。
全量加載緩存
全量加載緩存相對來說比較簡單,就是在項目啟動的時候,將數據一次性加載到緩存中,這種情況適用於緩存數據量不大,數據變動不頻繁的場景,例如:可以緩存一些系統中的數據字典等信息。整個緩存加載的大體流程如下所示。
將數據全量加載到緩存后,后續就可以直接從緩存中讀取相應的數據了。
全量加載緩存的代碼實現比較簡單,這里,我就直接使用如下代碼進行演示。
public class ReadWriteLockCache<K,V> {
private final Map<K, V> m = new HashMap<>();
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
// 讀鎖
private final Lock r = rwl.readLock();
// 寫鎖
private final Lock w = rwl.writeLock();
public ReadWriteLockCache(){
//查詢數據庫
List<Field<K, V>> list = .....;
if(!CollectionUtils.isEmpty(list)){
list.parallelStream().forEach((f) ->{
m.put(f.getK(), f.getV);
});
}
}
// 讀緩存
public V get(K key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
// 寫緩存
public V put(K key, V value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
}
按需加載緩存
按需加載緩存也可以叫作懶加載,就是說:需要加載的時候才會將數據加載到緩存。具體來說:就是程序啟動的時候,不會將數據加載到緩存,當運行時,需要查詢某些數據,首先檢測緩存中是否存在需要的數據,如果存在,則直接讀取緩存中的數據,如果不存在,則到數據庫中查詢數據,並將數據寫入緩存。后續的讀取操作,因為緩存中已經存在了相應的數據,直接返回緩存的數據即可。
這種查詢緩存的方式適用於大多數緩存數據的場景。
我們可以使用如下代碼來表示按需查詢緩存的業務。
class ReadWriteLockCache<K,V> {
private final Map<K, V> m = new HashMap<>();
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
V get(K key) {
V v = null;
//讀緩存
r.lock();
try {
v = m.get(key);
} finally{
r.unlock();
}
//緩存中存在,返回
if(v != null) {
return v;
}
//緩存中不存在,查詢數據庫
w.lock();
try {
//再次驗證緩存中是否存在數據
v = m.get(key);
if(v == null){
//查詢數據庫
v=從數據庫中查詢出來的數據
m.put(key, v);
}
} finally{
w.unlock();
}
return v;
}
}
這里,在get()方法中,首先從緩存中讀取數據,此時,我們對查詢緩存的操作添加了讀鎖,查詢返回后,進行解鎖操作。判斷緩存中返回的數據是否為空,不為空,則直接返回數據;如果為空,則獲取寫鎖,之后再次從緩存中讀取數據,如果緩存中不存在數據,則查詢數據庫,將結果數據寫入緩存,釋放寫鎖。最終返回結果數據。
這里,有小伙伴可能會問:為啥程序都已經添加寫鎖了,在寫鎖內部為啥還要查詢一次緩存呢?
這是因為在高並發的場景下,可能會存在多個線程來競爭寫鎖的現象。例如:第一次執行get()方法時,緩存中的數據為空。如果此時有三個線程同時調用get()方法,同時運行到 w.lock()
代碼處,由於寫鎖的排他性。此時只有一個線程會獲取到寫鎖,其他兩個線程則阻塞在w.lock()
處。獲取到寫鎖的線程繼續往下執行查詢數據庫,將數據寫入緩存,之后釋放寫鎖。
此時,另外兩個線程競爭寫鎖,某個線程會獲取到鎖,繼續往下執行,如果在w.lock()
后沒有 v = m.get(key);
再次查詢緩存的數據,則這個線程會直接查詢數據庫,將數據寫入緩存后釋放寫鎖。最后一個線程同樣會按照這個流程執行。
這里,實際上第一個線程已經查詢過數據庫,並且將數據寫入緩存了,其他兩個線程就沒必要再次查詢數據庫了,直接從緩存中查詢出相應的數據即可。所以,在w.lock()
后添加 v = m.get(key);
再次查詢緩存的數據,能夠有效的減少高並發場景下重復查詢數據庫的問題,提升系統的性能。
讀寫鎖的升降級
關於鎖的升降級,小伙伴們需要注意的是:在ReadWriteLock中,鎖是不支持升級的,因為讀鎖還未釋放時,此時獲取寫鎖,就會導致寫鎖永久等待,相應的線程也會被阻塞而無法喚醒。
雖然不支持鎖升級,但是ReadWriteLock支持鎖降級,例如,我們來看看官方的ReentrantReadWriteLock示例,如下所示。
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}}
數據同步問題
首先,這里說的數據同步指的是數據源和數據緩存之間的數據同步,說的再直接一點,就是數據庫和緩存之間的數據同步。
這里,我們可以采取三種方案來解決數據同步的問題,如下圖所示
超時機制
這個比較好理解,就是在向緩存寫入數據的時候,給一個超時時間,當緩存超時后,緩存的數據會自動從緩存中移除,此時程序再次訪問緩存時,由於緩存中不存在相應的數據,查詢數據庫得到數據后,再將數據寫入緩存。
定時更新緩存
這種方案是超時機制的增強版,在向緩存中寫入數據的時候,同樣給一個超時時間。與超時機制不同的是,在程序后台單獨啟動一個線程,定時查詢數據庫中的數據,然后將數據寫入緩存中,這樣能夠在一定程度上避免緩存的穿透問題。
實時更新緩存
這種方案能夠做到數據庫中的數據與緩存的數據是實時同步的,可以使用阿里開源的Canal框架實現MySQL數據庫與緩存數據的實時同步。也可以使用我個人開源的mykit-data框架哦(推薦使用)~~
mykit-data開源地址:
好了,今天就到這兒吧,我是冰河,我們下期見~~