一、簡介
JetCache
是一個基於Java的緩存系統封裝,提供統一的API和注解來簡化緩存的使用。 JetCache提供了比SpringCache更加強大的注解,可以原生的支持TTL、兩級緩存、分布式自動刷新,還提供了Cache
接口用於手工緩存操作。 當前有四個實現:RedisCache
、RedisLettuceCache
、CaffeineCache
、LinkedHashMapCache
。
特性:
-
通過統一的API訪問Cache系統
-
通過注解實現聲明式的方法緩存,支持TTL和兩級緩存
-
通過注解創建並配置
Cache
實例 -
針對所有
Cache
實例和方法緩存的自動統計 -
Key的生成策略和Value的序列化策略支持自定義配置
-
分布式緩存自動刷新,分布式鎖
-
異步Cache API (使用Redis的Lettuce客戶端時)
緩存類型:
-
本地
LinkedHashMap
:使用LinkedHashMap做LUR方式淘汰
Caffeine
:基於Java8開發的提供了近乎最佳命中率的高性能的緩存庫 -
遠程(訪問Redis的客戶端)
Redis
:使用Jedis客戶端,Redis官方首選的Java客戶端
RedisSpringData
:使用SpringData訪問Redis(官網未作介紹)
RedisLettuce
:使用Lettuce客戶端,一個高性能基於Java的Redis驅動框架,支持線程安全的同步、異步操作,底層集成了Project Reactor,提供反應式編程,參考:Redis高級客戶端Lettuce詳解
為什么使用緩存?
在高並發、大流量等場景下,降低系統延遲,緩解數據庫壓力,提高系統整體的性能,讓用戶有更好的體驗。
使用場景
讀多寫少、不追求強一致性、請求入參不易變化
使用規范
選擇了遠程緩存請設置keyPrefix,保證存放至Redis的緩存key規范化,避免與其他系統出現沖突,例如這樣設計:系統簡稱:所屬名字:
,這樣存儲到Redis的緩存key為:系統簡稱:所屬名字:緩存key
選擇了本地緩存請設置limit,全局默認設置了100,本地緩存的數據存放於內存,減輕內存的損耗,如果使用了Caffeine,緩存的key過多可能導致內存溢出
請勿濫用緩存注解,對於非必要添加緩存的方法我們盡量不使用緩存
二、如何使用
說明:以下使用方式是基於SpringBoot引入JetCache
緩存框架的,如果不是SpringBoot工程,請參考JetCache
官網使用
引入maven依賴
<dependencies>
<!-- 使用 jedis 客戶端添加以下依賴 -->
<dependency>
<groupId>com.alicp.jetcache</groupId>
<artifactId>jetcache-starter-redis</artifactId>
<version>${version}</version>
</dependency>
<!-- 使用 lettuce 客戶端添加以下依賴 -->
<dependency>
<groupId>com.alicp.jetcache</groupId>
<artifactId>jetcache-starter-redis-lettuce</artifactId>
<version>${version}</version>
</dependency>
</dependencies>
添加配置
jetcache:
statIntervalMinutes: 60
areaInCacheName: false
penetrationProtect: false
enableMethodCache: true
hiddenPackages: com.xxx.xxx,com.xxx.xxx
local:
default:
type: caffeine # 支持的類型:linkedhashmap、caffeine
limit: 100
keyConvertor: fastjson # 支持的類型:fastjson,可自定義轉換器函數
expireAfterWriteInMillis: 600000
expireAfterAccessInMillis: 300000
remote:
default:
type: redis.lettuce # 支持的類型:redis、redis.lettuce
keyPrefix: '系統簡稱:所屬名字:'
keyConvertor: fastjson
valueEncoder: java # 支持的類型:kryo、java,可自定義編碼器
valueDecoder: java # 支持的類型:kryo、java,可自定義解碼器
expireAfterWriteInMillis: 3600000
#readFrom: slavePreferred # 優先從Slave節點中讀取
uri: redis-sentinel://host1:26379,host2:26379,host3:26379/?sentinelMasterId=mymaster # 哨兵模式
#uri: redis://127.0.0.1:6379/ # 單節點模式
#mode: masterslave # 設置為主從模式
#uri: # 集群模式
#- redis://127.0.0.1:7000
#- redis://127.0.0.1:7001
#- redis://127.0.0.1:7002
example:
keyPrefix: '系統簡稱:所屬名字:'
type: redis
keyConvertor: fastjson
valueEncoder: java
valueDecoder: java
expireAfterWriteInMillis: 3600000
poolConfig:
minIdle: 10
maxIdle: 20
maxTotal: 50
#password: xxx # 連接密碼
#timeout: 2000 # 連接的超時時間,讀取數據的超時時間
#database: 0 # 連接的數據庫
#clientName: null # 客戶端名稱
#ssl: 是否使用SSL
host: ${redis.host}
port: ${redis.port}
#sentinel: host1:26379,host2:26379,host3:26379 # 哨兵模式
#masterName: mymaster
配置說明
jetcache的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.statIntervalMinutes | 0 | 用於統計緩存調用相關信息的統計間隔(分鍾),0表示不統計。 |
jetcache.areaInCacheName | true | 緩存實例名稱cacheName會作為緩存key的前綴,2.4.3以前的版本總是把areaName加在cacheName中,因此areaName也出現在key前綴中。我們一般設置為false。 |
jetcache.penetrationProtect | false | 當緩存訪問未命中的情況下,對並發進行的加載行為進行保護。 當前版本實現的是單JVM內的保護,即同一個JVM中同一個key只有一個線程去加載,其它線程等待結果。這是全局配置,如果緩存實例沒有指定則使用全局配置。 |
jetcache.enableMethodCache | true | 是否使用jetcache緩存。 |
jetcache.hiddenPackages | 無 | 自動生成緩存實例名稱時,為了不讓名稱太長,hiddenPackages指定的包名前綴會被截掉,多個包名使用逗號分隔。我們一般會指定每個緩存實例的名稱。 |
本地緩存的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.local.${area}.type | 無 | 本地緩存類型,支持 linkedhashmap、caffeine。 |
jetcache.local.${area}.limit | 100 | 每個緩存實例存儲的緩存數量的全局配置,僅本地緩存需要配置,如果緩存實例沒有指定則使用全局配置,請結合實例的業務場景進行配置該參數。 |
jetcache.local.${area}.keyConvertor | 無 | 緩存key轉換器的全局配置,支持的類型:fastjson 。僅當使用@CreateCache且緩存類型為LOCAL時可以指定為none ,此時通過equals方法來識別key。方法緩存必須指定keyConvertor。支持自定義轉換器函數,可設置為:bean:beanName ,然后會從spring容器中獲取該bean。 |
jetcache.local.${area}.expireAfterWriteInMillis | 無窮大 | 本地緩存超時時間的全局配置(毫秒)。 |
jetcache.local.${area}.expireAfterAccessInMillis | 0 | 多長時間沒訪問就讓緩存失效的全局配置(毫秒),僅支持本地緩存。0表示不使用這個功能。 |
遠程緩存的全局配置
屬性 | 默認值 | 說明 |
---|---|---|
jetcache.remote.${area}.type | 無 | 連接Redis的客戶端類型,支持 redis 、redis.lettuce 、redis.springdata 。 |
jetcache.remote.${area}.keyPrefix | 無 | 保存至遠程緩存key的前綴,請規范使用。 |
jetcache.remote.${area}.keyConvertor | 無 | 參考上述說明。 |
jetcache.remote.${area}.valueEncoder | java | 保存至遠程緩存value的編碼函數,支持:java 、kryo 。支持自定義編碼函數,可設置為:bean:beanName ,然后會從spring容器中獲取該bean。 |
jetcache.remote.${area}.valueDecoder | java | 保存至遠程緩存value的解碼函數,支持:java 、kryo 。支持自定義解碼函數,可設置為:bean:beanName ,然后會從spring容器中獲取該bean。 |
jetcache.remote.${area}.expireAfterWriteInMillis | 無窮大 | 遠程緩存超時時間的全局配置(毫秒)。 |
jetcache.remote.${area}.uri | 無 | redis節點信息。 |
上表中${area}對應@Cached和@CreateCache的area屬性,如果注解上沒有指定area,默認值是"default"。
關於緩存的超時時間:
- put等方法上指定了超時時間,則以此時間為准;
- put等方法上未指定超時時間,使用Cache實例的默認超時時間;
- Cache實例的默認超時時間,通過在@CreateCache和@Cached上的expire屬性指定,如果沒有指定,使用yml中定義的全局配置,例如@Cached(cacheType=local)使用jetcache.local.default.expireAfterWriteInMillis,如果仍未指定則是無窮大。
注解說明
如果需要使用jetcache
緩存,啟動類添加兩個注解:@EnableCreateCacheAnnotation
、@EnableMethodCache
@EnableCreateCacheAnnotation
開啟可通過@CreateCache注解創建Cache實例功能。
@EnableMethodCache
開啟可通過@Cached注解創建Cache實例功能,初始化spring aop,注解說明:
屬性 | 默認值 | 說明 |
---|---|---|
basePackages | 無 | jetcache需要攔截的包名,只有這些包名下的Cache實例才會生效 |
order | Ordered.LOWEST_PRECEDENCE | 指定AOP切面執行過程的順序,默認最低優先級 |
mode | AdviceMode.PROXY | Spring AOP的模式,目前就提供默認值讓你修改 |
proxyTargetClass | false | 無 |
@Cached
為一個方法添加緩存,創建對應的緩存實例,注解可以添加在接口或者類的方法上面,該類必須是spring bean,注解說明:
屬性 | 默認值 | 說明 |
---|---|---|
area | "default" | 如果在配置中配置了多個緩存area,在這里指定使用哪個area。 |
name | 未定義 | 指定緩存實例名稱,如果沒有指定,會根據類名+方法名自動生成。name會被用於遠程緩存的key前綴。另外在統計中,一個簡短有意義的名字會提高可讀性。 |
enabled | true | 是否激活緩存。 |
timeUnit | TimeUnit.SECONDS | 指定expire的單位。 |
expire | 未定義 | 超時時間。如果注解上沒有定義,會使用全局配置,如果此時全局配置也沒有定義,則為無窮大。 |
localExpire | 未定義 | 僅當cacheType為BOTH時適用,為本地緩存指定一個不一樣的超時時間,通常應該小於expire。如果沒有設置localExpire且cacheType為BOTH,那么本地緩存的超時時間和遠程緩存保持一致。 |
cacheType | CacheType.REMOTE | 緩存的類型,支持:REMOTE 、LOCAL 、BOTH ,如果定義為BOTH,會使用LOCAL和REMOTE組合成兩級緩存。 |
localLimit | 未定義 | 如果cacheType為LOCAL或BOTH,這個參數指定本地緩存的最大元素數量,以控制內存占用。如果注解上沒有定義,會使用全局配置,如果此時你沒有定義全局配置,則使用默認的全局配置100。請結合實際業務場景進行設置該值。 |
serialPolicy | 未定義 | 指定遠程緩存VALUE的序列化方式,支持SerialPolicy.JAVA 、SerialPolicy.KRYO 。如果注解上沒有定義,會使用全局配置,如果你沒有定義全局配置,則使用默認的全局配置SerialPolicy.JAVA。 |
keyConvertor | 未定義 | 指定KEY的轉換方式,用於將復雜的KEY類型轉換為緩存實現可以接受的類型,支持:KeyConvertor.FASTJSON 、KeyConvertor.NONE 。NONE表示不轉換,FASTJSON可以將復雜對象KEY轉換成String。如果注解上沒有定義,會使用全局配置。 |
key | 未定義 | 使用SpEL指定緩存key,如果沒有指定會根據入參自動生成。 |
cacheNullValue | false | 當方法返回值為null的時候是否要緩存。 |
condition | 未定義 | 使用SpEL指定條件,如果表達式返回true的時候才去緩存中查詢。 |
postCondition | 未定義 | 使用SpEL指定條件,如果表達式返回true的時候才更新緩存,該評估在方法執行后進行,因此可以訪問到#result。 |
@CacheInvalidate
用於移除緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 如果在配置中配置了多個緩存area,在這里指定使用哪個area。 |
name | 無 | 指定緩存的唯一名稱,一般指向對應的@Cached定義的name。 |
key | 未定義 | 使用SpEL指定key,如果沒有指定會根據入參自動生成。 |
condition | 未定義 | 使用SpEL指定條件,如果表達式返回true才執行刪除,可訪問方法結果#result。刪除緩存實例中key的元素。 |
multi | false | 如果根據SpEL指定的key是一個集合,是否從緩存實例中刪除對應的每個緩存。如果設置為true,但是key不是集合,則不會刪除緩存。 |
@CacheUpdate
用於更新緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 如果在配置中配置了多個緩存area,在這里指定使用哪個area。 |
name | 無 | 指定緩存的唯一名稱,一般指向對應的@Cached定義的name。 |
key | 未定義 | 使用SpEL指定key,如果沒有指定會根據入參自動生成。 |
value | 無 | 使用SpEL指定value。 |
condition | 未定義 | 使用SpEL指定條件,如果表達式返回true才執行更新,可訪問方法結果#result。更新緩存實例中key的元素。 |
multi | false | 如果根據SpEL指定key和value都是集合並且元素的個數相同,則是否更新緩存實例中的對應的每個元素。如果設置為true,但是key不是集合或者value不是集合或者它們的元素的個數不相同,也不會更新緩存。 |
@CacheRefresh
用於自定刷新緩存,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
refresh | 無 | 刷新間隔 |
stopRefreshAfterLastAccess | 未定義 | 指定該key多長時間沒有訪問就停止刷新,如果不指定會一直刷新。 |
refreshLockTimeout | 60秒 | 類型為BOTH/REMOTE的緩存刷新時,同時只會有一台服務器在刷新,這台服務器會在遠程緩存放置一個分布式鎖,此配置指定該鎖的超時時間。 |
timeUnit | TimeUnit.SECONDS | 指定refresh時間單位。 |
@CachePenetrationProtect
當緩存訪問未命中的情況下,對並發進行的加載行為進行保護。 當前版本實現的是單JVM內的保護,即同一個JVM中同一個key只有一個線程去加載,其它線程等待結果,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
value | true | 是否開啟保護模式。 |
timeout | 未定義 | 其他線程的等待超時時間,如果超時則自己執行方法直接返回結果。 |
timeUnit | TimeUnit.SECONDS | 指定timeout時間單位。 |
@CreateCache
在Spring Bean中使用該注解可創建一個Cache實例,配置說明:
配置 | 默認值 | 說明 |
---|---|---|
area | "default" | 如果在配置中配置了多個緩存area,在這里指定使用哪個area。 |
name | 未定義 | 指定緩存實例名稱,如果沒有指定,會根據類名+方法名自動生成。name會被用於遠程緩存的key前綴。另外在統計中,一個簡短有意義的名字會提高可讀性。 |
timeUnit | TimeUnit.SECONDS | 指定expire的單位。 |
expire | 未定義 | 超時時間。如果注解上沒有定義,會使用全局配置,如果此時全局配置也沒有定義,則為無窮大。 |
localExpire | 未定義 | 僅當cacheType為BOTH時適用,為本地緩存指定一個不一樣的超時時間,通常應該小於expire。如果沒有設置localExpire且cacheType為BOTH,那么本地緩存的超時時間和遠程緩存保持一致。 |
cacheType | CacheType.REMOTE | 緩存的類型,支持:REMOTE 、LOCAL 、BOTH ,如果定義為BOTH,會使用LOCAL和REMOTE組合成兩級緩存。 |
localLimit | 未定義 | 如果cacheType為LOCAL或BOTH,這個參數指定本地緩存的最大元素數量,以控制內存占用。如果注解上沒有定義,會使用全局配置,如果此時你沒有定義全局配置,則使用默認的全局配置100。請結合實際業務場景進行設置該值。 |
serialPolicy | 未定義 | 指定遠程緩存VALUE的序列化方式,支持SerialPolicy.JAVA 、SerialPolicy.KRYO 。如果注解上沒有定義,會使用全局配置,如果你沒有定義全局配置,則使用默認的全局配置SerialPolicy.JAVA。 |
keyConvertor | 未定義 | 指定KEY的轉換方式,用於將復雜的KEY類型轉換為緩存實現可以接受的類型,支持:KeyConvertor.FASTJSON 、KeyConvertor.NONE 。NONE表示不轉換,FASTJSON可以將復雜對象KEY轉換成String。如果注解上沒有定義,會使用全局配置。 |
使用示例
/**
* 啟動類
*/
@SpringBootApplication
@EnableCreateCacheAnnotation
@EnableMethodCache(basePackages = "com.xxx.xxx")
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}
/**
* 接口
*/
public interface JetCacheExampleService {
User getValue(long userId);
void updateValue(User user);
void deleteValue(User user);
}
/**
* 實現類
*/
@Service
public class JetCacheExampleServiceImpl implements JetCacheExampleService {
@CreateCache(name = "JetCacheExampleServiceImpl.exampleCache" , localLimit = 50 ,cacheType = CacheType.LOCAL)
@CachePenetrationProtect
private Cache<Long, User> exampleCache;
@Override
@Cached(name = "JetCacheExampleService.getValue", expire = 3600 * 6, localLimit = 50, cacheType = CacheType.BOTH)
@CacheRefresh(refresh = 3600, stopRefreshAfterLastAccess = 3600 * 2)
@CachePenetrationProtect
public User getValue(long userId){
String result = new User();
// ... 處理邏輯
return result;
}
@Override
@CacheUpdate(name = "JetCacheExampleService.getValue", key="#user.userId", value="#user")
public void updateValue(User user){
// 處理邏輯
}
@Override
@CacheInvalidate(name = "JetCacheExampleService.getValue", key="#user.userId")
public void deleteValue(User user){
// 處理邏輯
}
}
如上述所示
getValue方法會創建一個緩存實例,通過@Cached
注解可以看到緩存實例名稱cacheName
為'JetCacheExampleService.getValue',緩存的有效時長為6小時,本地緩存的數量最多為50,緩存類型為BOTH
(優先從本地緩存獲取);通過@CacheRefresh
注解可以看到會為該緩存實例設置一個刷新策略,刷新間隔為1小時,2個小時沒訪問后不再刷新,需要刷新的緩存實例會為其每一個緩存數據創建一個RefreshTask
周期性任務;@CachePenetrationProtect
注解表示該緩存實例開啟保護模式,當緩存未命中,同一個JVM中同一個key只有一個線程去加載數據,其它線程等待結果。
updateValue方法可以更新緩存,通過@CacheUpdate
注解可以看到會更新緩存實例'JetCacheExampleService.getValue'中緩存key為#user.userId的緩存value為#user。
deleteValue方法可以刪除緩存,通過@CacheInvalidate
注解可以看到會刪除緩存實例'JetCacheExampleService.getValue'中緩存key為#user.userId緩存數據。
exampleCache字段會作為一個緩存實例對象,通過@CreateCache
注解可以看到,會將該字段作為cacheName
為'JetCacheExampleService.getValue'緩存實例對象,本地緩存的數量最多為50,緩存類型為LOCAL
,@CachePenetrationProtect
注解表示該緩存實例開啟保護模式。
我的業務場景是使用上述的getValue方法創建緩存實例即可。
注意:
@Cached
注解不能和@CacheUpdate
或者@CacheInvalidate
同時使用@CacheInvalidate
可以多個同時使用
另外通過@CreateCache注解創建緩存實例也可以這樣初始化:
@Service
public class JetCacheExampleServiceImpl implements JetCacheExampleService {
@CreateCache(name = "JetCacheExampleServiceImpl.exampleCache" , localLimit = 50 ,cacheType = CacheType.LOCAL)
private Cache<Long, User> exampleCache;
@PostConstruct
public exampleCacheInit(){
RefreshPolicy policy = RefreshPolicy.newPolicy(60, TimeUnit.MINUTES)
.stopRefreshAfterLastAccess(120, TimeUnit.MINUTES);
exampleCache.config().setLoader(this::loadFromDatabase);
exampleCache.config().setRefreshPolicy(policy);
}
}
更加詳細的使用方法請參考JetCache
官方地址。
三、源碼解析
參考本人Git倉庫中的JetCache
項目,已做詳細的注釋。
簡單概括:利用Spring AOP功能,在調用需要緩存的方法前,通過解析注解獲取緩存配置,根據這些配置創建不同的實例對象,進行緩存等操作。
JetCache
分為兩部分,一部分是Cache API以及實現,另一部分是注解支持。
項目的各個子模塊
-
jetcache-anno-api:定義
JetCache
注解和常量。 -
jetcache-core:核心API,Cache接口的實現,提供各種緩存實例的操作,不依賴於Spring。
-
jetcache-autoconfigure:完成初始化,解析application.yml配置文件中的相關配置,以提供不同緩存實例的
CacheBuilder
構造器 -
jetcache-anno:基於Spring提供
@Cached
和@CreateCache
注解支持,初始化Spring AOP以及JetCache
注解等配置。 -
jetcache-redis:使用Jedis提供Redis支持。
-
jetcache-redis-lettuce:使用Lettuce提供Redis支持,實現了
JetCache
異步訪問緩存的的接口。 -
jetcache-redis-springdata:使用Spring Data提供Redis支持。
-
jetcache-starter-redis:提供pom文件,Spring Boot方式的Starter,基於Jedis。
-
jetcache-starter-redis-lettuce:提供pom文件,Spring Boot方式的Starter,基於Lettuce。
-
jetcache-starter-redis-springdata:提供pom文件,Spring Boot方式的Starter,基於Spring Data。
-
jetcache-test:提供相關測試。
常用注解與變量

在jetcache-anno-api模塊中定義了需要用的緩存注解與常量,在上述已經詳細的講述過,其中@CacheInvalidateContainer
注解定義value為@CacheInvalidate
數組,然后通過jdk8新增的@Repeatable
注解,在@CacheInvalidate
注解上面添加@Repeatable(CacheInvalidateContainer.class)
,即可支持同一個地方可以使用多個@CacheInvalidate
注解。
緩存API
主要查看jetcache-core子模塊,提供各種Cache
緩存,以支持不同的緩存類型
Cache接口的子關系,結構如下圖:

主要對象描述:
- Cache:緩存接口,定義基本方法
- AbstractCache:抽象類,緩存接口的繼承者,提供基本實現,具體實現交由不同的子類
- LinkedHashMapCache:基於LinkedHashMap設計的簡易內存緩存
- CaffeineCache:基於Caffeine工具設計的內存緩存
- RedisCache:Redis實現,使用Jedis客戶端
- RedisLettuceCache:Redis實現,使用Lettuce客戶端
- MultiLevelCache:兩級緩存,用於封裝EmbeddedCache(本地緩存)和ExternalCache(遠程緩存)
- RefreshCache:基於裝飾器模式Decorator,提供自動刷新功能
- LazyInitCache:用於@CreateCache注解創建的緩存實例,依賴於Spring
Cache接口
com.alicp.jetcache.Cache
接口,定義了緩存實例的操作方法(部分有默認實現),以及獲取分布式鎖(非嚴格,用於刷新遠程緩存)的實現,因為繼承了java.io.Closeable
接口,所以也提供了close方法的默認實現,空方法,交由不同緩存實例的實現去實現該方法用於釋放資源,在com.alicp.jetcache.anno.support.ConfigProvider.doShutdown()
方法中會調用每個緩存實例對象的close方法進行資源釋放。主要代碼如下:
public interface Cache<K, V> extends Closeable {
Logger logger = LoggerFactory.getLogger(Cache.class);
//-----------------------------JSR 107 style API------------------------------------------------
default V get(K key) throws CacheInvokeException {
CacheGetResult<V> result = GET(key);
if (result.isSuccess()) {
return result.getValue();
} else {
return null;
}
}
default Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
MultiGetResult<K, V> cacheGetResults = GET_ALL(keys);
return cacheGetResults.unwrapValues();
}
default void put(K key, V value) {
PUT(key, value);
}
default void putAll(Map<? extends K, ? extends V> map) {
PUT_ALL(map);
}
default boolean putIfAbsent(K key, V value) { // 多級緩存MultiLevelCache不支持此方法
CacheResult result = PUT_IF_ABSENT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
return result.getResultCode() == CacheResultCode.SUCCESS;
}
default boolean remove(K key) {
return REMOVE(key).isSuccess();
}
default void removeAll(Set<? extends K> keys) {
REMOVE_ALL(keys);
}
<T> T unwrap(Class<T> clazz);
@Override
default void close() {
}
//--------------------------JetCache API---------------------------------------------
CacheConfig<K, V> config();
default AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
if (key == null) {
return null;
}
// 隨機生成一個值
final String uuid = UUID.randomUUID().toString();
// 過期時間
final long expireTimestamp = System.currentTimeMillis() + timeUnit.toMillis(expire);
final CacheConfig config = config();
AutoReleaseLock lock = () -> { // 創建一把會自動釋放資源的鎖,實現其 close() 方法
int unlockCount = 0;
while (unlockCount++ < config.getTryLockUnlockCount()) {
if(System.currentTimeMillis() < expireTimestamp) { // 這把鎖還沒有過期,則刪除
// 刪除對應的 Key 值
// 出現的結果:成功,失敗,Key 不存在
CacheResult unlockResult = REMOVE(key);
if (unlockResult.getResultCode() == CacheResultCode.FAIL
|| unlockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
// 刪除對應的 Key 值過程中出現了異常,則重試
logger.info("[tryLock] [{} of {}] [{}] unlock failed. Key={}, msg = {}",
unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getMessage());
// retry
} else if (unlockResult.isSuccess()) { // 釋放成功
logger.debug("[tryLock] [{} of {}] [{}] successfully release the lock. Key={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key);
return;
} else { // 鎖已經被釋放了
logger.warn("[tryLock] [{} of {}] [{}] unexpected unlock result: Key={}, result={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getResultCode());
return;
}
} else { // 該鎖已失效
logger.info("[tryLock] [{} of {}] [{}] lock already expired: Key={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key);
return;
}
}
};
int lockCount = 0;
Cache cache = this;
while (lockCount++ < config.getTryLockLockCount()) {
// 往 Redis(或者本地) 中存放 Key 值(_#RL#結尾的Key)
// 返回的結果:成功、已存在、失敗
CacheResult lockResult = cache.PUT_IF_ABSENT(key, uuid, expire, timeUnit);
if (lockResult.isSuccess()) { // 成功獲取到鎖
logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock. Key={}",
lockCount, config.getTryLockLockCount(), uuid, key);
return lock;
} else if (lockResult.getResultCode() == CacheResultCode.FAIL || lockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
logger.info("[tryLock] [{} of {}] [{}] cache access failed during get lock, will inquiry {} times. Key={}, msg={}",
lockCount, config.getTryLockLockCount(), uuid,
config.getTryLockInquiryCount(), key, lockResult.getMessage());
// 嘗試獲取鎖的過程中失敗了,也就是往 Redis 中存放 Key 值出現異常
// 這個時候可能 Key 值已經存儲了,但是由於其他原因導致返回的結果表示執行失敗
int inquiryCount = 0;
while (inquiryCount++ < config.getTryLockInquiryCount()) {
CacheGetResult inquiryResult = cache.GET(key);
if (inquiryResult.isSuccess()) {
if (uuid.equals(inquiryResult.getValue())) {
logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock after inquiry. Key={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key);
return lock;
} else {
logger.debug("[tryLock] [{} of {}] [{}] not the owner of the lock, return null. Key={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key);
return null;
}
} else {
logger.info("[tryLock] [{} of {}] [{}] inquiry failed. Key={}, msg={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key, inquiryResult.getMessage());
// retry inquiry
}
}
} else { // 已存在表示該鎖被其他人占有
// others holds the lock
logger.debug("[tryLock] [{} of {}] [{}] others holds the lock, return null. Key={}",
lockCount, config.getTryLockLockCount(), uuid, key);
return null;
}
}
logger.debug("[tryLock] [{}] return null after {} attempts. Key={}", uuid, config.getTryLockLockCount(), key);
return null;
}
default boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action){
// Release the lock use Java 7 try-with-resources.
try (AutoReleaseLock lock = tryLock(key, expire, timeUnit)) { // 嘗試獲取鎖
if (lock != null) { // 獲取到鎖則執行下面的任務
action.run();
return true;
} else {
return false;
}
// 執行完鎖的操作后會進行資源釋放,調用 AutoCloseable 的 close() 方法
}
}
CacheGetResult<V> GET(K key);
MultiGetResult<K, V> GET_ALL(Set<? extends K> keys);
default V computeIfAbsent(K key, Function<K, V> loader) {
return computeIfAbsent(key, loader, config().isCacheNullValue());
}
V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull);
V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit);
default void put(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
PUT(key, value, expireAfterWrite, timeUnit);
}
default CacheResult PUT(K key, V value) {
if (key == null) {
return CacheResult.FAIL_ILLEGAL_ARGUMENT;
}
return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
default void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
PUT_ALL(map, expireAfterWrite, timeUnit);
}
default CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
if (map == null) {
return CacheResult.FAIL_ILLEGAL_ARGUMENT;
}
return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
CacheResult REMOVE(K key);
CacheResult REMOVE_ALL(Set<? extends K> keys);
CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
}
com.alicp.jetcache.Cache
定義的方法大都是關於緩存的獲取、刪除和存放操作
-
其中大寫的方法返回
JetCache
自定義的CacheResult(完整的返回值,可以清晰的知道執行結果,例如get返回null的時候,無法斷定是對應的key不存在,還是訪問緩存發生了異常) -
小寫的方法默認實現就是調用大寫的方法
-
computeIfAbsent
方法最為核心,交由子類去實現 -
tryLockAndRun
方法會非堵塞的嘗試獲取一把AutoReleaseLock分布式鎖(非嚴格),獲取過程:- 嘗試往Redis中設置(已存在無法設置)一個鍵值對,key為緩存
key_#RL#
,value為UUID
,並設置這個鍵值對的過期時間為60秒(默認) - 如果獲取到鎖后進行加載任務,也就是重新加載方法並更新遠程緩存
- 該鎖實現了java.lang.AutoCloseable接口,使用try-with-resource方式,在執行完加載任務后會自動釋放資源,也就是調用close方法將獲取鎖過程中設置的鍵值對從Redis中刪除
- 在RefreshCache中會調用該方法,因為如果存在遠程緩存需要刷新則需要采用分布式鎖的方式
- 嘗試往Redis中設置(已存在無法設置)一個鍵值對,key為緩存
AbstractCache抽象類
com.alicp.jetcache.AbstractCache
抽象類,實現了Cache接口,主要代碼如下:
public abstract class AbstractCache<K, V> implements Cache<K, V> {
/**
* 當緩存未命中時,並發情況同一個Key是否只允許一個線程去加載,其他線程等待結果(可以設置timeout,超時則自己加載並直接返回)
* 如果是的話則由獲取到Key對應的 LoaderLock.signal(采用了 CountDownLatch)的線程進行加載
* loaderMap臨時保存 Key 對應的 LoaderLock 對象
*/
private volatile ConcurrentHashMap<Object, LoaderLock> loaderMap;
ConcurrentHashMap<Object, LoaderLock> initOrGetLoaderMap() {
if (loaderMap == null) {
synchronized (this) {
if (loaderMap == null) {
loaderMap = new ConcurrentHashMap<>();
}
}
}
return loaderMap;
}
@Override
public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
0, null, this);
}
@Override
public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit) {
return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
expireAfterWrite, timeUnit, this);
}
private static <K, V> boolean needUpdate(V loadedValue, boolean cacheNullWhenLoaderReturnNull, Function<K, V> loader) {
if (loadedValue == null && !cacheNullWhenLoaderReturnNull) {
return false;
}
if (loader instanceof CacheLoader && ((CacheLoader<K, V>) loader).vetoCacheUpdate()) {
return false;
}
return true;
}
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
// 獲取內部的 Cache 對象
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
// 封裝 loader 函數成一個 ProxyLoader 對象,主要在重新加載緩存后發出一個 CacheLoadEvent 到 CacheMonitor
CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
CacheGetResult<V> r;
if (cache instanceof RefreshCache) { // 該緩存實例需要刷新
RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
/*
* 從緩存中獲取數據
* 如果是多級緩存(先從本地緩存獲取,獲取不到則從遠程緩存獲取)
* 如果緩存數據是從遠程緩存獲取到的數據則會更新至本地緩存,並且如果本地緩存沒有設置 localExpire 則使用遠程緩存的到期時間作為自己的到期時間
* 我一般不設置 localExpire ,因為可能導致本地緩存的有效時間比遠程緩存的有效時間更長
* 如果設置 localExpire 了記得設置 expireAfterAccessInMillis
*/
r = refreshCache.GET(key);
// 添加/更新當前 RefreshCache 的刷新緩存任務,存放於 RefreshCache 的 taskMap 中
refreshCache.addOrUpdateRefreshTask(key, newLoader);
} else {
// 從緩存中獲取數據
r = cache.GET(key);
}
if (r.isSuccess()) { // 緩存命中
return r.getValue();
} else { // 緩存未命中
// 創建當緩存未命中去更新緩存的函數
Consumer<V> cacheUpdater = (loadedValue) -> {
if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
/*
* 未在緩存注解中配置 key 的生成方式則默認取入參作為緩存 key
* 在進入當前方法時是否可以考慮為 key 創建一個副本????
* 因為緩存未命中然后通過 loader 重新加載方法時,如果方法內部對入參進行了修改,那么生成的緩存 key 也會被修改
* 從而導致相同的 key 進入該方法時一直與緩存中的 key 不相同,一直出現緩存未命中
*/
if (timeUnit != null) {
cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
} else {
cache.PUT(key, loadedValue).waitForResult();
}
}
};
V loadedValue;
if (cache.config().isCachePenetrationProtect()) { // 添加了 @CachePenetrationProtect 注解
// 一個JVM只允許一個線程執行
loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
} else {
// 執行方法
loadedValue = newLoader.apply(key);
// 將新的結果異步緩存
cacheUpdater.accept(loadedValue);
}
return loadedValue;
}
}
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
Object lockKey = buildLoaderLockKey(abstractCache, key);
while (true) {
// 為什么加一個 create[] 數組 疑問??
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
// 加載該 Key 實例的方法
V loadedValue = newLoader.apply(key);
ll.success = true;
ll.value = loadedValue;
// 將重新加載的數據更新至緩存
cacheUpdater.accept(loadedValue);
return loadedValue;
} finally {
// 標記已完成
ll.signal.countDown();
if (create[0]) {
loaderMap.remove(lockKey);
}
}
} else { // 等待其他線程加載,如果出現異常或者超時則自己加載返回數據,但是不更新緩存
try {
Duration timeout = config.getPenetrationProtectTimeout();
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if(!ok) {
logger.info("loader wait timeout:" + timeout);
return newLoader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return newLoader.apply(key);
}
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
private static Object buildLoaderLockKey(Cache c, Object key) {
if (c instanceof AbstractEmbeddedCache) {
return ((AbstractEmbeddedCache) c).buildKey(key);
} else if (c instanceof AbstractExternalCache) {
byte bytes[] = ((AbstractExternalCache) c).buildKey(key);
return ByteBuffer.wrap(bytes);
} else if (c instanceof MultiLevelCache) {
c = ((MultiLevelCache) c).caches()[0];
return buildLoaderLockKey(c, key);
} else if(c instanceof ProxyCache) {
c = ((ProxyCache) c).getTargetCache();
return buildLoaderLockKey(c, key);
} else {
throw new CacheException("impossible");
}
}
/**
* 重新加載數據鎖
*/
static class LoaderLock {
/**
* 柵欄
*/
CountDownLatch signal;
/**
* 持有的線程
*/
Thread loaderThread;
/**
* 是否加載成功
*/
boolean success;
/**
* 加載出來的數據
*/,
Object value;
}
}
com.alicp.jetcache.AbstractCache
實現了Cache
接口的大寫方法,內部調用自己定義的抽象方法(以DO_
開頭,交由不同的子類實現),操作緩存后發送相應的事件CacheEvent
,也就是調用自己定義的notify方法,遍歷每個CacheMonitor
對該事件進行后置操作,用於統計信息。
computeIfAbsentImpl
方法實現了Cache
接口的核心方法,從緩存實例中根據緩存key獲取緩存value,邏輯如下:
-
獲取cache的targetCache,因為我們通過
@CreateCache
注解創建的緩存實例將生成LazyInitCache
對象,需要調用其getTargetCache方法才會完成緩存實例的初始化 -
loader函數是對加載原有方法的封裝,這里再進行一層封裝,封裝成
ProxyLoader
類型,目的是在加載原有方法后將發送CacheLoadEvent
事件 -
從緩存實例中獲取對應的緩存value,如果緩存實例對象是
RefreshCache
類型(在com.alicp.jetcache.anno.support.CacheContext.buildCache
方法中會將cache包裝成CacheHandlerRefreshCache
),則調用RefreshCache.addOrUpdateRefreshTask
方法,判斷是否應該為它添加一個定時的刷新任務 -
如果緩存未命中,則執行loader函數,如果開啟了保護模式,則調用自定義的synchronizedLoad方法,大致邏輯:根據緩存key從自己的loaderMap(線程安全)遍歷中嘗試獲取(不存在則創建)
LoaderLock
加載鎖,獲取到這把加載鎖才可以執行loader函數,如果已被其他線程占有則進行等待(沒有設置超時時間則一直等待),通過CountDownLatch
計數器實現
AbstractEmbeddedCache本地緩存
com.alicp.jetcache.embedded.AbstractEmbeddedCache
抽象類繼承AbstractCache抽象類,定義了本地緩存的存放緩存數據的對象為com.alicp.jetcache.embedded.InnerMap
接口和一個初始化該接口的createAreaCache抽象方法,基於InnerMap接口實現以DO_
開頭的方法,完成緩存實例各種操作的具體實現,主要代碼如下:
public abstract class AbstractEmbeddedCache<K, V> extends AbstractCache<K, V> {
protected EmbeddedCacheConfig<K, V> config;
/**
* 本地緩存的 Map
*/
protected InnerMap innerMap;
protected abstract InnerMap createAreaCache();
public AbstractEmbeddedCache(EmbeddedCacheConfig<K, V> config) {
this.config = config;
innerMap = createAreaCache();
}
@Override
public CacheConfig<K, V> config() {
return config;
}
public Object buildKey(K key) {
Object newKey = key;
Function<K, Object> keyConvertor = config.getKeyConvertor();
if (keyConvertor != null) {
newKey = keyConvertor.apply(key);
}
return newKey;
}
@Override
protected CacheGetResult<V> do_GET(K key) {
Object newKey = buildKey(key);
CacheValueHolder<V> holder = (CacheValueHolder<V>) innerMap.getValue(newKey);
return parseHolderResult(holder);
}
protected CacheGetResult<V> parseHolderResult(CacheValueHolder<V> holder) {
long now = System.currentTimeMillis();
if (holder == null) {
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
} else if (now >= holder.getExpireTime()) {
return CacheGetResult.EXPIRED_WITHOUT_MSG;
} else {
synchronized (holder) {
long accessTime = holder.getAccessTime();
if (config.isExpireAfterAccess()) {
long expireAfterAccess = config.getExpireAfterAccessInMillis();
if (now >= accessTime + expireAfterAccess) {
return CacheGetResult.EXPIRED_WITHOUT_MSG;
}
}
// 設置該緩存數據的最后一次訪問時間
holder.setAccessTime(now);
}
return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
}
}
@Override
protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
ArrayList<K> keyList = new ArrayList<K>(keys.size());
ArrayList<Object> newKeyList = new ArrayList<Object>(keys.size());
keys.stream().forEach((k) -> {
Object newKey = buildKey(k);
keyList.add(k);
newKeyList.add(newKey);
});
Map<Object, CacheValueHolder<V>> innerResultMap = innerMap.getAllValues(newKeyList);
Map<K, CacheGetResult<V>> resultMap = new HashMap<>();
for (int i = 0; i < keyList.size(); i++) {
K key = keyList.get(i);
Object newKey = newKeyList.get(i);
CacheValueHolder<V> holder = innerResultMap.get(newKey);
resultMap.put(key, parseHolderResult(holder));
}
MultiGetResult<K, V> result = new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
return result;
}
@Override
protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(value ,timeUnit.toMillis(expireAfterWrite));
innerMap.putValue(buildKey(key), cacheObject);
return CacheResult.SUCCESS_WITHOUT_MSG;
}
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
HashMap newKeyMap = new HashMap();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
newKeyMap.put(buildKey(en.getKey()), cacheObject);
}
innerMap.putAllValues(newKeyMap);
final HashMap resultMap = new HashMap();
map.keySet().forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS));
return CacheResult.SUCCESS_WITHOUT_MSG;
}
@Override
protected CacheResult do_REMOVE(K key) {
innerMap.removeValue(buildKey(key));
return CacheResult.SUCCESS_WITHOUT_MSG;
}
@Override
protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
Set newKeys = keys.stream().map((key) -> buildKey(key)).collect(Collectors.toSet());
innerMap.removeAllValues(newKeys);
final HashMap resultMap = new HashMap();
keys.forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS));
return CacheResult.SUCCESS_WITHOUT_MSG;
}
@Override
protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
if (innerMap.putIfAbsentValue(buildKey(key), cacheObject)) {
return CacheResult.SUCCESS_WITHOUT_MSG;
} else {
return CacheResult.EXISTS_WITHOUT_MSG;
}
}
}
com.alicp.jetcache.embedded.AbstractEmbeddedCache
抽象類實現了操作本地緩存的相關方法
-
定義了緩存實例對象本地緩存的配置信息
EmbeddedCacheConfig
對象 -
定義了緩存實例對象本地緩存基於內存操作緩存數據的
InnerMap
對象,它的初始化過程交由不同的內存緩存實例(LinkedHashMapCache和CaffeineCache)
LinkedHashMapCache
com.alicp.jetcache.embedded.LinkedHashMapCache
基於LinkedHashMap完成緩存實例對象本地緩存基於內存操作緩存數據的InnerMap
對象的初始化工作,主要代碼如下:
public class LinkedHashMapCache<K, V> extends AbstractEmbeddedCache<K, V> {
private static Logger logger = LoggerFactory.getLogger(LinkedHashMapCache.class);
public LinkedHashMapCache(EmbeddedCacheConfig<K, V> config) {
super(config);
// 將緩存實例添加至 Cleaner
addToCleaner();
}
protected void addToCleaner() {
Cleaner.add(this);
}
@Override
protected InnerMap createAreaCache() {
return new LRUMap(config.getLimit(), this);
}
public void cleanExpiredEntry() {
((LRUMap) innerMap).cleanExpiredEntry();
}
/**
* 用於本地緩存類型為 linkedhashmap 緩存實例存儲緩存數據
*/
final class LRUMap extends LinkedHashMap implements InnerMap {
/**
* 允許的最大緩存數量
*/
private final int max;
/**
* 緩存實例鎖
*/
private Object lock;
public LRUMap(int max, Object lock) {
super((int) (max * 1.4f), 0.75f, true);
this.max = max;
this.lock = lock;
}
/**
* 當元素大於最大值時移除最老的元素
*
* @param eldest 最老的元素
* @return 是否刪除
*/
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > max;
}
/**
* 清理過期的元素
*/
void cleanExpiredEntry() {
synchronized (lock) { // 占有當前緩存實例這把鎖
for (Iterator it = entrySet().iterator(); it.hasNext();) {
Map.Entry en = (Map.Entry) it.next();
Object value = en.getValue();
if (value != null && value instanceof CacheValueHolder) {
CacheValueHolder h = (CacheValueHolder) value;
/*
* 緩存的數據已經失效了則刪除
* 為什么不對 expireAfterAccess 進行判斷,取最小值,疑問????
*/
if (System.currentTimeMillis() >= h.getExpireTime()) {
it.remove();
}
} else {
// assert false
if (value == null) {
logger.error("key " + en.getKey() + " is null");
} else {
logger.error("value of key " + en.getKey() + " is not a CacheValueHolder. type=" + value.getClass());
}
}
}
}
}
@Override
public Object getValue(Object key) {
synchronized (lock) {
return get(key);
}
}
@Override
public Map getAllValues(Collection keys) {
Map values = new HashMap();
synchronized (lock) {
for (Object key : keys) {
Object v = get(key);
if (v != null) {
values.put(key, v);
}
}
}
return values;
}
@Override
public void putValue(Object key, Object value) {
synchronized (lock) {
put(key, value);
}
}
@Override
public void putAllValues(Map map) {
synchronized (lock) {
Set<Map.Entry> set = map.entrySet();
for (Map.Entry en : set) {
put(en.getKey(), en.getValue());
}
}
}
@Override
public boolean removeValue(Object key) {
synchronized (lock) {
return remove(key) != null;
}
}
@Override
public void removeAllValues(Collection keys) {
synchronized (lock) {
for (Object k : keys) {
remove(k);
}
}
}
@Override
@SuppressWarnings("unchecked")
public boolean putIfAbsentValue(Object key, Object value) {
/*
* 如果緩存 key 不存在,或者對應的 value 已經失效則放入,否則返回 false
*/
synchronized (lock) {
CacheValueHolder h = (CacheValueHolder) get(key);
if (h == null || parseHolderResult(h).getResultCode() == CacheResultCode.EXPIRED) {
put(key, value);
return true;
} else {
return false;
}
}
}
}
}
com.alicp.jetcache.embedded.LinkedHashMapCache
自定義LRUMap
繼承LinkedHashMap並實現InnerMap接口
-
自定義
max
字段,存儲元素個數的最大值,並設置初始容量為(max * 1.4f) -
自定義
lock
字段,每個緩存實例的鎖,通過synchronized關鍵詞保證線程安全,所以性能相對來說不好 -
覆蓋LinkedHashMap的
removeEldestEntry
方法,當元素大於最大值時移除最老的元素 -
自定義
cleanExpiredEntry
方法,遍歷Map,根據緩存value(被封裝成的com.alicp.jetcache.CacheValueHolder
對象,包含緩存數據、失效時間戳和第一次訪問的時間),清理過期的元素 -
該對象初始化時會被添加至
com.alicp.jetcache.embedded.Cleaner
清理器中,Cleaner會周期性(每隔60秒)遍歷LinkedHashMapCache緩存實例,調用其cleanExpiredEntry方法
Cleaner清理器
com.alicp.jetcache.embedded.Cleaner
用於清理緩存類型為LinkedHashMapCache的緩存數據,請查看相應注釋,代碼如下:
/**
* 執行任務:定時清理(每分鍾) LinkedHashMapCache 緩存實例中過期的緩存數據
*/
class Cleaner {
/**
* 存放弱引用對象,以防內存溢出
* 如果被弱引用的對象只被當前弱引用對象關聯時,gc 時被弱引用的對象則會被回收(取決於被弱引用的對象是否還與其他強引用對象關聯)
*
* 個人理解:當某個 LinkedHashMapCache 強引用對象沒有被其他對象(除了這里)引用時,我們應該讓這個對象被回收,
* 但是由於這里使用的也是強引用,這個對象被其他強引用對象關聯了,不可能被回收,存在內存溢出的危險,
* 所以這里使用了弱引用對象,如果被弱引用的對象沒有被其他對象(除了這里)引用時,這個對象會被回收
*
* 舉個例子:如果我們往一個 Map<Object, Object> 中存放一個key-value鍵值對
* 假設對應的鍵已經不再使用被回收了,那我們無法再獲取到對應的值,也無法被回收,占有一定的內存,存在風險
*/
static LinkedList<WeakReference<LinkedHashMapCache>> linkedHashMapCaches = new LinkedList<>();
static {
// 創建一個線程池,1個核心線程
ScheduledExecutorService executorService = JetCacheExecutor.defaultExecutor();
// 起一個循環任務一直清理 linkedHashMapCaches 過期的數據(每隔60秒)
executorService.scheduleWithFixedDelay(() -> run(), 60, 60, TimeUnit.SECONDS);
}
static void add(LinkedHashMapCache cache) {
synchronized (linkedHashMapCaches) {
// 創建一個弱引用對象,並添加到清理對象中
linkedHashMapCaches.add(new WeakReference<>(cache));
}
}
static void run() {
synchronized (linkedHashMapCaches) {
Iterator<WeakReference<LinkedHashMapCache>> it = linkedHashMapCaches.iterator();
while (it.hasNext()) {
WeakReference<LinkedHashMapCache> ref = it.next();
// 獲取被弱引用的對象(強引用)
LinkedHashMapCache c = ref.get();
if (c == null) { // 表示被弱引用的對象被標記成了垃圾,則移除
it.remove();
} else {
c.cleanExpiredEntry();
}
}
}
}
}
CaffeineCache
com.alicp.jetcache.embedded.CaffeineCache
基於Caffeine完成緩存實例對象本地緩存基於內存操作緩存數據的InnerMap
對象的初始化工作,主要代碼如下:
public class CaffeineCache<K, V> extends AbstractEmbeddedCache<K, V> {
/**
* 緩存實例對象
*/
private com.github.benmanes.caffeine.cache.Cache cache;
public CaffeineCache(EmbeddedCacheConfig<K, V> config) {
super(config);
}
/**
* 初始化本地緩存的容器
*
* @return Map對象
*/
@Override
@SuppressWarnings("unchecked")
protected InnerMap createAreaCache() {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
// 設置緩存實例的最大緩存數量
builder.maximumSize(config.getLimit());
final boolean isExpireAfterAccess = config.isExpireAfterAccess();
final long expireAfterAccess = config.getExpireAfterAccessInMillis();
// 設置緩存實例的緩存數據的失效策略
builder.expireAfter(new Expiry<Object, CacheValueHolder>() {
/**
* 獲取緩存的有效時間
*
* @param value 緩存數據
* @return 有效時間
*/
private long getRestTimeInNanos(CacheValueHolder value) {
long now = System.currentTimeMillis();
long ttl = value.getExpireTime() - now;
/*
* 如果本地緩存設置了多長時間沒訪問緩存則失效
*/
if(isExpireAfterAccess){
// 設置緩存的失效時間
// 多長時間沒訪問緩存則失效 and 緩存的有效時長取 min
ttl = Math.min(ttl, expireAfterAccess);
}
return TimeUnit.MILLISECONDS.toNanos(ttl);
}
@Override
public long expireAfterCreate(Object key, CacheValueHolder value, long currentTime) {
return getRestTimeInNanos(value);
}
@Override
public long expireAfterUpdate(Object key, CacheValueHolder value,
long currentTime, long currentDuration) {
return currentDuration;
}
@Override
public long expireAfterRead(Object key, CacheValueHolder value,
long currentTime, long currentDuration) {
return getRestTimeInNanos(value);
}
});
// 構建 Cache 緩存實例
cache = builder.build();
return new InnerMap() {
@Override
public Object getValue(Object key) {
return cache.getIfPresent(key);
}
@Override
public Map getAllValues(Collection keys) {
return cache.getAllPresent(keys);
}
@Override
public void putValue(Object key, Object value) {
cache.put(key, value);
}
@Override
public void putAllValues(Map map) {
cache.putAll(map);
}
@Override
public boolean removeValue(Object key) {
return cache.asMap().remove(key) != null;
}
@Override
public void removeAllValues(Collection keys) {
cache.invalidateAll(keys);
}
@Override
public boolean putIfAbsentValue(Object key, Object value) {
return cache.asMap().putIfAbsent(key, value) == null;
}
};
}
}
com.alicp.jetcache.embedded.CaffeineCache
通過Caffeine構建一個com.github.benmanes.caffeine.cache.Cache
緩存對象,然后實現InnerMap接口,調用這個緩存對象的相關方法
-
構建時設置每個元素的過期時間,也就是根據每個元素(
com.alicp.jetcache.CacheValueHolder
)的失效時間戳來設置,底層如何實現的可以參考Caffeine官方地址 -
調用
com.github.benmanes.caffeine.cache.Cache
的put方法我有遇到過'unable to create native thread'內存溢出的問題,所以請結合實際業務場景合理的設置緩存相關配置
AbstractExternalCache遠程緩存
com.alicp.jetcache.embedded.AbstractExternalCache
抽象類繼承AbstractCache抽象類,定義了緩存實例對象遠程緩存的配置信息ExternalCacheConfig
對象,提供了將緩存key轉換成字節數組的方法,代碼比較簡單。
RedisCache
com.alicp.jetcache.redis.RedisCache
使用Jedis連接Redis,對遠程的緩存數據進行操作,代碼沒有很復雜,可查看我的注釋
-
定義了
com.alicp.jetcache.redis.RedisCacheConfig
配置對象,包含Redis連接池的相關信息 -
實現了以
DO_
開頭的方法,也就是通過Jedis操作緩存數據
RedisLettuceCache
com.alicp.jetcache.redis.lettuce.RedisLettuceCache
使用Lettuce連接Redis,對遠程的緩存數據進行操作,代碼沒有很復雜,可查看我的注釋
-
定義了
com.alicp.jetcache.redis.lettuce.RedisLettuceCacheConfig
配置對象,包含Redis客戶端、與Redis建立的安全連接等信息,因為底層是基於Netty實現的,所以無需配置線程池 -
使用
com.alicp.jetcache.redis.lettuce.LettuceConnectionManager
自定義管理器將與Redis連接的相關信息封裝成LettuceObjects
對象,並管理RedisClient與LettuceObjects對應關系 -
相比Jedis更加安全高效
-
對Lettuce不了解的可以參考我寫的測試類
com.alicp.jetcache.test.external.LettuceTest
MultiLevelCache兩級緩存
當你設置了緩存類型為BOTH兩級緩存,那么創建的實例對象會被封裝成com.alicp.jetcache.MultiLevelCache
對象
-
定義了
caches
字段類型為Cache[],用於保存AbstractEmbeddedCache本地緩存實例和AbstractExternalCache遠程緩存實例,本地緩存存放於遠程緩存前面 -
實現了
do_GET
方法,遍歷caches數組,也就是先從本地緩存獲取,如果獲取緩存不成功則從遠程緩存獲取,成功獲取到緩存后會調用checkResultAndFillUpperCache方法 -
從
checkResultAndFillUpperCache
方法的邏輯可以看到,將獲取到的緩存數據更新至更底層的緩存中,也就是說如果緩存數據是從遠程獲取到的,那么進入這個方法后會將獲取到的緩存數據更新到本地緩存中去,這樣下次請求可以直接從本地緩存獲取,避免與Redis之間的網絡消耗 -
實現了
do_PUT
方法,遍歷caches數組,通過CompletableFuture
進行異步編程,將所有的操作綁定在一條鏈上執行。 -
實現的了
PUT(K key, V value)
方法,會先判斷是否單獨配置了本地緩存時間localExipre,配置了則單獨為本地緩存設置過期時間,沒有配置則到期時間和遠程緩存的一樣 -
覆蓋
tryLock
方法,調用caches[caches.length-1].tryLock方法,也就是只會調用最頂層遠程緩存的這個方法
主要代碼如下:
public class MultiLevelCache<K, V> extends AbstractCache<K, V> {
private Cache[] caches;
private MultiLevelCacheConfig<K, V> config;
@SuppressWarnings("unchecked")
@Deprecated
public MultiLevelCache(Cache... caches) throws CacheConfigException {
this.caches = caches;
checkCaches();
CacheConfig lastConfig = caches[caches.length - 1].config();
config = new MultiLevelCacheConfig<>();
config.setCaches(Arrays.asList(caches));
config.setExpireAfterWriteInMillis(lastConfig.getExpireAfterWriteInMillis());
config.setCacheNullValue(lastConfig.isCacheNullValue());
}
@SuppressWarnings("unchecked")
public MultiLevelCache(MultiLevelCacheConfig<K, V> cacheConfig) throws CacheConfigException {
this.config = cacheConfig;
this.caches = cacheConfig.getCaches().toArray(new Cache[]{});
checkCaches();
}
private void checkCaches() {
if (caches == null || caches.length == 0) {
throw new IllegalArgumentException();
}
for (Cache c : caches) {
if (c.config().getLoader() != null) {
throw new CacheConfigException("Loader on sub cache is not allowed, set the loader into MultiLevelCache.");
}
}
}
public Cache[] caches() {
return caches;
}
@Override
public MultiLevelCacheConfig<K, V> config() {
return config;
}
@Override
public CacheResult PUT(K key, V value) {
if (config.isUseExpireOfSubCache()) { // 本地緩存使用自己的失效時間
// 設置了TimeUnit為null,本地緩存則使用自己的到期時間
return PUT(key, value, 0, null);
} else {
return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
}
@Override
public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
if (config.isUseExpireOfSubCache()) {
return PUT_ALL(map, 0, null);
} else {
return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
}
@Override
protected CacheGetResult<V> do_GET(K key) {
// 遍歷多級緩存(遠程緩存排在后面)
for (int i = 0; i < caches.length; i++) {
Cache cache = caches[i];
CacheGetResult result = cache.GET(key);
if (result.isSuccess()) {
CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
/*
* 這個遍歷是從低層的緩存開始獲取,獲取成功則將該值設置到更低層的緩存中
* 情景:
* 本地沒有獲取到緩存,遠程獲取到了緩存,這里會將遠程的緩存數據設置到本地中,
* 這樣下次請求則直接從本次獲取,減少了遠程獲取的時間
*/
checkResultAndFillUpperCache(key, i, holder);
return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
}
}
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
}
private CacheValueHolder<V> unwrapHolder(CacheValueHolder<V> h) {
// if @Cached or @CacheCache change type from REMOTE to BOTH (or from BOTH to REMOTE),
// during the dev/publish process, the value type which different application server put into cache server will be different
// (CacheValueHolder<V> and CacheValueHolder<CacheValueHolder<V>>, respectively).
// So we need correct the problem at here and in CacheGetResult.
Objects.requireNonNull(h);
if (h.getValue() instanceof CacheValueHolder) {
return (CacheValueHolder<V>) h.getValue();
} else {
return h;
}
}
private void checkResultAndFillUpperCache(K key, int i, CacheValueHolder<V> h) {
Objects.requireNonNull(h);
long currentExpire = h.getExpireTime();
long now = System.currentTimeMillis();
if (now <= currentExpire) {
if(config.isUseExpireOfSubCache()){ // 如果使用本地自己的緩存過期時間
// 使用本地緩存自己的過期時間
PUT_caches(i, key, h.getValue(), 0, null);
} else { // 使用遠程緩存的過期時間
long restTtl = currentExpire - now;
if (restTtl > 0) { // 遠程緩存數據還未失效,則重新設置本地的緩存
PUT_caches(i, key, h.getValue(), restTtl, TimeUnit.MILLISECONDS);
}
}
}
}
@Override
protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
HashMap<K, CacheGetResult<V>> resultMap = new HashMap<>();
Set<K> restKeys = new HashSet<>(keys);
for (int i = 0; i < caches.length; i++) {
if (restKeys.size() == 0) {
break;
}
Cache<K, CacheValueHolder<V>> c = caches[i];
MultiGetResult<K, CacheValueHolder<V>> allResult = c.GET_ALL(restKeys);
if (allResult.isSuccess() && allResult.getValues() != null) {
for (Map.Entry<K, CacheGetResult<CacheValueHolder<V>>> en : allResult.getValues().entrySet()) {
K key = en.getKey();
CacheGetResult result = en.getValue();
if (result.isSuccess()) {
CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
checkResultAndFillUpperCache(key, i, holder);
resultMap.put(key, new CacheGetResult(CacheResultCode.SUCCESS, null, holder));
restKeys.remove(key);
}
}
}
}
for (K k : restKeys) {
resultMap.put(k, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
}
return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
}
@Override
protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
return PUT_caches(caches.length, key, value, expireAfterWrite, timeUnit);
}
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache c : caches) {
CacheResult r;
if(timeUnit == null) {
r = c.PUT_ALL(map);
} else {
r = c.PUT_ALL(map, expireAfterWrite, timeUnit);
}
future = combine(future, r);
}
return new CacheResult(future);
}
private CacheResult PUT_caches(int lastIndex, K key, V value, long expire, TimeUnit timeUnit) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (int i = 0; i < lastIndex; i++) {
Cache cache = caches[i];
CacheResult r;
if (timeUnit == null) { // 表示本地緩存使用自己過期時間
r = cache.PUT(key, value);
} else {
r = cache.PUT(key, value, expire, timeUnit);
}
// 將多個 PUT 操作放在一條鏈上
future = combine(future, r);
}
return new CacheResult(future);
}
private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
return future.thenCombine(result.future(), (d1, d2) -> {
if (d1 == null) {
return d2;
}
if (d1.getResultCode() != d2.getResultCode()) {
return new ResultData(CacheResultCode.PART_SUCCESS, null, null);
}
return d1;
});
}
@Override
protected CacheResult do_REMOVE(K key) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache cache : caches) {
CacheResult r = cache.REMOVE(key);
future = combine(future, r);
}
return new CacheResult(future);
}
@Override
protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache cache : caches) {
CacheResult r = cache.REMOVE_ALL(keys);
future = combine(future, r);
}
return new CacheResult(future);
}
@Override
public <T> T unwrap(Class<T> clazz) {
Objects.requireNonNull(clazz);
for (Cache cache : caches) {
try {
T obj = (T) cache.unwrap(clazz);
if (obj != null) {
return obj;
}
} catch (IllegalArgumentException e) {
// ignore
}
}
throw new IllegalArgumentException(clazz.getName());
}
@Override
public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
if (key == null) {
return null;
}
return caches[caches.length - 1].tryLock(key, expire, timeUnit);
}
@Override
public boolean putIfAbsent(K key, V value) {
throw new UnsupportedOperationException("putIfAbsent is not supported by MultiLevelCache");
}
@Override
protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
throw new UnsupportedOperationException("PUT_IF_ABSENT is not supported by MultiLevelCache");
}
@Override
public void close() {
for (Cache c : caches) {
c.close();
}
}
}
RefreshCache
com.alicp.jetcache.RefreshCache
為緩存實例添加刷新任務,前面在AbstractCache抽象類中講到了,在com.alicp.jetcache.anno.support.CacheContext.buildCache
方法中會將cache包裝成CacheHandlerRefreshCache
,所以說每個緩存實例都會調用一下addOrUpdateRefreshTask
方法,代碼如下:
public class RefreshCache<K, V> extends LoadingCache<K, V> {
protected CacheConfig<K, V> config;
/**
* 用於保存刷新任務
*/
private ConcurrentHashMap<Object, RefreshTask> taskMap = new ConcurrentHashMap<>();
protected void addOrUpdateRefreshTask(K key, CacheLoader<K, V> loader) {
// 獲取緩存刷新策略
RefreshPolicy refreshPolicy = config.getRefreshPolicy();
if (refreshPolicy == null) { // 沒有則不進行刷新
return;
}
// 獲取刷新時間間隔
long refreshMillis = refreshPolicy.getRefreshMillis();
if (refreshMillis > 0) {
// 獲取線程任務的ID
Object taskId = getTaskId(key);
// 獲取對應的RefreshTask,不存在則創建一個
RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
logger.debug("add refresh task. interval={}, key={}", refreshMillis, key);
RefreshTask task = new RefreshTask(taskId, key, loader);
task.lastAccessTime = System.currentTimeMillis();
/*
* 獲取 ScheduledExecutorService 周期/延遲線程池,10個核心線程,創建的線程都是守護線程
* scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
* 運行的任務task、多久延遲后開始執行、后續執行的周期間隔多長,時間單位
* 通過其創建一個循環任務,用於刷新緩存數據
*/
ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(task,
refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
task.future = future;
return task;
});
// 設置最后一次訪問時間
refreshTask.lastAccessTime = System.currentTimeMillis();
}
}
}
如果緩存實例配置了刷新策略並且刷新間隔大於0,則會從taskMap
(線程安全)中嘗試獲取對應的刷新任務RefreshTask
,如果不存在則創建一個任務放入線程池周期性的執行
com.alicp.jetcache.RefreshCache.RefreshTask
代碼如下:
public class RefreshCache<K, V> extends LoadingCache<K, V> {
protected Cache concreteCache() {
Cache c = getTargetCache();
while (true) {
if (c instanceof ProxyCache) {
c = ((ProxyCache) c).getTargetCache();
} else if (c instanceof MultiLevelCache) {
Cache[] caches = ((MultiLevelCache) c).caches();
// 如果是兩級緩存則返回遠程緩存
c = caches[caches.length - 1];
} else {
return c;
}
}
}
class RefreshTask implements Runnable {
/**
* 唯一標志符,也就是Key轉換后的值
*/
private Object taskId;
/**
* 緩存的Key
*/
private K key;
/**
* 執行方法的CacheLoader對象
*/
private CacheLoader<K, V> loader;
/**
* 最后一次訪問時間
*/
private long lastAccessTime;
/**
* 該 Task 的執行策略
*/
private ScheduledFuture future;
RefreshTask(Object taskId, K key, CacheLoader<K, V> loader) {
this.taskId = taskId;
this.key = key;
this.loader = loader;
}
private void cancel() {
logger.debug("cancel refresh: {}", key);
// 嘗試中斷當前任務
future.cancel(false);
// 從任務列表中刪除
taskMap.remove(taskId);
}
/**
* 重新加載數據
*
* @throws Throwable 異常
*/
private void load() throws Throwable {
CacheLoader<K, V> l = loader == null ? config.getLoader() : loader;
if (l != null) {
// 封裝 CacheLoader 成 ProxyLoader,加載后會發起 Load 事件
l = CacheUtil.createProxyLoader(cache, l, eventConsumer);
// 加載
V v = l.load(key);
if (needUpdate(v, l)) {
// 將重新加載的數據放入緩存
cache.PUT(key, v);
}
}
}
/**
* 遠程加載數據
*
* @param concreteCache 緩存對象
* @param currentTime 當前時間
* @throws Throwable 異常
*/
private void externalLoad(final Cache concreteCache, final long currentTime) throws Throwable {
// 獲取 Key 轉換后的值
byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
// 創建分布式鎖對應的Key
byte[] lockKey = combine(newKey, "_#RL#".getBytes());
// 分布式鎖的存在時間
long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
// 刷新間隔
long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
// Key對應的時間戳Key(用於存放上次刷新時間)
byte[] timestampKey = combine(newKey, "_#TS#".getBytes());
// AbstractExternalCache buildKey method will not convert byte[]
// 獲取Key上一次刷新時間
CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey);
boolean shouldLoad = false; // 是否需要重新加載
if (refreshTimeResult.isSuccess()) {
// 當前時間與上一次刷新的時間間隔是否大於或等於刷新間隔
shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis;
} else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) { // 無緩存
shouldLoad = true;
}
if (!shouldLoad) {
if (multiLevelCache) {
// 將頂層的緩存數據更新至低層的緩存中,例如將遠程的緩存數據放入本地緩存
// 因為如果是多級緩存,創建刷新任務后,我們只需更新遠程的緩存,然后從遠程緩存獲取緩存數據更新低層的緩存,保證緩存一致
refreshUpperCaches(key);
}
return;
}
// 重新加載
Runnable r = () -> {
try {
load();
// AbstractExternalCache buildKey method will not convert byte[]
// 保存一個key-value至redis,其中的信息為該value的生成時間,刷新緩存
concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis()));
} catch (Throwable e) {
throw new CacheException("refresh error", e);
}
};
// AbstractExternalCache buildKey method will not convert byte[]
// 分布式緩存沒有一個全局分配的功能,這里嘗試獲取一把非嚴格的分布式鎖,獲取鎖的超時時間默認60秒,也就是獲取到這把鎖最多可以擁有60秒
// 只有獲取Key對應的這把分布式鎖,才執行重新加載的操作
boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
if (!lockSuccess && multiLevelCache) { // 沒有獲取到鎖並且是多級緩存
// 這個時候應該有其他實例在刷新緩存,所以這里設置過一會直接獲取遠程的緩存數據更新到本地
// 創建一個延遲任務(1/5刷新間隔后),將最頂層的緩存數據更新至每一層
JetCacheExecutor.heavyIOExecutor().schedule(() -> refreshUpperCaches(key), (long) (0.2 * refreshMillis),
TimeUnit.MILLISECONDS);
}
}
private void refreshUpperCaches(K key) {
MultiLevelCache<K, V> targetCache = (MultiLevelCache<K, V>) getTargetCache();
Cache[] caches = targetCache.caches();
int len = caches.length;
// 獲取多級緩存中頂層的緩存數據
CacheGetResult cacheGetResult = caches[len - 1].GET(key);
if (!cacheGetResult.isSuccess()) {
return;
}
// 將緩存數據重新放入低層緩存
for (int i = 0; i < len - 1; i++) {
caches[i].PUT(key, cacheGetResult.getValue());
}
}
/**
* 刷新任務的具體執行
*/
@Override
public void run() {
try {
if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
// 取消執行
cancel();
return;
}
long now = System.currentTimeMillis();
long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
if (stopRefreshAfterLastAccessMillis > 0) {
// 最后一次訪問到現在時間的間隔超過了設置的 stopRefreshAfterLastAccessMillis,則取消當前任務執行
if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
logger.debug("cancel refresh: {}", key);
cancel();
return;
}
}
logger.debug("refresh key: {}", key);
// 獲取緩存實例對象,如果是多層則返回頂層,也就是遠程緩存
Cache concreteCache = concreteCache();
if (concreteCache instanceof AbstractExternalCache) { // 遠程緩存刷新
externalLoad(concreteCache, now);
} else { // 本地緩存刷新
load();
}
} catch (Throwable e) {
logger.error("refresh error: key=" + key, e);
}
}
}
}
刷新邏輯:
-
判斷是否需要停止刷新了,需要的話調用其
future
的cancel方法取消執行,並從taskMap
中刪除 -
獲取緩存實例對象,如果是多層則返回頂層,也就是遠程緩存實例對象
-
如果是本地緩存,則調用
load
方法,也就是執行loader函數加載原有方法,將獲取到的數據更新至緩存實例中(如果是多級緩存,則每級緩存都會更新) -
如果是遠程緩存對象,則調用
externalLoad
方法,刷新后會往Redis中存放一個鍵值對,key為key_#TS#
,value為上一次刷新時間
-
先從Redis中獲取上一次刷新時間的鍵值對,根據上一次刷新的時間判斷是否大於刷新間隔,大於(或者沒有上一次刷新時間)表示需要重新加載數據,否則不需要重新加載數據
-
如果不需要重新加載數據,但是又是多級緩存,則獲取遠程緩存數據更新至本地緩存,保證兩級緩存的一致性
-
如果需要重新加載數據,則調用
tryLockAndRun
方法,嘗試獲取分布式鎖,執行刷新任務(調用load
方法,並往Redis中重新設置上一次的刷新時間),如果沒有獲取到分布式鎖,則創建一個延遲任務(1/5刷新間隔后)將最頂層的緩存數據更新至每一層
-
解析配置
主要查看jetcache-autoconfigure子模塊,解析application.yml中jetcache相關配置,初始化不同緩存類型的CacheBuilder
構造器,用於生產緩存實例,也初始化以下對象:
com.alicp.jetcache.anno.support.ConfigProvider
:緩存管理器,注入了全局配置GlobalCacheConfig、緩存實例管理器SimpleCacheManager、緩存上下文CacheContext等大量信息
com.alicp.jetcache.autoconfigure.AutoConfigureBeans
:存儲CacheBuilder
構造器以及Redis的相關信息
com.alicp.jetcache.anno.support.GlobalCacheConfig
:全局配置類,保存了一些全局信息
初始化構造器
通過@Conditional
注解將需要使用到的緩存類型對應的構造器初始化類注入到Spring容器並執行初始化過程,也就是創建CacheBuilder構造器
初始化構造器類的類型結構如下圖所示:

主要對象描述:
AbstractCacheAutoInit:抽象類,實現Spring的InitializingBean接口,注入至Spring容器時完成初始化
EmbeddedCacheAutoInit:抽象類,繼承AbstractCacheAutoInit,解析本地緩存獨有的配置
LinkedHashMapAutoConfiguration:初始化LinkedHashMapCacheBuilder構造器
CaffeineAutoConfiguration:初始化CaffeineCacheBuilder構造器
ExternalCacheAutoInit:抽象類,繼承AbstractCacheAutoInit,解析遠程緩存獨有的配置
RedisAutoInit:初始化RedisCacheBuilder構造器
RedisLettuceAutoInit:初始化RedisLettuceCacheBuilder構造器
AbstractCacheAutoInit
com.alicp.jetcache.autoconfigure.AbstractCacheAutoInit
抽象類主要實現了Spring的InitializingBean接口,在注入Spring容器時,Spring會調用其afterPropertiesSet方法,完成本地緩存類型和遠程緩存類型CacheBuilder
構造器的初始化,主要代碼如下:
public abstract class AbstractCacheAutoInit implements InitializingBean {
@Autowired
protected ConfigurableEnvironment environment;
@Autowired
protected AutoConfigureBeans autoConfigureBeans;
@Autowired
protected ConfigProvider configProvider;
protected String[] typeNames;
private boolean inited = false;
public AbstractCacheAutoInit(String... cacheTypes) {
Objects.requireNonNull(cacheTypes,"cacheTypes can't be null");
Assert.isTrue(cacheTypes.length > 0, "cacheTypes length is 0");
this.typeNames = cacheTypes;
}
/**
* 初始化方法
*/
@Override
public void afterPropertiesSet() {
if (!inited) {
synchronized (this) {
if (!inited) {
// 這里我們有兩個指定前綴 'jetcache.local' 'jetcache.remote'
process("jetcache.local.", autoConfigureBeans.getLocalCacheBuilders(), true);
process("jetcache.remote.", autoConfigureBeans.getRemoteCacheBuilders(), false);
inited = true;
}
}
}
}
private void process(String prefix, Map cacheBuilders, boolean local) {
// 創建一個配置對象(本地或者遠程)
ConfigTree resolver = new ConfigTree(environment, prefix);
// 獲取本地或者遠程的配置項
Map<String, Object> m = resolver.getProperties();
// 獲取本地或者遠程的 area ,這里我一般只有默認的 default
Set<String> cacheAreaNames = resolver.directChildrenKeys();
for (String cacheArea : cacheAreaNames) {
// 獲取本地或者遠程存儲類型,例如 caffeine,redis.lettuce
final Object configType = m.get(cacheArea + ".type");
// 緩存類型是否和當前 CacheAutoInit 的某一個 typeName 匹配(不同的 CacheAutoInit 會設置一個或者多個 typename)
boolean match = Arrays.stream(typeNames).anyMatch((tn) -> tn.equals(configType));
/*
* 因為有很多 CacheAutoInit 繼承者,都會執行這個方法,不同的繼承者解析不同的配置
* 例如 CaffeineAutoConfiguration 只解析 jetcache.local.default.type=caffeine 即可
* RedisLettuceAutoInit 只解析 jetcache.remote.default.type=redis.lettuce 即可
*/
if (!match) {
continue;
}
// 獲取本地或者遠程的 area 的子配置項
ConfigTree ct = resolver.subTree(cacheArea + ".");
logger.info("init cache area {} , type= {}", cacheArea, typeNames[0]);
// 根據配置信息構建本地或者遠程緩存的 CacheBuilder 構造器
CacheBuilder c = initCache(ct, local ? "local." + cacheArea : "remote." + cacheArea);
// 將 CacheBuilder 構造器存放至 AutoConfigureBeans
cacheBuilders.put(cacheArea, c);
}
}
/**
* 設置公共的配置到 CacheBuilder 構造器中
*
* @param builder 構造器
* @param ct 配置信息
*/
protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) {
AbstractCacheBuilder acb = (AbstractCacheBuilder) builder;
// 設置 Key 的轉換函數
acb.keyConvertor(configProvider.parseKeyConvertor(ct.getProperty("keyConvertor")));
// 設置超時時間
String expireAfterWriteInMillis = ct.getProperty("expireAfterWriteInMillis");
if (expireAfterWriteInMillis == null) {
// compatible with 2.1 兼容老版本
expireAfterWriteInMillis = ct.getProperty("defaultExpireInMillis");
}
if (expireAfterWriteInMillis != null) {
acb.setExpireAfterWriteInMillis(Long.parseLong(expireAfterWriteInMillis));
}
// 多長時間沒有訪問就讓緩存失效,0表示不使用該功能(注意:只支持本地緩存)
String expireAfterAccessInMillis = ct.getProperty("expireAfterAccessInMillis");
if (expireAfterAccessInMillis != null) {
acb.setExpireAfterAccessInMillis(Long.parseLong(expireAfterAccessInMillis));
}
}
/**
* 初始化 CacheBuilder 構造器交由子類去實現
*
* @param ct 配置信息
* @param cacheAreaWithPrefix 配置前綴
* @return CacheBuilder 構造器
*/
protected abstract CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix);
}
- 在
afterPropertiesSet()
方法中可以看到會調用process
方法分別初始化本地緩存和遠程緩存的構造器 - 定義的
process
方法:- 首先會從當前環境中解析出JetCache的相關配置到ConfigTree對象中
- 然后遍歷緩存區域,獲取對應的緩存類型type,進行不同類型的緩存實例CacheBuilder構造器初始化過程
- 不同CacheBuilder構造器的初始化方法
initCache
交由子類實現 - 獲取到CacheBuilder構造器后會將其放入
AutoConfigureBeans
對象中去
- 另外也定義了
parseGeneralConfig
方法解析本地緩存和遠程緩存都有的配置至CacheBuilder構造器中
EmbeddedCacheAutoInit
com.alicp.jetcache.autoconfigure.EmbeddedCacheAutoInit
抽象類繼承了AbstractCacheAutoInit
,主要是覆蓋父類的parseGeneralConfig
,解析本地緩存單有的配置limit
,代碼如下:
public abstract class EmbeddedCacheAutoInit extends AbstractCacheAutoInit {
public EmbeddedCacheAutoInit(String... cacheTypes) {
super(cacheTypes);
}
@Override
protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) {
super.parseGeneralConfig(builder, ct);
EmbeddedCacheBuilder ecb = (EmbeddedCacheBuilder) builder;
// 設置本地緩存每個緩存實例的緩存數量個數限制(默認100)
ecb.limit(Integer.parseInt(ct.getProperty("limit", String.valueOf(CacheConsts.DEFAULT_LOCAL_LIMIT))));
}
}
LinkedHashMapAutoConfiguration
com.alicp.jetcache.autoconfigure.LinkedHashMapAutoConfiguration
繼承了EmbeddedCacheAutoInit
,實現了initCache
方法,先通過LinkedHashMapCacheBuilder創建一個默認實現類,然后解析相關配置至構造器中完成初始化,代碼如下:
@Component
@Conditional(LinkedHashMapAutoConfiguration.LinkedHashMapCondition.class)
public class LinkedHashMapAutoConfiguration extends EmbeddedCacheAutoInit {
public LinkedHashMapAutoConfiguration() {
super("linkedhashmap");
}
@Override
protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) {
// 創建一個 LinkedHashMapCacheBuilder 構造器
LinkedHashMapCacheBuilder builder = LinkedHashMapCacheBuilder.createLinkedHashMapCacheBuilder();
// 解析相關配置至 LinkedHashMapCacheBuilder 的 CacheConfig 中
parseGeneralConfig(builder, ct);
return builder;
}
public static class LinkedHashMapCondition extends JetCacheCondition {
// 配置了緩存類型為 linkedhashmap 當前類才會被注入 Spring 容器
public LinkedHashMapCondition() {
super("linkedhashmap");
}
}
}
-
這里我們注意到
@Conditional
注解,這個注解的作用是:滿足SpringBootCondition
條件這個Bean才會被Spring容器管理 -
他的條件是
LinkedHashMapCondition
,繼承了JetCacheCondition
,也就是說配置文件中配置了緩存類型為linkedhashmap
時這個類才會被Spring容器管理,才會完成LinkedHashMapCacheBuilder構造器的初始化 -
JetCacheCondition
邏輯並不復雜,可自行查看
CaffeineAutoConfiguration
com.alicp.jetcache.autoconfigure.CaffeineAutoConfiguration
繼承了EmbeddedCacheAutoInit
,實現了initCache
方法,先通過CaffeineCacheBuilder創建一個默認實現類,然后解析相關配置至構造器中完成初始化,代碼如下:
@Component
@Conditional(CaffeineAutoConfiguration.CaffeineCondition.class)
public class CaffeineAutoConfiguration extends EmbeddedCacheAutoInit {
public CaffeineAutoConfiguration() {
super("caffeine");
}
@Override
protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) {
// 創建一個 CaffeineCacheBuilder 構造器
CaffeineCacheBuilder builder = CaffeineCacheBuilder.createCaffeineCacheBuilder();
// 解析相關配置至 CaffeineCacheBuilder 的 CacheConfig 中
parseGeneralConfig(builder, ct);
return builder;
}
public static class CaffeineCondition extends JetCacheCondition {
// 配置了緩存類型為 caffeine 當前類才會被注入 Spring 容器
public CaffeineCondition() {
super("caffeine");
}
}
}
-
同樣使用了
@Conditional
注解,這個注解的作用是:滿足SpringBootCondition
條件這個Bean才會被Spring容器管理 -
他的條件是
CaffeineCondition
,繼承了JetCacheCondition
,也就是說配置文件中配置了緩存類型為caffeine
時這個類才會被Spring容器管理,才會完成LinkedHashMapCacheBuilder構造器的初始化
ExternalCacheAutoInit
com.alicp.jetcache.autoconfigure.ExternalCacheAutoInit
抽象類繼承了AbstractCacheAutoInit
,主要是覆蓋父類的parseGeneralConfig
,解析遠程緩存單有的配置keyPrefix
、valueEncoder
和valueDecoder
,代碼如下:
public abstract class ExternalCacheAutoInit extends AbstractCacheAutoInit {
public ExternalCacheAutoInit(String... cacheTypes) {
super(cacheTypes);
}
/**
* 設置遠程緩存 CacheBuilder 構造器的相關配置
*
* @param builder 構造器
* @param ct 配置信息
*/
@Override
protected void parseGeneralConfig(CacheBuilder builder, ConfigTree ct) {
super.parseGeneralConfig(builder, ct);
ExternalCacheBuilder ecb = (ExternalCacheBuilder) builder;
// 設置遠程緩存 key 的前綴
ecb.setKeyPrefix(ct.getProperty("keyPrefix"));
/*
* 根據配置創建緩存數據的編碼函數和解碼函數
*/
ecb.setValueEncoder(configProvider.parseValueEncoder(ct.getProperty("valueEncoder", CacheConsts.DEFAULT_SERIAL_POLICY)));
ecb.setValueDecoder(configProvider.parseValueDecoder(ct.getProperty("valueDecoder", CacheConsts.DEFAULT_SERIAL_POLICY)));
}
}
RedisAutoInit
com.alicp.jetcache.autoconfigure.RedisAutoInit
繼承了ExternalCacheAutoInit
,實現initCache
方法,完成了通過Jedis連接Redis的初始化操作,主要代碼如下:
@Configuration
@Conditional(RedisAutoConfiguration.RedisCondition.class)
public class RedisAutoConfiguration {
public static final String AUTO_INIT_BEAN_NAME = "redisAutoInit";
@Bean(name = AUTO_INIT_BEAN_NAME)
public RedisAutoInit redisAutoInit() {
return new RedisAutoInit();
}
public static class RedisCondition extends JetCacheCondition {
// 配置了緩存類型為 redis 當前類才會被注入 Spring 容器
public RedisCondition() {
super("redis");
}
}
public static class RedisAutoInit extends ExternalCacheAutoInit {
public RedisAutoInit() {
// 設置緩存類型
super("redis");
}
@Autowired
private AutoConfigureBeans autoConfigureBeans;
@Override
protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) {
Pool jedisPool = parsePool(ct);
Pool[] slavesPool = null;
int[] slavesPoolWeights = null;
// 是否只從 Redis 的從節點讀取數據
boolean readFromSlave = Boolean.parseBoolean(ct.getProperty("readFromSlave", "False"));
// 獲取從節點的配置信息
ConfigTree slaves = ct.subTree("slaves.");
Set<String> slaveNames = slaves.directChildrenKeys();
// 依次創建每個從節點的連接池
if (slaveNames.size() > 0) {
slavesPool = new Pool[slaveNames.size()];
slavesPoolWeights = new int[slaveNames.size()];
int i = 0;
for (String slaveName: slaveNames) {
ConfigTree slaveConfig = slaves.subTree(slaveName + ".");
slavesPool[i] = parsePool(slaveConfig);
slavesPoolWeights[i] = Integer.parseInt(slaveConfig.getProperty("weight","100"));
i++;
}
}
// 創建一個 RedisCacheBuilder 構造器
ExternalCacheBuilder externalCacheBuilder = RedisCacheBuilder.createRedisCacheBuilder()
.jedisPool(jedisPool)
.readFromSlave(readFromSlave)
.jedisSlavePools(slavesPool)
.slaveReadWeights(slavesPoolWeights);
// 解析相關配置至 RedisCacheBuilder 的 CacheConfig 中
parseGeneralConfig(externalCacheBuilder, ct);
// eg: "jedisPool.remote.default"
autoConfigureBeans.getCustomContainer().put("jedisPool." + cacheAreaWithPrefix, jedisPool);
return externalCacheBuilder;
}
/**
* 創建 Redis 連接池
*
* @param ct 配置信息
* @return 連接池
*/
private Pool<Jedis> parsePool(ConfigTree ct) {
// 創建連接池配置對象
GenericObjectPoolConfig poolConfig = parsePoolConfig(ct);
String host = ct.getProperty("host", (String) null);
int port = Integer.parseInt(ct.getProperty("port", "0"));
int timeout = Integer.parseInt(ct.getProperty("timeout", String.valueOf(Protocol.DEFAULT_TIMEOUT)));
String password = ct.getProperty("password", (String) null);
int database = Integer.parseInt(ct.getProperty("database", String.valueOf(Protocol.DEFAULT_DATABASE)));
String clientName = ct.getProperty("clientName", (String) null);
boolean ssl = Boolean.parseBoolean(ct.getProperty("ssl", "false"));
String masterName = ct.getProperty("masterName", (String) null);
String sentinels = ct.getProperty("sentinels", (String) null);//ip1:port,ip2:port
Pool<Jedis> jedisPool;
if (sentinels == null) {
Objects.requireNonNull(host, "host/port or sentinels/masterName is required");
if (port == 0) {
throw new IllegalStateException("host/port or sentinels/masterName is required");
}
// 創建一個 Jedis 連接池
jedisPool = new JedisPool(poolConfig, host, port, timeout, password, database, clientName, ssl);
} else {
Objects.requireNonNull(masterName, "host/port or sentinels/masterName is required");
String[] strings = sentinels.split(",");
HashSet<String> sentinelsSet = new HashSet<>();
for (String s : strings) {
if (s != null && !s.trim().equals("")) {
sentinelsSet.add(s.trim());
}
}
// 創建一個 Jedis Sentine 連接池
jedisPool = new JedisSentinelPool(masterName, sentinelsSet, poolConfig, timeout, password, database, clientName);
}
return jedisPool;
}
}
}
-
com.alicp.jetcache.autoconfigure.RedisAutoInit
是com.alicp.jetcache.autoconfigure.RedisAutoConfiguration
內部的靜態類,在RedisAutoConfiguration內通過redisAutoInit()
方法定義RedisAutoInit作為Spring Bean -
同樣RedisAutoConfiguration使用了
@Conditional
注解,滿足SpringBootCondition
條件這個Bean才會被Spring容器管理,內部的RedisAutoInit也不會被管理,也就是說配置文件中配置了緩存類型為redis
時RedisLettuceAutoInit才會被Spring容器管理,才會完成RedisLettuceCacheBuilder構造器的初始化 -
實現了
initCache
方法- 先解析Redis的相關配置
- 通過Jedis創建Redis連接池
- 通過RedisCacheBuilder創建一個默認實現類
- 解析相關配置至構造器中完成初始化
- 將Redis連接保存至
AutoConfigureBeans
中
RedisLettuceAutoInit
com.alicp.jetcache.autoconfigure.RedisLettuceAutoInit
繼承了ExternalCacheAutoInit
,實現initCache
方法,完成了通過Lettuce連接Redis的初始化操作,主要代碼如下:
@Configuration
@Conditional(RedisLettuceAutoConfiguration.RedisLettuceCondition.class)
public class RedisLettuceAutoConfiguration {
public static final String AUTO_INIT_BEAN_NAME = "redisLettuceAutoInit";
/**
* 注入 spring 容器的條件
*/
public static class RedisLettuceCondition extends JetCacheCondition {
// 配置了緩存類型為 redis.lettuce 當前類才會被注入 Spring 容器
public RedisLettuceCondition() {
super("redis.lettuce");
}
}
@Bean(name = {AUTO_INIT_BEAN_NAME})
public RedisLettuceAutoInit redisLettuceAutoInit() {
return new RedisLettuceAutoInit();
}
public static class RedisLettuceAutoInit extends ExternalCacheAutoInit {
public RedisLettuceAutoInit() {
// 設置緩存類型
super("redis.lettuce");
}
/**
* 初始化 RedisLettuceCacheBuilder 構造器
*
* @param ct 配置信息
* @param cacheAreaWithPrefix 配置前綴
* @return 構造器
*/
@Override
protected CacheBuilder initCache(ConfigTree ct, String cacheAreaWithPrefix) {
Map<String, Object> map = ct.subTree("uri"/*there is no dot*/).getProperties();
// 數據節點偏好設置
String readFromStr = ct.getProperty("readFrom");
// 集群模式
String mode = ct.getProperty("mode");
// 異步獲取結果的超時時間,默認1s
long asyncResultTimeoutInMillis = Long.parseLong(
ct.getProperty("asyncResultTimeoutInMillis", Long.toString(CacheConsts.ASYNC_RESULT_TIMEOUT.toMillis())));
ReadFrom readFrom = null;
if (readFromStr != null) {
/*
* MASTER:只從Master節點中讀取。
* MASTER_PREFERRED:優先從Master節點中讀取。
* SLAVE_PREFERRED:優先從Slave節點中讀取。
* SLAVE:只從Slave節點中讀取。
* NEAREST:使用最近一次連接的Redis實例讀取。
*/
readFrom = ReadFrom.valueOf(readFromStr.trim());
}
AbstractRedisClient client;
StatefulConnection connection = null;
if (map == null || map.size() == 0) {
throw new CacheConfigException("lettuce uri is required");
} else {
// 創建對應的 RedisURI
List<RedisURI> uriList = map.values().stream().map((k) -> RedisURI.create(URI.create(k.toString())))
.collect(Collectors.toList());
if (uriList.size() == 1) { // 只有一個 URI,集群模式只給一個域名怎么辦 TODO 疑問??
RedisURI uri = uriList.get(0);
if (readFrom == null) {
// 創建一個 Redis 客戶端
client = RedisClient.create(uri);
// 設置失去連接時的行為,拒絕命令,默認為 DEFAULT
((RedisClient) client).setOptions(ClientOptions.builder().
disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build());
} else {
// 創建一個 Redis 客戶端
client = RedisClient.create();
((RedisClient) client).setOptions(ClientOptions.builder().
disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build());
// 創建一個安全連接並設置數據節點偏好
StatefulRedisMasterSlaveConnection c = MasterSlave.connect(
(RedisClient) client, new JetCacheCodec(), uri);
c.setReadFrom(readFrom);
connection = c;
}
} else { // 多個 URI,集群模式
if (mode != null && mode.equalsIgnoreCase("MasterSlave")) {
client = RedisClient.create();
((RedisClient) client).setOptions(ClientOptions.builder().
disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build());
StatefulRedisMasterSlaveConnection c = MasterSlave.connect(
(RedisClient) client, new JetCacheCodec(), uriList);
if (readFrom != null) {
c.setReadFrom(readFrom);
}
connection = c;
} else {
// 創建一個 Redis 客戶端
client = RedisClusterClient.create(uriList);
((RedisClusterClient) client).setOptions(ClusterClientOptions.builder().
disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS).build());
if (readFrom != null) {
StatefulRedisClusterConnection c = ((RedisClusterClient) client).connect(new JetCacheCodec());
c.setReadFrom(readFrom);
connection = c;
}
}
}
}
// 創建一個 RedisLettuceCacheBuilder 構造器
ExternalCacheBuilder externalCacheBuilder = RedisLettuceCacheBuilder.createRedisLettuceCacheBuilder()
.connection(connection)
.redisClient(client)
.asyncResultTimeoutInMillis(asyncResultTimeoutInMillis);
// 解析相關配置至 RedisLettuceCacheBuilder 的 CacheConfig 中
parseGeneralConfig(externalCacheBuilder, ct);
// eg: "remote.default.client"
autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".client", client);
// 開始將 Redis 客戶端和安全連接保存至 LettuceConnectionManager 管理器中
LettuceConnectionManager m = LettuceConnectionManager.defaultManager();
// 初始化 Lettuce 連接 Redis
m.init(client, connection);
// 初始化 Redis 連接的相關信息保存至 LettuceObjects 中,並將相關信息保存至 AutoConfigureBeans.customContainer
autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".connection", m.connection(client));
autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".commands", m.commands(client));
autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".asyncCommands", m.asyncCommands(client));
autoConfigureBeans.getCustomContainer().put(cacheAreaWithPrefix + ".reactiveCommands", m.reactiveCommands(client));
return externalCacheBuilder;
}
}
}
-
com.alicp.jetcache.autoconfigure.RedisLettuceAutoInit
是com.alicp.jetcache.autoconfigure.RedisLettuceAutoConfiguration
內部的靜態類,在RedisLettuceAutoConfiguration內通過redisLettuceAutoInit()
方法定義RedisLettuceAutoInit作為Spring Bean -
同樣RedisLettuceAutoConfiguration使用了
@Conditional
注解,滿足SpringBootCondition
條件這個Bean才會被Spring容器管理,內部的RedisLettuceAutoInit也不會被管理,也就是說配置文件中配置了緩存類型為redis.lettuce
時RedisLettuceAutoInit才會被Spring容器管理,才會完成RedisLettuceCacheBuilder構造器的初始化 -
實現了
initCache
方法- 先解析Redis的相關配置
- 通過Lettuce創建Redis客戶端和與Redis的連接
- 通過RedisLettuceCacheBuilder創建一個默認實現類
- 解析相關配置至構造器中完成初始化
- 獲取
LettuceConnectionManager
管理器,將通過Lettuce創建Redis客戶端和與Redis的連接保存 - 將Redis客戶端、與Redis的連接、同步命令、異步命令和反應式命令相關保存至
AutoConfigureBeans
中
JetCacheAutoConfiguration自動配置
上面的初始化構造器的類需要被Spring容器管理,就需被掃描到,我們一般會設置掃描路徑,但是別人引入JetCache肯定是作為其他包不能夠被掃描到的,這些Bean也就不會被Spring管理,這里我們查看jetcache-autoconfigure
模塊下src/main/resources/META-INF/spring.factories
文件,內容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration
這應該是一種SPI
機制,這樣這個項目以外的JetCache包里面的com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration
就會被Spring容器掃描到,我們來看看他的代碼:
/**
* 該 Bean 將會被 Spring 容器注入,依次注入下面幾個 Bean
* SpringConfigProvider -> AutoConfigureBeans -> BeanDependencyManager(為 GlobalCacheConfig 添加 CacheAutoInit 依賴) -> GlobalCacheConfig
* 由此會完成初始化配置操作,緩存實例構造器 CacheBuilder 也會被注入容器
*
* Created on 2016/11/17.
*
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
*/
@Configuration
@ConditionalOnClass(GlobalCacheConfig.class)
@ConditionalOnMissingBean(GlobalCacheConfig.class)
@EnableConfigurationProperties(JetCacheProperties.class)
@Import({RedisAutoConfiguration.class,
CaffeineAutoConfiguration.class,
MockRemoteCacheAutoConfiguration.class,
LinkedHashMapAutoConfiguration.class,
RedisLettuceAutoConfiguration.class,
RedisSpringDataAutoConfiguration.class})
public class JetCacheAutoConfiguration {
public static final String GLOBAL_CACHE_CONFIG_NAME = "globalCacheConfig";
private SpringConfigProvider _springConfigProvider = new SpringConfigProvider();
private AutoConfigureBeans _autoConfigureBeans = new AutoConfigureBeans();
private GlobalCacheConfig _globalCacheConfig;
@Bean
@ConditionalOnMissingBean
public SpringConfigProvider springConfigProvider() {
return _springConfigProvider;
}
@Bean
public AutoConfigureBeans autoConfigureBeans() {
return _autoConfigureBeans;
}
@Bean
public static BeanDependencyManager beanDependencyManager(){
return new BeanDependencyManager();
}
@Bean(name = GLOBAL_CACHE_CONFIG_NAME)
public GlobalCacheConfig globalCacheConfig(SpringConfigProvider configProvider,
AutoConfigureBeans autoConfigureBeans,
JetCacheProperties props) {
if (_globalCacheConfig != null) {
return _globalCacheConfig;
}
_globalCacheConfig = new GlobalCacheConfig();
_globalCacheConfig.setHiddenPackages(props.getHiddenPackages());
_globalCacheConfig.setStatIntervalMinutes(props.getStatIntervalMinutes());
_globalCacheConfig.setAreaInCacheName(props.isAreaInCacheName());
_globalCacheConfig.setPenetrationProtect(props.isPenetrationProtect());
_globalCacheConfig.setEnableMethodCache(props.isEnableMethodCache());
_globalCacheConfig.setLocalCacheBuilders(autoConfigureBeans.getLocalCacheBuilders());
_globalCacheConfig.setRemoteCacheBuilders(autoConfigureBeans.getRemoteCacheBuilders());
return _globalCacheConfig;
}
}
-
可以看到通過
@Import
注解,初始化構造器的那些類會被加入到Spring容器,加上@Condotional
注解,只有我們配置過的緩存類型的構造器才會被加入,然后保存至AutoConfigureBeans對象中 -
注意到這里我們注入的是
SpringConfigProvider
對象,加上@ConditionalOnMissingBean
注解,無法再次注冊該對象至Spring容器,相比ConfigProvider
對象,它的區別是設置了EncoderParser為DefaultSpringEncoderParser,設置了KeyConvertorParser為DefaultSpringKeyConvertorParser,目的是支持兩個解析器能夠解析自定義bean -
在
BeanDependencyManager
中可以看到它是一個BeanFactoryPostProcessor
,用於BeanFactory容器初始后執行操作,目的是往JetCacheAutoConfiguration的BeanDefinition的依賴中添加幾個AbstractCacheAutoInit類型的beanName,保證幾個CacheBuilder構造器已經初始化 -
globalCacheConfig
方法中設置全局的相關配置並添加已經初始化的CacheBuilder構造器,然后返回GlobalCacheConfig讓Spring容器管理,這樣一來就完成了JetCache的解析配置並初始化的功能
CacheBuilder構造器
構造器的作用就是根據配置構建一個對應類型的緩存實例
CacheBuilder的子類結構如下:

根據類名就可以知道其作用
CacheBuilder接口只定義了一個buildCache()
方法,用於構建緩存實例,交由不同的實現類
AbstractCacheBuilder抽象類實現了buildCache()
方法,主要代碼如下:
public abstract class AbstractCacheBuilder<T extends AbstractCacheBuilder<T>> implements CacheBuilder, Cloneable {
/**
* 該緩存實例的配置
*/
protected CacheConfig config;
/**
* 創建緩存實例函數
*/
private Function<CacheConfig, Cache> buildFunc;
public abstract CacheConfig getConfig();
protected T self() {
return (T) this;
}
public T buildFunc(Function<CacheConfig, Cache> buildFunc) {
this.buildFunc = buildFunc;
return self();
}
protected void beforeBuild() {
}
@Deprecated
public final <K, V> Cache<K, V> build() {
return buildCache();
}
@Override
public final <K, V> Cache<K, V> buildCache() {
if (buildFunc == null) {
throw new CacheConfigException("no buildFunc");
}
beforeBuild();
// 克隆一份配置信息,因為這里獲取到的是全局配置信息,以防后續被修改
CacheConfig c = getConfig().clone();
// 通過構建函數創建一個緩存實例
Cache<K, V> cache = buildFunc.apply(c);
/*
* 目前發現 c.getLoader() 都是 null,后續都會把 cache 封裝成 CacheHandlerRefreshCache
* TODO 疑問????
*/
if (c.getLoader() != null) {
if (c.getRefreshPolicy() == null) {
cache = new LoadingCache<>(cache);
} else {
cache = new RefreshCache<>(cache);
}
}
return cache;
}
@Override
public Object clone() {
AbstractCacheBuilder copy = null;
try {
copy = (AbstractCacheBuilder) super.clone();
copy.config = getConfig().clone();
return copy;
} catch (CloneNotSupportedException e) {
throw new CacheException(e);
}
}
}
-
實現了
java.lang.Cloneable
的clone方法,支持克隆該對象,因為每個緩存實例的配置不一定相同,這個構造器中保存的是全局的一些配置,所以需要克隆一個構造器出來為每個緩存實例設置其自己的配置而不影響這個最初始的構造器 -
定義CacheConfig對象存放緩存配置,構建緩存實例需要根據這些配置
-
定義的
buildFunc
函數用於構建緩存實例,我們在初始化構造器中可以看到,不同的構造器設置的該函數都是new一個緩存實例並傳入配置信息,例如:// 設置構建 CaffeineCache 緩存實例的函數 buildFunc((c) -> new CaffeineCache((EmbeddedCacheConfig) c)); // 進入CaffeineCache的構造器你就可以看到會根據配置完成緩存實例的初始化
不同類型的構造器區別在於CacheConfig類型不同,因為遠程和本地的配置是有所區別的,還有就是設置的buildFunc
函數不同,因為需要構建不同的緩存實例,和上面的例子差不多,都是new一個緩存實例並傳入配置信息,這里就不一一講述了
AOP
主要查看jetcache-anno子模塊,提供AOP功能
啟用JetCache
JetCache可以通過@EnableMethodCache和@EnableCreateCacheAnnotation注解完成AOP的初始化工作,我們在Spring Boot工程中的啟動類上面添加這兩個注解即可啟用JetCache緩存。
@EnableMethodCache
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({CommonConfiguration.class, ConfigSelector.class})
public @interface EnableMethodCache {
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
String[] basePackages();
}
注解的相關配置在上面的'如何使用'中已經講過了,這里我們關注@Import
注解中的CommonConfiguration
和ConfigSelector
兩個類,將會被Spring容器管理
-
com.alicp.jetcache.anno.config.CommonConfiguration
上面有@Configuration注解,所以會被作為一個Spring Bean,里面定義了一個Bean為ConfigMap
,所以這個Bean也會被Spring容器管理,com.alicp.jetcache.anno.support.ConfigMap
中保存方法與緩存注解配置信息的映射關系 -
com.alicp.jetcache.anno.config.ConfigSelector
繼承了AdviceModeImportSelector,通過@Import
注解他的selectImports
方法會被調用,根據不同的AdviceMode導入不同的配置類,可以看到會返回一個JetCacheProxyConfiguration類名稱,那么它也會被注入
com.alicp.jetcache.anno.config.JetCacheProxyConfiguration
是配置AOP的配置類,代碼如下:
@Configuration
public class JetCacheProxyConfiguration implements ImportAware, ApplicationContextAware {
protected AnnotationAttributes enableMethodCache;
private ApplicationContext applicationContext;
@Override
public void setImportMetadata(AnnotationMetadata importMetadata) {
// 獲取 @EnableMethodCache 注解信息
this.enableMethodCache = AnnotationAttributes.fromMap(
importMetadata.getAnnotationAttributes(EnableMethodCache.class.getName(), false));
if (this.enableMethodCache == null) {
throw new IllegalArgumentException(
"@EnableMethodCache is not present on importing class " + importMetadata.getClassName());
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Bean(name = CacheAdvisor.CACHE_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public CacheAdvisor jetcacheAdvisor(JetCacheInterceptor jetCacheInterceptor) {
CacheAdvisor advisor = new CacheAdvisor();
// bean的名稱:jetcache2.internalCacheAdvisor
advisor.setAdviceBeanName(CacheAdvisor.CACHE_ADVISOR_BEAN_NAME);
// 設置緩存攔截器為 JetCacheInterceptor
advisor.setAdvice(jetCacheInterceptor);
// 設置需要掃描的包
advisor.setBasePackages(this.enableMethodCache.getStringArray("basePackages"));
// 設置優先級,默認 Integer 的最大值,最低優先級
advisor.setOrder(this.enableMethodCache.<Integer>getNumber("order"));
return advisor;
}
/**
* 注入一個 JetCacheInterceptor 攔截器,設置為框架內部的角色
*
* @return JetCacheInterceptor
*/
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public JetCacheInterceptor jetCacheInterceptor() {
return new JetCacheInterceptor();
}
}
因為JetCacheProxyConfiguration是通過@Import
注解注入的並且實現了ImportAware
接口,當被注入Bean的時候會先調用其setImportMetadata
方法(這里好像必須添加@Configuration注解,不然無法被Spring識別出來)獲取到@EnableMethodCache
注解的元信息
其中定義了兩個Bean:
com.alicp.jetcache.anno.aop.JetCacheInterceptor
:實現了aop中的MethodInterceptor方法攔截器,可用於aop攔截方法后執行相關處理
com.alicp.jetcache.anno.aop.CacheAdvisor
:
-
繼承了
org.springframework.aop.support.AbstractBeanFactoryPointcutAdvisor
,將會作為一個AOP切面 -
設置了通知advice為JetCacheInterceptor,也就是說被攔截的方法都會進入JetCacheInterceptor,JetCacheInterceptor就作為JetCache的入口了
-
根據注解設置了需要掃描的包路徑以及優先級,默認是最低優先級
-
CacheAdvisor實現了
org.springframework.aopPointcutAdvisor
接口的getPointcut()
方法,設置這個切面的切入點為com.alicp.jetcache.anno.aop.CachePointcut
-
從CachePointcut作為切入點
-
實現了
org.springframework.aop.ClassFilter
接口,用於判斷哪些類需要被攔截 -
實現了
org.springframework.aop.MethodMatcher
接口,用於判斷哪些類中的哪些方法會被攔截 -
在判斷方法是否需要進入JetCache的JetCacheInterceptor過程中,會解析方法上面的JetCache相關緩存注解,將配置信息封裝
com.alicp.jetcache.anno.methodCacheInvokeConfig
對象中,並把它保存至com.alicp.jetcache.anno.support.ConfigMap
對象中
-
總結:@EnableMethodCache注解主要就是生成一個AOP切面用於攔截帶有緩存注解的方法
@EnableCreateCacheAnnotation
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({CommonConfiguration.class, CreateCacheAnnotationBeanPostProcessor.class})
public @interface EnableCreateCacheAnnotation {
}
相比@EnableMethodCache注解,沒有相關屬性,同樣會導入CommonConfiguration類
不同的是將導入com.alicp.jetcache.anno.field.CreateCacheAnnotationBeanPostProcessor
類,它繼承了org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor
作為一個BeanPostProcessor,用於在Spring初始化bean的時候做一些操作
從代碼中可以看到他的作用是:如果這個bean內部存在添加了帶有@CreateCache
注解的字段(沒有添加static),會將這個字段作為需要注入的對象,解析成 com.alicp.jetcache.anno.field.LazyInitCache
緩存實例
LazyInitCache的主要代碼如下:
class LazyInitCache implements ProxyCache {
/**
* 是否初始化,用於懶加載
*/
private boolean inited;
/**
* 緩存實例
*/
private Cache cache;
/**
* 所處上下文
*/
private ConfigurableListableBeanFactory beanFactory;
/**
* CreateCache 注解元信息
*/
private CreateCache ann;
/**
* 字段
*/
private Field field;
/**
* 刷新策略
*/
private RefreshPolicy refreshPolicy;
/**
* 保護策略
*/
private PenetrationProtectConfig protectConfig;
public LazyInitCache(ConfigurableListableBeanFactory beanFactory, CreateCache ann, Field field) {
this.beanFactory = beanFactory;
this.ann = ann;
this.field = field;
CacheRefresh cr = field.getAnnotation(CacheRefresh.class);
if (cr != null) {
refreshPolicy = CacheConfigUtil.parseRefreshPolicy(cr);
}
CachePenetrationProtect penetrateProtect = field.getAnnotation(CachePenetrationProtect.class);
if (penetrateProtect != null) {
protectConfig = CacheConfigUtil.parsePenetrationProtectConfig(penetrateProtect);
}
}
private void checkInit() {
if (!inited) {
synchronized (this) {
if (!inited) {
init();
inited = true;
}
}
}
}
/**
* 獲取緩存實例,不存在則新建
*
* @return 緩存實例
*/
@Override
public Cache getTargetCache() {
checkInit();
return cache;
}
private void init() {
if (inited) {
throw new IllegalStateException();
}
// 從 spring 的容器中獲取全局緩存配置 GlobalCacheConfig 對象
GlobalCacheConfig globalCacheConfig = beanFactory.getBean(GlobalCacheConfig.class);
ConfigProvider configProvider = beanFactory.getBean(ConfigProvider.class);
// 將注解信息封裝到 CachedAnnoConfig 對象中
CachedAnnoConfig cac = new CachedAnnoConfig();
cac.setArea(ann.area());
cac.setName(ann.name());
cac.setTimeUnit(ann.timeUnit());
cac.setExpire(ann.expire());
cac.setLocalExpire(ann.localExpire());
cac.setCacheType(ann.cacheType());
cac.setLocalLimit(ann.localLimit());
cac.setSerialPolicy(ann.serialPolicy());
cac.setKeyConvertor(ann.keyConvertor());
cac.setRefreshPolicy(refreshPolicy);
cac.setPenetrationProtectConfig(protectConfig);
String cacheName = cac.getName();
if (CacheConsts.isUndefined(cacheName)) {
String[] hiddenPackages = globalCacheConfig.getHiddenPackages();
CacheNameGenerator g = configProvider.createCacheNameGenerator(hiddenPackages);
cacheName = g.generateCacheName(field);
}
// 從緩存實例管理器中獲取或者創建對應的緩存實例
cache = configProvider.getCacheContext().__createOrGetCache(cac, ann.area(), cacheName);
}
}
-
可以看到通過
@CreateCache
創建的緩存實例也可以添加@CacheRefresh
和@CachePenetrationProtect
注解 -
在AbstractCache抽象類的computeIfAbsentImpl方法中我們有講到,如果緩存實例是ProxyCache類型,則會先調用其
getTargetCache()
方法獲取緩存實例對象,所以LazyInitCache在第一次訪問的時候才進行初始化,並根據緩存注解配置信息創建(存在則直接獲取)一個緩存實例
總結:@EnableCreateCacheAnnotation注解主要是支持@CreateCache能夠創建緩存實例
通過@EnableMethodCache
和@EnableCreateCacheAnnotation
兩個注解,加上前面的解析配置過程
,已經完成的JetCache的解析與初始化過程,那么接下來我們來看看JetCache如何處理被攔截的方法。
攔截器
從com.alicp.jetcache.anno.aop.CachePointcut
切入點判斷方法是否需要攔截的邏輯:
-
方法所在的類對象是否匹配,除去以"java"、"org.springframework"開頭和包含"$$EnhancerBySpringCGLIB$$"、"$$FastClassBySpringCGLIB$$"的類,該類是否在我們通過
@EnableMethodCache
注解配置的basePackages中 -
從
ConfigMap
獲取方法對應的CacheInvokeConfig
對象,也就是獲取緩存配置信息- 如果是一個空對象,那么不需要被攔截,因為前面已經判斷了所在的類是否需要被攔截,而這個類中並不是所有的方法都會添加緩存注解,所以這一類的方法會設置一個空對象(定義在CacheInvokeConfig內部的一個靜態對象添加了final修飾),保存在ConfigMap中
- 如果不為null,則需被攔截
- 通過CacheConfigUtil解析這個方法的緩存注解,如果有@Cached注解或者@CacheInvalidate注解或者@CacheUpdate注解,先解析注解生成CacheInvokeConfig對象保存至ConfigMap中,然后該方法會被攔截,否在保存一個空對象不會被攔截
ConfigProvider
com.alicp.jetcache.anno.support.ConfigProvide
是一個配置提供者對象,包含了JetCache的全局配置、緩存實例管理器、緩存value轉換器、緩存key轉換器、上下文和監控指標相關信息,主要代碼如下:
public class ConfigProvider extends AbstractLifecycle {
/**
* 緩存的全局配置
*/
@Resource
protected GlobalCacheConfig globalCacheConfig;
/**
* 緩存實例管理器
*/
protected SimpleCacheManager cacheManager;
/**
* 根據不同類型生成緩存數據轉換函數的轉換器
*/
protected EncoderParser encoderParser;
/**
* 根據不同類型生成緩存 Key 轉換函數的轉換器
*/
protected KeyConvertorParser keyConvertorParser;
/**
* 緩存監控指標管理器
*/
protected CacheMonitorManager cacheMonitorManager;
/**
* 打印緩存各項指標的函數
*/
private Consumer<StatInfo> metricsCallback = new StatInfoLogger(false);
/**
* 緩存更新事件(REMOVE OR PUT)消息接收者,無實現類
* 我們可以自己實現 CacheMessagePublisher 用於統計一些緩存的命中信息
*/
private CacheMessagePublisher cacheMessagePublisher;
/**
* 默認的緩存監控指標管理器
*/
private CacheMonitorManager defaultCacheMonitorManager = new DefaultCacheMonitorManager();
/**
* 緩存上下文
*/
private CacheContext cacheContext;
public ConfigProvider() {
cacheManager = SimpleCacheManager.defaultManager;
encoderParser = new DefaultEncoderParser();
keyConvertorParser = new DefaultKeyConvertorParser();
cacheMonitorManager = defaultCacheMonitorManager;
}
@Override
public void doInit() {
// 啟動緩存指標監控器,周期性打印各項指標
initDefaultCacheMonitorInstaller();
// 初始化緩存上下文
cacheContext = newContext();
}
protected void initDefaultCacheMonitorInstaller() {
if (cacheMonitorManager == defaultCacheMonitorManager) {
DefaultCacheMonitorManager installer = (DefaultCacheMonitorManager) cacheMonitorManager;
installer.setGlobalCacheConfig(globalCacheConfig);
installer.setMetricsCallback(metricsCallback);
if (cacheMessagePublisher != null) {
installer.setCacheMessagePublisher(cacheMessagePublisher);
}
// 啟動緩存指標監控器
installer.init();
}
}
@Override
public void doShutdown() {
shutdownDefaultCacheMonitorInstaller();
cacheManager.rebuild();
}
protected void shutdownDefaultCacheMonitorInstaller() {
if (cacheMonitorManager == defaultCacheMonitorManager) {
((DefaultCacheMonitorManager) cacheMonitorManager).shutdown();
}
}
/**
* 根據編碼類型通過緩存value轉換器生成編碼函數
*
* @param valueEncoder 編碼類型
* @return 編碼函數
*/
public Function<Object, byte[]> parseValueEncoder(String valueEncoder) {
return encoderParser.parseEncoder(valueEncoder);
}
/**
* 根據解碼類型通過緩存value轉換器生成解碼函數
*
* @param valueDecoder 解碼類型
* @return 解碼函數
*/
public Function<byte[], Object> parseValueDecoder(String valueDecoder) {
return encoderParser.parseDecoder(valueDecoder);
}
/**
* 根據轉換類型通過緩存key轉換器生成轉換函數
*
* @param convertor 轉換類型
* @return 轉換函數
*/
public Function<Object, Object> parseKeyConvertor(String convertor) {
return keyConvertorParser.parseKeyConvertor(convertor);
}
public CacheNameGenerator createCacheNameGenerator(String[] hiddenPackages) {
return new DefaultCacheNameGenerator(hiddenPackages);
}
protected CacheContext newContext() {
return new CacheContext(this, globalCacheConfig);
}
}
繼承了com.alicp.jetcache.anno.support.AbstractLifecycle
,查看其代碼可以看到有兩個方法,分別為init()
初始化方法和shutdown()
銷毀方法,因為分別添加了@PostConstruct
注解和@PreDestroy
注解,所以在Spring初始化時會調用init(),在Spring容器銷毀時會調用shutdown()方法,內部分別調用doInit()和doShutdown(),這兩個方法交由子類實現
在doInit()方法中先啟動緩存指標監控器,用於周期性打印各項緩存指標,然后初始化CacheContext緩存上下文,SpringConfigProvider返回的是SpringConfigContext
在doShutdown()方法中關閉緩存指標監控器,清除緩存實例
CacheContext
com.alicp.jetcache.anno.support.CacheContext
緩存上下文主要為每一個被攔截的請求創建緩存上下文,構建對應的緩存實例,主要代碼如下:
public class CacheContext {
private static Logger logger = LoggerFactory.getLogger(CacheContext.class);
private static ThreadLocal<CacheThreadLocal> cacheThreadLocal = new ThreadLocal<CacheThreadLocal>() {
@Override
protected CacheThreadLocal initialValue() {
return new CacheThreadLocal();
}
};
/**
* JetCache 緩存的管理器(包含很多信息)
*/
private ConfigProvider configProvider;
/**
* 緩存的全局配置
*/
private GlobalCacheConfig globalCacheConfig;
/**
* 緩存實例管理器
*/
protected SimpleCacheManager cacheManager;
public CacheContext(ConfigProvider configProvider, GlobalCacheConfig globalCacheConfig) {
this.globalCacheConfig = globalCacheConfig;
this.configProvider = configProvider;
cacheManager = configProvider.getCacheManager();
}
public CacheInvokeContext createCacheInvokeContext(ConfigMap configMap) {
// 創建一個本次調用的上下文
CacheInvokeContext c = newCacheInvokeContext();
// 添加一個函數,后續用於獲取緩存實例
// 根據注解配置信息獲取緩存實例對象,不存在則創建並設置到緩存注解配置類中
c.setCacheFunction((invokeContext, cacheAnnoConfig) -> {
Cache cache = cacheAnnoConfig.getCache();
if (cache == null) {
if (cacheAnnoConfig instanceof CachedAnnoConfig) { // 緩存注解
// 根據配置創建一個緩存實例對象,通過 CacheBuilder
cache = createCacheByCachedConfig((CachedAnnoConfig) cacheAnnoConfig, invokeContext);
} else if ((cacheAnnoConfig instanceof CacheInvalidateAnnoConfig) || (cacheAnnoConfig instanceof CacheUpdateAnnoConfig)) { // 更新/使失效緩存注解
CacheInvokeConfig cacheDefineConfig = configMap.getByCacheName(cacheAnnoConfig.getArea(), cacheAnnoConfig.getName());
if (cacheDefineConfig == null) {
String message = "can't find @Cached definition with area=" + cacheAnnoConfig.getArea()
+ " name=" + cacheAnnoConfig.getName() +
", specified in " + cacheAnnoConfig.getDefineMethod();
CacheConfigException e = new CacheConfigException(message);
logger.error("Cache operation aborted because can't find @Cached definition", e);
return null;
}
cache = createCacheByCachedConfig(cacheDefineConfig.getCachedAnnoConfig(), invokeContext);
}
cacheAnnoConfig.setCache(cache);
}
return cache;
});
return c;
}
private Cache createCacheByCachedConfig(CachedAnnoConfig ac, CacheInvokeContext invokeContext) {
// 緩存區域
String area = ac.getArea();
// 緩存實例名稱
String cacheName = ac.getName();
if (CacheConsts.isUndefined(cacheName)) { // 沒有定義緩存實例名稱
// 生成緩存實例名稱:類名+方法名+(參數類型)
cacheName = configProvider.createCacheNameGenerator(invokeContext.getHiddenPackages())
.generateCacheName(invokeContext.getMethod(), invokeContext.getTargetObject());
}
// 創建緩存實例對象
Cache cache = __createOrGetCache(ac, area, cacheName);
return cache;
}
@Deprecated
public <K, V> Cache<K, V> getCache(String cacheName) {
return getCache(CacheConsts.DEFAULT_AREA, cacheName);
}
@Deprecated
public <K, V> Cache<K, V> getCache(String area, String cacheName) {
Cache cache = cacheManager.getCacheWithoutCreate(area, cacheName);
return cache;
}
public Cache __createOrGetCache(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) {
// 緩存名稱拼接
String fullCacheName = area + "_" + cacheName;
// 從緩存實例管理器中根據緩存區域和緩存實例名稱獲取緩存實例
Cache cache = cacheManager.getCacheWithoutCreate(area, cacheName);
if (cache == null) {
synchronized (this) { // 加鎖
// 再次確認
cache = cacheManager.getCacheWithoutCreate(area, cacheName);
if (cache == null) {
/*
* 緩存區域的名稱是否作為緩存 key 名稱前綴,默認為 true ,我一般設置為 false
*/
if (globalCacheConfig.isAreaInCacheName()) {
// for compatible reason, if we use default configuration, the prefix should same to that version <=2.4.3
cache = buildCache(cachedAnnoConfig, area, fullCacheName);
} else {
// 構建一個緩存實例
cache = buildCache(cachedAnnoConfig, area, cacheName);
}
cacheManager.putCache(area, cacheName, cache);
}
}
}
return cache;
}
protected Cache buildCache(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) {
Cache cache;
if (cachedAnnoConfig.getCacheType() == CacheType.LOCAL) { // 本地緩存
cache = buildLocal(cachedAnnoConfig, area);
} else if (cachedAnnoConfig.getCacheType() == CacheType.REMOTE) { // 遠程緩存
cache = buildRemote(cachedAnnoConfig, area, cacheName);
} else { // 兩級緩存
// 構建本地緩存實例
Cache local = buildLocal(cachedAnnoConfig, area);
// 構建遠程緩存實例
Cache remote = buildRemote(cachedAnnoConfig, area, cacheName);
// 兩級緩存時是否單獨設置了本地緩存失效時間 localExpire
boolean useExpireOfSubCache = cachedAnnoConfig.getLocalExpire() > 0;
// 創建一個兩級緩存CacheBuilder
cache = MultiLevelCacheBuilder.createMultiLevelCacheBuilder()
.expireAfterWrite(remote.config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS)
.addCache(local, remote)
.useExpireOfSubCache(useExpireOfSubCache)
.cacheNullValue(cachedAnnoConfig.isCacheNullValue())
.buildCache();
}
// 設置緩存刷新策略
cache.config().setRefreshPolicy(cachedAnnoConfig.getRefreshPolicy());
// 將 cache 封裝成 CacheHandlerRefreshCache,也就是 RefreshCache 類型
// 后續添加刷新任務時會判斷是否為 RefreshCache 類型,然后決定是否執行 addOrUpdateRefreshTask 方法,添加刷新任務,沒有刷新策略不會添加
cache = new CacheHandler.CacheHandlerRefreshCache(cache);
// 設置緩存未命中時,JVM是否只允許一個線程執行方法,其他線程等待,全局配置默認為false
cache.config().setCachePenetrationProtect(globalCacheConfig.isPenetrationProtect());
PenetrationProtectConfig protectConfig = cachedAnnoConfig.getPenetrationProtectConfig();
if (protectConfig != null) { // 方法的@CachePenetrationProtect注解
cache.config().setCachePenetrationProtect(protectConfig.isPenetrationProtect());
cache.config().setPenetrationProtectTimeout(protectConfig.getPenetrationProtectTimeout());
}
if (configProvider.getCacheMonitorManager() != null) {
// 添加監控統計配置
configProvider.getCacheMonitorManager().addMonitors(area, cacheName, cache);
}
return cache;
}
protected Cache buildRemote(CachedAnnoConfig cachedAnnoConfig, String area, String cacheName) {
// 獲取緩存區域對應的 CacheBuilder 構造器
ExternalCacheBuilder cacheBuilder = (ExternalCacheBuilder) globalCacheConfig.getRemoteCacheBuilders().get(area);
if (cacheBuilder == null) {
throw new CacheConfigException("no remote cache builder: " + area);
}
// 克隆一個 CacheBuilder 構造器,因為不同緩存實例有不同的配置
cacheBuilder = (ExternalCacheBuilder) cacheBuilder.clone();
if (cachedAnnoConfig.getExpire() > 0 ) {
// 設置失效時間
cacheBuilder.expireAfterWrite(cachedAnnoConfig.getExpire(), cachedAnnoConfig.getTimeUnit());
}
// 設置緩存 key 的前綴
if (cacheBuilder.getConfig().getKeyPrefix() != null) {
// 配置文件中配置了 prefix,則設置為 prefix+cacheName
cacheBuilder.setKeyPrefix(cacheBuilder.getConfig().getKeyPrefix() + cacheName);
} else { // 設置為 cacheName
cacheBuilder.setKeyPrefix(cacheName);
}
if (!CacheConsts.isUndefined(cachedAnnoConfig.getKeyConvertor())) { // 如果注解中設置了Key的轉換方式則替換,否則還是使用全局的
// 設置 key 的轉換器,只支持 FASTJSON
cacheBuilder.setKeyConvertor(configProvider.parseKeyConvertor(cachedAnnoConfig.getKeyConvertor()));
}
if (!CacheConsts.isUndefined(cachedAnnoConfig.getSerialPolicy())) {
// 緩存數據保存至遠程需要進行編碼和解碼,所以這里設置其編碼和解碼方式,KRYO 和 JAVA 可選擇
cacheBuilder.setValueEncoder(configProvider.parseValueEncoder(cachedAnnoConfig.getSerialPolicy()));
cacheBuilder.setValueDecoder(configProvider.parseValueDecoder(cachedAnnoConfig.getSerialPolicy()));
}
// 設置是否緩存 null 值
cacheBuilder.setCacheNullValue(cachedAnnoConfig.isCacheNullValue());
return cacheBuilder.buildCache();
}
protected Cache buildLocal(CachedAnnoConfig cachedAnnoConfig, String area) {
// 獲取緩存區域對應的 CacheBuilder 構造器
EmbeddedCacheBuilder cacheBuilder = (EmbeddedCacheBuilder) globalCacheConfig.getLocalCacheBuilders().get(area);
if (cacheBuilder == null) {
throw new CacheConfigException("no local cache builder: " + area);
}
// 克隆一個 CacheBuilder 構造器,因為不同緩存實例有不同的配置
cacheBuilder = (EmbeddedCacheBuilder) cacheBuilder.clone();
if (cachedAnnoConfig.getLocalLimit() != CacheConsts.UNDEFINED_INT) {
// 本地緩存數量限制
cacheBuilder.setLimit(cachedAnnoConfig.getLocalLimit());
}
if (cachedAnnoConfig.getCacheType() == CacheType.BOTH && cachedAnnoConfig.getLocalExpire() > 0) {
// 設置本地緩存失效時間,前提是多級緩存,一般和遠程緩存保持一致不設置
cacheBuilder.expireAfterWrite(cachedAnnoConfig.getLocalExpire(), cachedAnnoConfig.getTimeUnit());
} else if (cachedAnnoConfig.getExpire() > 0) {
// 設置失效時間
cacheBuilder.expireAfterWrite(cachedAnnoConfig.getExpire(), cachedAnnoConfig.getTimeUnit());
}
if (!CacheConsts.isUndefined(cachedAnnoConfig.getKeyConvertor())) {
cacheBuilder.setKeyConvertor(configProvider.parseKeyConvertor(cachedAnnoConfig.getKeyConvertor()));
}
// 設置是否緩存 null 值
cacheBuilder.setCacheNullValue(cachedAnnoConfig.isCacheNullValue());
// 構建一個緩存實例
return cacheBuilder.buildCache();
}
protected CacheInvokeContext newCacheInvokeContext() {
return new CacheInvokeContext();
}
}
createCacheInvokeContext
方法返回一個本次調用的上下文CacheInvokeContext,為這個上下文設置緩存函數,用於獲取或者構建緩存實例,這個函數在CacheHandler中會被調用,我們來看看這個函數的處理邏輯:有兩個入參,分別為本次調用的上下文和緩存注解的配置信息
首先從緩存注解的配置信息中獲取緩存實例,如果不為null則直接返回,否則調用createCacheByCachedConfig
方法,根據配置通過CacheBuilder構造器創建一個緩存實例對象
createCacheByCachedConfig
方法:
-
如果沒有定義緩存實例名稱(@Cached注解中的name配置),則生成
類名+方法名+(參數類型)
作為緩存實例名稱 -
然后調用
__createOrGetCache
方法
__createOrGetCache
方法:
-
通過緩存實例管理器SimpleCacheManager根據緩存區域area和緩存實例名稱cacheName獲取緩存實例對象,如果不為null則直接返回,判斷緩存實例對象是否為null為進行兩次確認,第二次會給當前CacheContext加鎖進行判斷,避免線程不安全
-
緩存實例對象還是為null的話,先判斷緩存區域area是否添加至緩存實例名稱中,是的話"area_cacheName"為緩存實例名稱,然后調用
buildCache
方法創建一個緩存實例對象
buildCache
方法:根據緩存實例類型構建不同的緩存實例對象,處理邏輯如下:
- CacheType為
LOCAL
則調用buildLocal
方法:
1.1. 從GlobalCacheConfig全局配置的localCacheBuilders(保存本地緩存CacheBuilder構造器的集合)中的獲取本地緩存該緩存區域的構造器,在之前講到的'JetCacheAutoConfiguration自動配置'中有說到過,會將初始化好的構造器從AutoConfigureBeans中添加至GlobalCacheConfig中
1.2. 克隆一個 CacheBuilder 構造器,因為不同緩存實例有不同的配置
1.3. 將緩存注解的配置信息設置到構造器中,有以下配置:
- 如果配置了localLimit,則設置本地緩存最大數量limit的值
- 如果CacheType為BOTH並且配置了localExpire(大於0),則設置有效時間expireAfterWrite的值為localExpire,否則如果配置的expire大於0,則設置其值為expire
- 如果配置了keyConvertor,則根據該值生成一個轉換函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了一個轉換函數(我一般在全局配置中設置)
- 設置是否緩存null值
1.4. 通過調用構造器的buildCache()方法構建一個緩存實例對象,該方法在之前講到的'CacheBuilder構造器'中有分析過
- CacheType為
REMOTE
則調用buildRemote
方法:
1.1. 從GlobalCacheConfig全局配置的remoteCacheBuilders(保存遠程緩存CacheBuilder構造器的集合)中的獲取遠程緩存該緩存區域的構造器
1.2. 克隆一個 CacheBuilder 構造器,因為不同緩存實例有不同的配置
1.3. 將緩存注解的配置信息設置到構造器中,有以下配置:
- 如果配置了expire,則設置遠程緩存有效時間expireAfterWrite的值
- 如果全局設置遠程緩存的緩存key的前綴keyPrefix,則設置緩存key的前綴為"keyPrefix+cacheName",否則我為"cacheName"
- 如果配置了keyConvertor,則根據該值生成一個轉換函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了一個轉換函數(我一般在全局配置中設置)
- 如果設置了serialPolicy,則根據該值生成編碼和解碼函數,沒有配置的話在初始化構造器的時候根據全局配置可能已經生成了編碼函數和解碼函數(我一般在全局配置中設置)
- 設置是否緩存null值
1.4. 通過調用構造器的buildCache()方法構建一個緩存實例對象
- CacheType為
BOTH
則調用buildLocal
方法構建本地緩存實例,調用buildRemote
方法構建遠程緩存實例:
1.1. 創建一個MultiLevelCacheBuilder構造器
1.2. 設置有效時間為遠程緩存的有效時間、添加local和remote緩存實例、設置是否單獨配置了本地緩存的失效時間(是否有配置localExpire)、設置是否緩存null值
1.3. 通過調用構造器的buildCache()方法構建一個緩存實例對象
-
設置刷新策略RefreshPolicy,沒有的話為null
-
將緩存實例對象封裝成CacheHandlerRefreshCache對象,用於后續的添加刷新任務,在之前的'AbstractCache抽象類'有講到
-
設置是否開啟緩存未命中時加載方法的保護模式,全局默認為false
-
將緩存實例添加至監控管理器中
JetCacheInterceptor
被攔截后的處理在com.alicp.jetcache.anno.aop.JetCacheInterceptor
中,代碼如下:
public class JetCacheInterceptor implements MethodInterceptor, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(JetCacheInterceptor.class);
/**
* 緩存實例注解信息
*/
@Autowired
private ConfigMap cacheConfigMap;
/**
* Spring 上下文
*/
private ApplicationContext applicationContext;
/**
* 緩存的全局配置
*/
private GlobalCacheConfig globalCacheConfig;
/**
* JetCache 緩存的管理器(包含很多信息)
*/
ConfigProvider configProvider;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
if (configProvider == null) {
/**
* 這里會獲取到 SpringConfigProvider 可查看 {@link com.alicp.jetcache.autoconfigure.JetCacheAutoConfiguration}
*/
configProvider = applicationContext.getBean(ConfigProvider.class);
}
if (configProvider != null && globalCacheConfig == null) {
globalCacheConfig = configProvider.getGlobalCacheConfig();
}
if (globalCacheConfig == null || !globalCacheConfig.isEnableMethodCache()) {
return invocation.proceed();
}
// 獲取被攔截的方法
Method method = invocation.getMethod();
// 獲取被攔截的對象
Object obj = invocation.getThis();
CacheInvokeConfig cac = null;
if (obj != null) {
// 獲取改方法的Key(方法所在類名+方法名+(參數類型)+方法返回類型+_被攔截的類名)
String key = CachePointcut.getKey(method, obj.getClass());
// 獲取該方法的緩存注解信息,在 Pointcut 中已經對注解進行解析並放入 ConfigMap 中
cac = cacheConfigMap.getByMethodInfo(key);
}
if(logger.isTraceEnabled()){
logger.trace("JetCacheInterceptor invoke. foundJetCacheConfig={}, method={}.{}(), targetClass={}",
cac != null,
method.getDeclaringClass().getName(),
method.getName(),
invocation.getThis() == null ? null : invocation.getThis().getClass().getName());
}
// 無緩存相關注解配置信息表明無須緩存,直接執行該方法
if (cac == null || cac == CacheInvokeConfig.getNoCacheInvokeConfigInstance()) {
return invocation.proceed();
}
// 為本次調用創建一個上下文對象,包含對應的緩存實例
CacheInvokeContext context = configProvider.getCacheContext().createCacheInvokeContext(cacheConfigMap);
context.setTargetObject(invocation.getThis());
context.setInvoker(invocation::proceed);
context.setMethod(method);
context.setArgs(invocation.getArguments());
context.setCacheInvokeConfig(cac);
context.setHiddenPackages(globalCacheConfig.getHiddenPackages());
// 繼續往下執行
return CacheHandler.invoke(context);
}
public void setCacheConfigMap(ConfigMap cacheConfigMap) {
this.cacheConfigMap = cacheConfigMap;
}
}
從ConfigMap
中獲取被攔截的方法對象的緩存配置信息,如果沒有則直接執行該方法,否則繼續往下執行
根據CacheContext
對象(SpringCacheContext,因為在之前講到的'JetCacheAutoConfiguration自動配置'中有說到注入的是SpringConfigProvider對象,在其初始化方法中調用newContext()方法生成SpringCacheContext)調用其createCacheInvokeContext
方法為本次調用創建一個上下文CacheInvokeContext
,並設置獲取緩存實例函數,具體實現邏輯查看上面講到的CacheContext
設置本次調用上下文的targetObject為被攔截對象,invoker為被攔截對象的調用器,method為被攔截方法,args為方法入參,cacheInvokeConfig為緩存配置信息,hiddenPackages為緩存實例名稱需要截斷的包名
通過CacheHandler的invoke方法繼續往下執行
CacheHandler
com.alicp.jetcache.anno.method.CacheHandler
用於JetCache處理被攔截的方法,部分代碼如下:
public class CacheHandler implements InvocationHandler {
public static Object invoke(CacheInvokeContext context) throws Throwable {
if (context.getCacheInvokeConfig().isEnableCacheContext()) {
try {
CacheContextSupport._enable();
return doInvoke(context);
} finally {
CacheContextSupport._disable();
}
} else {
return doInvoke(context);
}
}
private static Object doInvoke(CacheInvokeContext context) throws Throwable {
// 獲取緩存實例配置
CacheInvokeConfig cic = context.getCacheInvokeConfig();
// 獲取注解配置信息
CachedAnnoConfig cachedConfig = cic.getCachedAnnoConfig();
if (cachedConfig != null && (cachedConfig.isEnabled() || CacheContextSupport._isEnabled())) {
// 經過緩存中獲取結果
return invokeWithCached(context);
} else if (cic.getInvalidateAnnoConfigs() != null || cic.getUpdateAnnoConfig() != null) {
// 根據結果刪除或者更新緩存
return invokeWithInvalidateOrUpdate(context);
} else {
// 執行該方法
return invokeOrigin(context);
}
}
private static Object invokeWithCached(CacheInvokeContext context) throws Throwable {
// 獲取本地調用的上下文
CacheInvokeConfig cic = context.getCacheInvokeConfig();
// 獲取注解配置信息
CachedAnnoConfig cac = cic.getCachedAnnoConfig();
// 獲取緩存實例對象(不存在則會創建並設置到 cac 中)
// 可在 JetCacheInterceptor 創建本次調用的上下文時,調用 createCacheInvokeContext(cacheConfigMap) 方法中查看詳情
Cache cache = context.getCacheFunction().apply(context, cac);
if (cache == null) {
logger.error("no cache with name: " + context.getMethod());
// 無緩存實例對象,執行原有方法
return invokeOrigin(context);
}
// 生成緩存 Key 對象(注解中沒有配置的話就是入參,沒有入參則為 "_$JETCACHE_NULL_KEY$_" )
Object key = ExpressionUtil.evalKey(context, cic.getCachedAnnoConfig());
if (key == null) {
// 生成緩存 Key 失敗則執行原方法,並記錄 CacheLoadEvent 事件
return loadAndCount(context, cache, key);
}
/*
* 根據配置的 condition 來決定是否走緩存
* 緩存注解中沒有配置 condition 表示所有請求都走緩存
* 配置了 condition 表示滿足條件的才走緩存
*/
if (!ExpressionUtil.evalCondition(context, cic.getCachedAnnoConfig())) {
// 不滿足 condition 則直接執行原方法,並記錄 CacheLoadEvent 事件
return loadAndCount(context, cache, key);
}
try {
// 創建一個執行原有方法的函數
CacheLoader loader = new CacheLoader() {
@Override
public Object load(Object k) throws Throwable {
Object result = invokeOrigin(context);
context.setResult(result);
return result;
}
@Override
public boolean vetoCacheUpdate() {
// 本次執行原方法后是否需要更新緩存
return !ExpressionUtil.evalPostCondition(context, cic.getCachedAnnoConfig());
}
};
// 獲取結果
Object result = cache.computeIfAbsent(key, loader);
return result;
} catch (CacheInvokeException e) {
throw e.getCause();
}
}
private static Object loadAndCount(CacheInvokeContext context, Cache cache, Object key) throws Throwable {
long t = System.currentTimeMillis();
Object v = null;
boolean success = false;
try {
// 調用原有方法
v = invokeOrigin(context);
success = true;
} finally {
t = System.currentTimeMillis() - t;
// 發送 CacheLoadEvent 事件
CacheLoadEvent event = new CacheLoadEvent(cache, t, key, v, success);
while (cache instanceof ProxyCache) {
cache = ((ProxyCache) cache).getTargetCache();
}
if (cache instanceof AbstractCache) {
((AbstractCache) cache).notify(event);
}
}
return v;
}
private static Object invokeOrigin(CacheInvokeContext context) throws Throwable {
// 執行被攔截的方法
return context.getInvoker().invoke();
}
}
直接查看invokeWithCached
方法:
-
獲取緩存注解信息
-
根據本地調用的上下文CacheInvokeContext獲取緩存實例對象(調用其cacheFunction函數),在CacheContext中有講到
-
如果緩存實例不存在則直接調用invokeOrigin方法,執行被攔截的對象的調用器
-
根據本次調用的上下文CacheInvokeContext生成緩存key,根據配置的緩存key的SpEL表達式生成,如果沒有配置則返回入參對象,如果沒有對象則返回"_ $JETCACHE_NULL_KEY$_"
-
根據配置condition表達式判斷是否需要走緩存
-
創建一個
CacheLoader
對象,用於執行被攔截的對象的調用器,也就是加載原有方法 -
調用緩存實例的
computeIfAbsent(key, loader)
方法獲取結果,這個方法的處理過程可查看'緩存API'這一小節
至此結束!!!😄😄😄