Want#
我們希望設計一套緩存API,適應不同的緩存產品,並且基於Spring框架完美集成應用開發。
本文旨在針對緩存產品定義一個輕量級的客戶端訪問框架,目標支持多種緩存產品,面向接口編程,目前支持簡單的CRUD。
引導#
目前大多數NoSQL產品的Java客戶端API都以完全實現某個NoSQL產品的特性而實現,而緩存只是一個feature,如果緩存API只針對緩存這一個feature,那么它能否可以定義的更易於使用,API是否能定義的更合理呢?
即:站在抽象緩存產品設計的角度定義一個API,而不是完整封裝NoSQL產品的客戶端訪問API
緩存產品定義#
以Memcached、Redis、MongoDB三類產品為例,后兩者可不止緩存這樣的一個feature:
- Memcached:純粹的分布式緩存產品,支持簡單kv存儲結構,優勢在於內存利用率高
- Redis:優秀的分布式緩存產品,支持多種存儲結構(set,list,map),優勢在於數據持久化和性能,不過還兼顧輕量級消息隊列這樣的私活
- MongoDB:遠不止緩存這一點feature,文檔型的數據庫,支持類SQL語法,性能據官網介紹很不錯(3.x版本使用了新的存儲引擎)
也許有人會說,為什么把MongoDB也作為緩存產品的一種選型呢?
廣義上講,內存中的一個Map結構就可以成為一個緩存了,因此MongoDB這種文檔型的NoSQL數據庫更不用說了。
以百度百科對緩存的解釋,適當補充
- 定義:數據交換的緩沖區
- 目標:提高數據讀取命中率,減少直接訪問底層存儲介質
- 特性:緩存數據持久化,讀寫同步控制,緩存數據過期,異步讀寫等等
僅僅以緩存定義來看,任何存取數據性能高於底層介質的存儲結構都可以作為緩存。
緩存應用場景#
- db數據緩沖池,常見的orm框架比如Mybatis、Hibernate都支持緩存結構設計,並支持以常見緩存產品redis,memcached等作為底層存儲。
- 緩存業務邏輯狀態,比如一段業務邏輯執行比較復雜並且消耗資源(cpu、內存),可考慮將執行結果緩存,下一次相同請求(請求參數相同)執行數據優先從緩存讀取。
業務邏輯增加緩存處理的樣例代碼
// 從緩存中獲取數據
Object result = cacheClient.get(key);
// 結果為空
if(result == null) {
// 執行業務處理
result = do(...);
// 存入緩存
cacheClient.put(key, result);
}
// 返回結果
return result;
緩存API定義#
我們的目標:盡可能的抽象緩存讀寫定義,最大限度的兼容各種底層緩存產品的能力(沒有蛀牙)
- 泛型接口,支持任意類型參數與返回
- 多種存儲結構(list,map)
- 過期,同步異步特性
存儲結構在接口方法維度上擴展
各類操作特性在Option對象上擴展
翻譯成代碼(代碼過多、非完整版本):
基礎API定義##
緩存抽象接口
package org.wit.ff.cache;
import java.util.List;
import java.util.Map;
/**
* Created by F.Fang on 2015/9/23.
* Version :2015/9/23
*/
public interface IAppCache {
/**
*
* @param key 鍵
* @param <K>
* @return 目標緩存中是否存在鍵
*/
<K> boolean contains(K key);
/**
*
* @param key 鍵
* @param value 值
* @param <K>
* @param <V>
* @return 存儲到目標緩存是否成功
*/
<K,V> boolean put(K key, V value);
/**
*
* @param key 鍵
* @param value 值
* @param option 超時,同步異步控制
* @param <K>
* @param <V>
* @return 存儲到目標緩存是否成功
*/
<K,V> boolean put(K key, V value, Option option);
/**
*
* @param key 鍵
* @param type 值
* @param <K>
* @param <V>
* @return 返回緩存系統目標鍵對應的值
*/
<K,V> V get(K key, Class<V> type);
/**
*
* @param key 鍵
* @param <K>
* @return 刪除目標緩存鍵是否成功
*/
<K> boolean remove(K key);
}
緩存可選項
package org.wit.ff.cache;
/**
* Created by F.Fang on 2015/9/23.
* Version :2015/9/23
*/
public class Option {
/**
* 超時時間.
*/
private long expireTime;
/**
* 超時類型.
*/
private ExpireType expireType;
/**
* 調用模式.
* 異步選項,默認同步(非異步)
*/
private boolean async;
public Option(){
// 默認是秒設置.
expireType = ExpireType.SECONDS;
}
public long getExpireTime() {
return expireTime;
}
public void setExpireTime(long expireTime) {
this.expireTime = expireTime;
}
public boolean isAsync() {
return async;
}
public void setAsync(boolean async) {
this.async = async;
}
public ExpireType getExpireType() {
return expireType;
}
public void setExpireType(ExpireType expireType) {
this.expireType = expireType;
}
}
過期時間枚舉
package org.wit.ff.cache;
/**
* Created by F.Fang on 2015/9/18.
* Version :2015/9/18
*/
public enum ExpireType {
SECONDS, DATETIME
}
序列化接口
package org.wit.ff.cache;
/**
* Created by F.Fang on 2015/9/15.
* Version :2015/9/15
*/
public interface ISerializer<T> {
byte[] serialize(T obj);
T deserialize(byte[] bytes, Class<T> type);
}
默認序列化實現
package org.wit.ff.cache.impl;
import org.springframework.util.SerializationUtils;
import org.wit.ff.cache.ISerializer;
/**
* Created by F.Fang on 2015/9/15.
* Version :2015/9/15
*/
public class DefaultSerializer<T> implements ISerializer<T>{
@Override
public byte[] serialize(T obj) {
return SerializationUtils.serialize(obj);
}
@Override
public T deserialize(byte[] bytes, Class<T> type) {
return (T)SerializationUtils.deserialize(bytes);
}
}
基於Redis的實現#
- 基於Jedis客戶端API的封裝
- 支持自定義序列化
- 底層與redis交互的數據類型均為bytes
緩存API實現##
Jedis緩存API實現
package org.wit.ff.cache.impl;
import org.wit.ff.cache.ExpireType;
import org.wit.ff.cache.IAppCache;
import org.wit.ff.cache.ISerializer;
import org.wit.ff.cache.Option;
import org.wit.ff.util.ByteUtil;
import org.wit.ff.util.ClassUtil;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by F.Fang on 2015/9/16.
* 目前的實現雖然不夠嚴密,但是基本夠用.
* 因為對於put操作,對於目前的業務場景是允許失敗的,因為下次執行正常業務邏輯處理時仍然可以重建緩存.
* Version :2015/9/16
*/
public class JedisAppCache implements IAppCache {
/**
* redis連接池.
*/
private JedisPool pool;
/**
* 序列化工具.
*/
private ISerializer serializer;
/**
* 全局超時選項.
*/
private Option option;
public JedisAppCache() {
serializer = new DefaultSerializer();
option = new Option();
}
@Override
public <K> boolean contains(K key) {
if (key == null) {
throw new IllegalArgumentException("key can't be null!");
}
try (Jedis jedis = pool.getResource()) {
byte[] kBytes = translateObjToBytes(key);
return jedis.exists(kBytes);
}
}
@Override
public <K, V> boolean put(K key, V value) {
return put(key, value, option);
}
@Override
public <K, V> boolean put(K key, V value, Option option) {
if (key == null || value == null) {
throw new IllegalArgumentException("key,value can't be null!");
}
try (Jedis jedis = pool.getResource()) {
byte[] kBytes = translateObjToBytes(key);
byte[] vBytes = translateObjToBytes(value);
// 暫時不考慮狀態碼的問題, 成功狀態碼為OK.
String code = jedis.set(kBytes, vBytes);
// 如果設置了合法的過期時間才設置超時.
setExpire(kBytes, option, jedis);
return "OK".equals(code);
}
}
@Override
public <K, V> V get(K key, Class<V> type) {
if (key == null || type == null) {
throw new IllegalArgumentException("key or type can't be null!");
}
try (Jedis jedis = pool.getResource()) {
byte[] kBytes = translateObjToBytes(key);
byte[] vBytes = jedis.get(kBytes);
if (vBytes == null) {
return null;
}
return translateBytesToObj(vBytes, type);
}
}
@Override
public <K> boolean remove(K key) {
if (key == null) {
throw new IllegalArgumentException("key can't be null!");
}
try (Jedis jedis = pool.getResource()) {
byte[] kBytes = translateObjToBytes(key);
// 狀態碼為0或1(key數量)都可認為是正確的.0表示key原本就不存在.
jedis.del(kBytes);
// 暫時不考慮狀態碼的問題.
return true;
}
}
private <T> byte[] translateObjToBytes(T val) {
byte[] valBytes;
if (val instanceof String) {
valBytes = ((String) val).getBytes();
} else {
Class<?> classType = ClassUtil.getWrapperClassType(val.getClass().getSimpleName());
if (classType != null) {
// 如果是基本類型. Boolean,Void不可能會出現在參數傳值類型的位置.
if (classType.equals(Integer.TYPE)) {
valBytes = ByteUtil.intToByte4((Integer) val);
} else if (classType.equals(Character.TYPE)) {
valBytes = ByteUtil.charToByte2((Character) val);
} else if (classType.equals(Long.TYPE)) {
valBytes = ByteUtil.longToByte8((Long) val);
} else if (classType.equals(Double.TYPE)) {
valBytes = ByteUtil.doubleToByte8((Double) val);
} else if (classType.equals(Float.TYPE)) {
valBytes = ByteUtil.floatToByte4((Float) val);
} else if(val instanceof byte[]) {
valBytes = (byte[])val;
} else {
throw new IllegalArgumentException("unsupported value type, classType is:" + classType);
}
} else {
// 其它均采用序列化
valBytes = serializer.serialize(val);
}
}
return valBytes;
}
private <T> T translateBytesToObj(byte[] bytes, Class<T> type) {
Object obj;
if (type.equals(String.class)) {
obj = new String(bytes);
} else {
Class<?> classType = ClassUtil.getWrapperClassType(type.getSimpleName());
if (classType != null) {
// 如果是基本類型. Boolean,Void不可能會出現在參數傳值類型的位置.
if (classType.equals(Integer.TYPE)) {
obj = ByteUtil.byte4ToInt(bytes);
} else if (classType.equals(Character.TYPE)) {
obj = ByteUtil.byte2ToChar(bytes);
} else if (classType.equals(Long.TYPE)) {
obj = ByteUtil.byte8ToLong(bytes);
} else if (classType.equals(Double.TYPE)) {
obj = ByteUtil.byte8ToDouble(bytes);
} else if (classType.equals(Float.TYPE)) {
obj = ByteUtil.byte4ToFloat(bytes);
} else {
throw new IllegalArgumentException("unsupported value type, classType is:" + classType);
}
} else {
// 其它均采用序列化
obj = serializer.deserialize(bytes,type);
}
}
return (T) obj;
}
private void setExpire(byte[] kBytes,Option option, Jedis jedis) {
if (option.getExpireType().equals(ExpireType.SECONDS)) {
int seconds = (int)option.getExpireTime()/1000;
if(seconds > 0){
jedis.expire(kBytes, seconds);
}
} else {
jedis.expireAt(kBytes, option.getExpireTime());
}
}
public void setPool(JedisPool pool) {
this.pool = pool;
}
public void setSerializer(ISerializer serializer) {
this.serializer = serializer;
}
public void setOption(Option option) {
this.option = option;
}
}
Spring配置文件(spring-redis.xml)
<context:property-placeholder location="redis.properties"/>
<!-- JedisPool -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="4" />
<property name="maxIdle" value="2" />
<property name="maxWaitMillis" value="10000" />
<property name="testOnBorrow" value="true" />
</bean>
<bean id="jedisPool" class="redis.clients.jedis.JedisPool" destroy-method="destroy">
<constructor-arg index="0" ref="jedisPoolConfig" />
<constructor-arg index="1" value="${redis.host}" />
<constructor-arg index="2" value="${redis.port}" />
<constructor-arg index="3" value="10000" />
<constructor-arg index="4" value="${redis.password}" />
<constructor-arg index="5" value="0" />
</bean>
<bean id="jedisAppCache" class="org.wit.ff.cache.impl.JedisAppCache" >
<property name="pool" ref="jedisPool" />
</bean>
Redis配置文件
redis.host=192.168.21.125
redis.port=6379
redis.password=xxx
基於memcached實現#
- 基於Xmemcached API實現
- 自定義序列化,byte數組類型,默認Xmemcached不執行序列化
緩存API實現##
Xmemcached緩存API實現
package org.wit.ff.cache.impl;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.exception.MemcachedException;
import org.wit.ff.cache.AppCacheException;
import org.wit.ff.cache.ExpireType;
import org.wit.ff.cache.IAppCache;
import org.wit.ff.cache.Option;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* Created by F.Fang on 2015/9/24.
* 基於xmemcached.
* Version :2015/9/24
*/
public class XMemAppCache implements IAppCache {
/**
* memcached客戶端.
*/
private MemcachedClient client;
/**
* 選項.
*/
private Option option;
public XMemAppCache(){
option = new Option();
}
@Override
public <K> boolean contains(K key) {
String strKey = translateToStr(key);
try {
return client.get(strKey) != null;
} catch (InterruptedException | MemcachedException |TimeoutException e){
throw new AppCacheException(e);
}
}
@Override
public <K, V> boolean put(K key, V value) {
return put(key,value,option);
}
@Override
public <K, V> boolean put(K key, V value, Option option) {
if(option.getExpireType().equals(ExpireType.DATETIME)){
throw new UnsupportedOperationException("memcached no support ExpireType(DATETIME) !");
}
// 目前考慮 set, add方法如果key已存在會發生異常.
// 當前對緩存均不考慮更新操作.
int seconds = (int)option.getExpireTime()/1000;
String strKey = translateToStr(key);
try {
if(option.isAsync()){
// 異步操作.
client.setWithNoReply(strKey, seconds, value);
return true;
} else {
return client.set(strKey, seconds, value);
}
} catch (InterruptedException | MemcachedException |TimeoutException e){
throw new AppCacheException(e);
}
}
@Override
public <K, V> V get(K key, Class<V> type) {
String strKey = translateToStr(key);
try {
return client.get(strKey);
} catch (InterruptedException | MemcachedException |TimeoutException e){
throw new AppCacheException(e);
}
}
@Override
public <K> boolean remove(K key) {
String strKey = translateToStr(key);
try {
return client.delete(strKey);
} catch (InterruptedException | MemcachedException |TimeoutException e){
throw new AppCacheException(e);
}
}
private <K> String translateToStr(K key) {
if(key instanceof String){
return (String)key;
}
return key.toString();
}
public void setClient(MemcachedClient client) {
this.client = client;
}
public void setOption(Option option) {
this.option = option;
}
}
Spring配置文件(spring-memcached.xml)
<context:property-placeholder location="memcached.properties"/>
<bean
id="memcachedClientBuilder"
class="net.rubyeye.xmemcached.XMemcachedClientBuilder"
p:connectionPoolSize="${memcached.connectionPoolSize}"
p:failureMode="${memcached.failureMode}">
<!-- XMemcachedClientBuilder have two arguments.First is server list,and
second is weights array. -->
<constructor-arg>
<list>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>${memcached.server1.host}</value>
</constructor-arg>
<constructor-arg>
<value>${memcached.server1.port}</value>
</constructor-arg>
</bean>
</list>
</constructor-arg>
<constructor-arg>
<list>
<value>${memcached.server1.weight}</value>
</list>
</constructor-arg>
<property name="commandFactory">
<bean class="net.rubyeye.xmemcached.command.TextCommandFactory"/>
</property>
<property name="sessionLocator">
<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator"/>
</property>
<property name="transcoder">
<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder"/>
</property>
</bean>
<!-- Use factory bean to build memcached client -->
<bean
id="memcachedClient"
factory-bean="memcachedClientBuilder"
factory-method="build"
destroy-method="shutdown"/>
<bean id="xmemAppCache" class="org.wit.ff.cache.impl.XMemAppCache" >
<property name="client" ref="memcachedClient" />
</bean>
memcached.properties
#連接池大小即客戶端個數
memcached.connectionPoolSize=3
memcached.failureMode=true
#server1
memcached.server1.host=xxxx
memcached.server1.port=21212
memcached.server1.weight=1
測試#
示例測試代碼:
package org.wit.ff.cache;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import tmodel.User;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
/**
* Created by F.Fang on 2015/10/19.
* Version :2015/10/19
*/
@ContextConfiguration("classpath:spring-redis.xml")
public class AppCacheTest extends AbstractJUnit4SpringContextTests {
@Autowired
private IAppCache appCache;
@Test
public void demo() throws Exception{
User user = new User(1, "ff", "ff@adchina.com");
appCache.put("ff", user);
TimeUnit.SECONDS.sleep(3);
User result = appCache.get("ff",User.class);
assertEquals(user, result);
}
}
小結&展望#
注:Redis支持支持集合(list,map)存儲結構,Memecached則不支持,因此可以考慮在基於Memcached緩存訪問API實現中的putList(...)方法直接拋出UnsupportedOperationException異常
- 支持集合操作(目前Redis版本實際已經實現)
- 支持更簡易的配置
- 補充對MongoDB的支持
QA##