轉載自:https://www.cnblogs.com/lanbo203/p/7494587.html
解決方案
一、對強一致要求比較高的,應采用實時同步方案,即查詢緩存查詢不到再從DB查詢,保存到緩存;更新緩存時,先更新數據庫,再將緩存的設置過期(建議不要去更新緩存內容,直接設置緩存過期)。
二、對於並發程度較高的,可采用異步隊列的方式同步,可采用kafka等消息中間件處理消息生產和消費。
三、使用阿里的同步工具canal,canal實現方式是模擬mysql slave和master的同步機制,監控DB bitlog的日志更新來觸發緩存的更新,此種方法可以解放程序員雙手,減少工作量,但在使用時有些局限性。
四、采用UDF自定義函數的方式,面對mysql的API進行編程,利用觸發器進行緩存同步,但UDF主要是c/c++語言實現,學習成本高。
實時同步
spring3+提供了注解的方式進行緩存編程
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),
@CacheEvict(key = "caches[0].name + #result.name" )})
@Cacheable:查詢時使用,注意Long類型需轉換為Sting類型,否則會拋異常
@CachePut:更新時使用,使用此注解,一定會從DB上查詢數據
@CacheEvict:刪除時使用;
@Caching:組合用法 具體注解的使用可參考官網
注意:注解方式雖然能使我們的代碼簡潔,但是注解方式有局限性:對key的獲取,以及嵌套使用時注解無效,如下所示
public class User {
private Long userId;
private String name;
private Integer age;
private String sex;
private String addr;
//get set .....
}
service接口
|
1
2
3
4
5
6
7
|
public
interface
UserService {
User getUser(Long userId);
User updateUser(User user);
User getUserByName(String name);
int
insertUser(User user);
User delete (Long userId);
}<br>
//實現類<br>//假設有需求是由name查詢user的,一般我們是先由name->id,再由id->user,這樣會減少redis緩存的冗余信息
|
@Service(value = "userSerivceImpl")
@CacheConfig(cacheNames = "user")
public class UserServiceImpl implements UserService {
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
UserMapper userMapper;
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
public User getUser(Long userId) {
User user = userMapper.selectByPrimaryKey(userId);
return user;
}
@Cacheable(key = "caches[0].name + #name")
public String getIdByName(String name){
Long userId = userMapper.getIdByName(name);
return String.valueOf(userId);
}
//使用getUserByName方式調用getIdByName 和getUser方法來實現查詢,但是如果用此方式在controller中直接調用
//getUserByName方法,緩存效果是不起作用的,必須是直接調用getIdByName和getUser方法才能起作用
public User getUserByName(String name) {
//通過name 查詢到主鍵 再由主鍵查詢實體
return getUser(Long.valueOf(getIdByName(name)));
}
非注解方式實現
1.先定義一個RedisCacheConfig類用於生成RedisTemplate和對CacheManager的管理
@Configuration
public class RedisCacheConfig extends CachingConfigurerSupport {
/*定義緩存數據 key 生成策略的bean
*包名+類名+方法名+所有參數
*/
@Bean
public KeyGenerator keyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
//@Bean
public CacheManager cacheManager(
@SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
//RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
//cacheManager.setDefaultExpiration(60);//設置緩存保留時間(seconds)
return cacheManager;
}
//1.項目啟動時此方法先被注冊成bean被spring管理
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
System.out.println("==============obj:"+Object.class.getName());
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
serializer.setObjectMapper(mapper);
template.setValueSerializer(serializer);
//使用StringRedisSerializer來序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
2.定義一個redisUtil類用於存取緩存值
@Component
public class RedisCacheUtil {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 存儲字符串
* @param key string類型的key
* @param value String類型的value
*/
public void set(String key, String value) {
stringRedisTemplate.opsForValue().set(key, value);
}
/**
* 存儲對象
* @param key String類型的key
* @param value Object類型的value
*/
public void set(String key, Object value) {
redisTemplate.opsForValue().set(key, value);
}
/**
* 存儲對象
* @param key String類型的key
* @param value Object類型的value
*/
public void set(String key, Object value,Long timeOut) {
redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS);
}
/**
* 根據key獲取字符串數據
* @param key
* @return
*/
public String getValue(String key) {
return stringRedisTemplate.opsForValue().get(key);
}
// public Object getValue(String key) {
// return redisTemplate.opsForValue().get(key);
// }
/**
* 根據key獲取對象
* @param key
* @return
*/
public Object getValueOfObject(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* 根據key刪除緩存信息
* @param key
*/
public void delete(String key) {
redisTemplate.delete(key);
}
/**
* 查詢key是否存在
* @param key
* @return
*/
@SuppressWarnings("unchecked")
public boolean exists(String key) {
return redisTemplate.hasKey(key);
}
}
3.實現類
/**
* Created by yexin on 2017/9/8.
*
* 在Impl基礎上+ 防止緩存雪崩和緩存穿透功能
*/
@Service(value = "userServiceImpl4")
public class UserServiceImpl4 implements UserService {
@Autowired
UserMapper userMapper;
@Autowired
RedisCacheUtil redisCacheUtil;
@Value("${timeOut}")
private long timeOut;
@Override
public User getUser(Long userId) {
String key = "user" + userId;
User user = (User) redisCacheUtil.getValueOfObject(key);
String keySign = key + "_sign";
String valueSign = redisCacheUtil.getValue(keySign);
if(user == null){//防止第一次查詢時返回時空結果
//防止緩存穿透
if(redisCacheUtil.exists(key)){
return null;
}
user = userMapper.selectByPrimaryKey(userId);
redisCacheUtil.set(key,user);
redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1));
// redisCacheUtil.set(keySign,"1",0L); //過期時間不能設置為0,必須比0大的數
return user;
}
if(valueSign != null){
return user;
}else {
//設置標記的實效時間
Long tt = timeOut * (new Random().nextInt(10) + 1);
System.out.println("tt:"+tt);
redisCacheUtil.set(keySign,"1",tt);
//異步處理緩存更新 應對與高並發的情況,會產生臟讀的情況
ThreadPoolUtil.getExecutorService().execute(new Runnable(){
public void run() { //
System.out.println("-----執行異步操作-----");
User user1 = userMapper.selectByPrimaryKey(userId);
redisCacheUtil.set(key,user1);
}
});
// new Thread(){
// public void run() { //應對與高並發的情況,會產生臟讀的情況
// System.out.println("-----執行異步操作-----");
// User user1 = userMapper.selectByPrimaryKey(userId);
// redisCacheUtil.set(key,user1);
// }
// }.start();
}
return user;
}
}
異步實現
異步實現通過kafka作為消息隊列實現,異步只針對更新操作,查詢無需異步,實現類如下
1.pom文件需依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.生產着代碼
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMessage(String msg) {
try{
source.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
//接受的是一個實體類,具體配置在application.yml
public void sendMessage(TransMsg msg) {
try {
//MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build();
source.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.消費者代碼
@EnableBinding(Sink.class)
public class MsgSink {
@Resource(name = "userSerivceImpl3")
UserService userService;
@StreamListener(Sink.INPUT)
public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
System.out.println("sink......"+msg);
System.out.println("opt db strat ----");
userService.updateUser((User) msg.getParams());
System.out.println("執行db結束------");
}
}
4.application.yml配置
spring:
application:
name: demo-provider
redis:
database: 0
host: 192.168.252.128
#host: localhost
port: 6379
password:
pool:
max-active: 50
max-wait: -1
max-idle: 50
timeout: 0
#kafka
cloud:
stream:
kafka:
binder:
brokers: 192.168.252.128:9092
zk-nodes: 192.168.252.128:2181
minPartitionCount: 1
autoCreateTopics: true
autoAddPartitions: true
bindings:
input:
destination: topic-02
# content-type: application/json
content-type: application/x-java-object #此種類型配置在消費端接受到的為一個實體類
group: t1
consumer:
concurrency: 1
partitioned: false
output:
destination: topic-02
content-type: application/x-java-object
producer:
partitionCount: 1
instance-count: 1
instance-index: 0
5.實現類
@Service(value = "userServiceImpl2")
public class UserServiceImpl2 implements UserService{
@Autowired
UserMapper userMapper;
@Autowired
RedisCacheUtil redisCacheUtil;
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
SendService sendService;
public User updateUser(User user) {
System.out.println(" impl2 active ");
String key = "user"+ user.getUserId();
System.out.println("key:"+key);
//是否存在key
if(!redisCacheUtil.exists(key)){
return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null;
}
/* 更新key對應的value
更新隊列
*/
User user1 = (User)redisCacheUtil.getValueOfObject(key);
try {
redisCacheUtil.set(key,user);
TransMsg<User> msg = new TransMsg<User>(key,user,this.getClass().getName(),"updateUser",user);
sendService.sendMessage(msg);
}catch (Exception e){
redisCacheUtil.set(key,user1);
}
return user;
}
}
注意:kafka與zookeeper的配置在此不介紹
canal實現方式
先要安裝canal,配置canal的example文件等,配置暫不介紹
package org.example.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import org.example.canal.util.RedisUtil;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) {
// 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
// connector.rollback(batchId); // 處理失敗, 回滾數據
}
} finally {
connector.disconnect();
}
}
private static void printEntry( List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
System.out.println("tablename:"+entry.getHeaderOrBuilder().getTableName());
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn( List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
private static void redisInsert( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}
private static void redisUpdate( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
}
}
private static void redisDelete( List<Column> columns){
JSONObject json=new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if(columns.size()>0){
RedisUtil.delKey("user:"+ columns.get(0).getValue());
}
}
}
package org.example.canal.util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
// Redis服務器IP
private static String ADDR = "192.168.252.128";
// Redis的端口號
private static int PORT = 6379;
// 訪問密碼
//private static String AUTH = "admin";
// 可用連接實例的最大數目,默認值為8;
// 如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis實例,則此時pool的狀態為exhausted(耗盡)。
private static int MAX_ACTIVE = 1024;
// 控制一個pool最多有多少個狀態為idle(空閑的)的jedis實例,默認值也是8。
private static int MAX_IDLE = 200;
// 等待可用連接的最大時間,單位毫秒,默認值為-1,表示永不超時。如果超過等待時間,則直接拋出JedisConnectionException;
private static int MAX_WAIT = 10000;
// 過期時間
protected static int expireTime = 60 * 60 *24;
// 連接池
protected static JedisPool pool;
static {
JedisPoolConfig config = new JedisPoolConfig();
//最大連接數
config.setMaxTotal(MAX_ACTIVE);
//最多空閑實例
config.setMaxIdle(MAX_IDLE);
//超時時間
config.setMaxWaitMillis(MAX_WAIT);
//
config.setTestOnBorrow(false);
pool = new JedisPool(config, ADDR, PORT, 1000);
}
/**
* 獲取jedis實例
*/
protected static synchronized Jedis getJedis() {
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (Exception e) {
e.printStackTrace();
if (jedis != null) {
pool.returnBrokenResource(jedis);
}
}
return jedis;
}
/**
* 釋放jedis資源
* @param jedis
* @param isBroken
*/
protected static void closeResource(Jedis jedis, boolean isBroken) {
try {
if (isBroken) {
pool.returnBrokenResource(jedis);
} else {
pool.returnResource(jedis);
}
} catch (Exception e) {
}
}
/**
* 是否存在key
* @param key
*/
public static boolean existKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
return jedis.exists(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return false;
}
/**
* 刪除key
* @param key
*/
public static void delKey(String key) {
Jedis jedis = null;
boolean isBroken = false;
try {
jedis = getJedis();
jedis.select(0);
jedis.del(key);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}
/**
* 取得key的值
* @param key
*/
public static String stringGet(String key) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.get(key);
jedis.expire(key, expireTime);
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}
/**
* 添加string數據
* @param key
* @param value
*/
public static String stringSet(String key, String value) {
Jedis jedis = null;
boolean isBroken = false;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.set(key, value);
jedis.expire(key, expireTime);
} catch (Exception e) {
e.printStackTrace();
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
return lastVal;
}
/**
* 添加hash數據
* @param key
* @param field
* @param value
*/
public static void hashSet(String key, String field, String value) {
boolean isBroken = false;
Jedis jedis = null;
try {
jedis = getJedis();
if (jedis != null) {
jedis.select(0);
jedis.hset(key, field, value);
jedis.expire(key, expireTime);
}
} catch (Exception e) {
isBroken = true;
} finally {
closeResource(jedis, isBroken);
}
}
}
附redis關於緩存雪崩和緩存穿透,熱點key
穿透
穿透:頻繁查詢一個不存在的數據,由於緩存不命中,每次都要查詢持久層。從而失去緩存的意義。
解決辦法: 持久層查詢不到就緩存空結果,查詢時先判斷緩存中是否exists(key) ,如果有直接返回空,沒有則查詢后返回,
注意insert時需清除查詢的key,否則即便DB中有值也查詢不到(當然也可以設置空緩存的過期時間)
雪崩
雪崩:緩存大量失效的時候,引發大量查詢數據庫。
解決辦法:①用鎖/分布式鎖或者隊列串行訪問
②緩存失效時間均勻分布
熱點key
熱點key:某個key訪問非常頻繁,當key失效的時候有打量線程來構建緩存,導致負載增加,系統崩潰。
解決辦法:
①使用鎖,單機用synchronized,lock等,分布式用分布式鎖。
②緩存過期時間不設置,而是設置在key對應的value里。如果檢測到存的時間超過過期時間則異步更新緩存。
③在value設置一個比過期時間t0小的過期時間值t1,當t1過期的時候,延長t1並做更新緩存操作。
4設置標簽緩存,標簽緩存設置過期時間,標簽緩存過期后,需異步地更新實際緩存 具體參照userServiceImpl4的處理方式
總結
一、查詢redis緩存時,一般查詢如果以非id方式查詢,建議先由條件查詢到id,再由id查詢pojo
二、異步kafka在消費端接受信息后,該怎么識別處理那張表,調用哪個方法,此問題暫時還沒解決
三、比較簡單的redis緩存,推薦使用canal
參考文檔
http://blog.csdn.net/fly_time2012/article/details/50751316
http://blog.csdn.net/kkgbn/article/details/60576477
http://www.cnblogs.com/fidelQuan/p/4543387.html

