一、Redis使用有哪些常見問題
在我們已經有了Jedis客戶端、集群模式支持后,Redis基本使用已經沒有大的問題了。關於Jedis以及集群模式請參考博文:
但是這樣依舊有很多缺陷,比如:
- 動態擴容不方便,擴容需要重啟才能生效
- 對於集群配置,有客戶端代碼侵入
- 集群中沒有業務進行隔離,單業務沖高可能影響其余業務運行
- 沒有監控,無法提前預估風險
- 不能解決緩存穿透以及緩存雪崩問題
二、Redis解決方案項目簡述
在上面的問題中,有一個開源項目有很好的參考作用,項目地址:https://gitee.com/ym-monkey/flasher
1、這個主要解決了什么
- 1、基於Jedis Cluster開發的客戶端支持Redis Cluster集群。
- 2、對被調用方(客戶端)侵入極少,上手極快。
- 3、支持動態增加節點,客戶端自動感知。(zookeeper)
- 4、支持客戶端驗證與攔截。 (token)
- 5、異步監控調用數據,支持異步上報。
- 6、方便管理有效的區分業務系統。會員(memmber) 商品(goods)
- 7、支持Falcon協議. 監控系統。(Open-Falcon小米開源項目:監控展示)
- 8、國內一線互聯網公司上線項目
2、項目結構是怎樣的
結構圖如下:
3、項目目錄結構及入口
包結構如下:
- Enums:枚舉類型
- Impl:實現功能,具體實現了rediscluster和redis的命令
- Jedis:封裝jedis操作
- Monitor:方法攔截、數據存儲、數據上傳
- Spring:對redis集群實列管理
- Utils:工具類
- Zookeeper:動態感應集群變動
關鍵入口如下:
- 使用調用:IRedis>RedisImpl>RedisClusterImpl(業務隔離)
- 收集數據:bat.ke.qq.com.flasher.monitor.interceptor.MonitorInterceptor(監控數據整理)、bat.ke.qq.com.flasher.monitor.job.MonitorJob(監控數據上報)
- 定時任務(上報內容):MonitorQuartzJob>ScheduledExecutor>ScheduledQuartz(監控定時上報管理)
- 高可用集群管理:bat.ke.qq.com.flasher.spring.RedisClusterConnectionVHFactory
三、部分代碼思想詳解
1、業務隔離
在這個項目里是實現了業務隔離的,其隔離方式就是對Key進行格式化,如 com.tl.flasher.impl.RedisClusterImpl 以及 com.tl.flasher.impl.RedisImpl 這兩個基於集群、非集群的二次包裝的Jedis API:
@Override public String hget(String business, String key, String field) { return getJedisCluster().hget(RedisUtil.buildKey(getBusiness(business), key), field); } @Override public Map<String, String> hgetAll(String business, String key) { return getJedisCluster().hgetAll(RedisUtil.buildKey(getBusiness(business), key)); }
其ID都進行了二次build,其二次build實現如下:
public static String buildKey(String business,String key){ return new StringBuilder(business).append(Constants.DEFAULT_SEPARATOR).append(key).toString(); } public static String buildKeys(String business,String ... keys){ StringBuilder sb = new StringBuilder(business).append(Constants.DEFAULT_SEPARATOR); for(String key : keys){ sb.append(key); } return sb.toString(); } public static byte[] buildKey(String business,byte[] key){ byte[] busi = (business + Constants.DEFAULT_SEPARATOR).getBytes(); byte[] bytes = new byte[busi.length + key.length]; System.arraycopy(busi, 0, bytes, 0, busi.length); System.arraycopy(key, 0, bytes, busi.length, key.length); return bytes; }
因此就實現了業務簡單隔離。
2、高可用集群動態管理
這一部分就用到了zookeeper,以及InitializingBean。首先看加載bean的InitializingBean,如代碼其獲取集群或者非集群Redis客戶端的時候繼承了InitializingBean:
public interface IRedisClusterConnection extends InitializingBean, DisposableBean { JedisCluster getJedisCluster(); String getBusiness(); }
在具體的實現類,提供了三個方法,分別是首次初始化 afterPropertiesSet方法,二次更新的refresh方法以及銷毀的destory方法:
@Override public void afterPropertiesSet() throws Exception { //繼承自InitializingBean Assert.hasText(hostPort); Assert.hasText(business); hostGroups = new GedisGroups(hostPort,getBusiness()); hostGroups.addChangeListner(new DataChangeListener()); List<String> hostPs = hostGroups.getValues(); Set<HostAndPort> hostAndPorts = Sets.newHashSet(); for(String s : hostPs){ String[] ss = s.split(":"); hostAndPorts.add(new HostAndPort(ss[0], Integer.valueOf(ss[1]))); } if(null == jedisCluster){ if(null == jedisPoolConfig){ jedisPoolConfig = new JedisPoolConfig(); } jedisCluster = new JedisCluster(hostAndPorts,timeout,maxRedirections,jedisPoolConfig); } LOGGER.info("RedisClusterConnectionVHFactory is running!"); } private void refresh(){ //refresh:刷新配置 List<String> hostPs = hostGroups.getValues(); Set<HostAndPort> hostAndPorts = Sets.newHashSet(); for(String s : hostPs){ String[] ss = s.split(":"); hostAndPorts.add(new HostAndPort(ss[0], Integer.valueOf(ss[1]))); } JedisCluster newjedisCluster = new JedisCluster(hostAndPorts,timeout,maxRedirections,jedisPoolConfig); jedisCluster = newjedisCluster; LOGGER.info("RedisClusterConnectionVHFactory.refresh() running!"); } @Override public void destroy() throws Exception { //繼承自DisposableBean if(null != jedisCluster){ jedisCluster.close(); } jedisCluster = null; LOGGER.info("RedisClusterConnectionVHFactory.destroy() is running!"); }
那么刷新配置在什么時候用呢?
private class DataChangeListener implements ZkListener{ @Override public void dataEvent(WatchedEvent event) { // TODO Auto-generated method stub if(event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeDataChanged){ refresh(); } } }
這個監聽是在初始化的時候就添加了的,當zookeeper檢測到節點變動時,就會自動更新集群信息,實現高可用。然后zookeeper的管理就在目錄zookeeper下,需要自行查看。
3、監控數據整理與上報
ScheduledService接口定義了兩個方法:startJob和shutdown。
public interface ScheduledService { void startJob(Map<String,Object> context,int intervalInSeconds); void shutdown(); }
然后具體實現如下:
public class ScheduledExecutor implements ScheduledService { private ScheduledExecutorService service = Executors.newScheduledThreadPool(2); @Override public void startJob(Map<String,Object> context,int intervalInSeconds) { MonitorExecutorJob job = new MonitorExecutorJob(context); service.scheduleAtFixedRate(job,0,intervalInSeconds,TimeUnit.SECONDS);//start run task } @Override public void shutdown() { if(null != service){ service.shutdown(); } service = null; } }
然后會調用這個定時的執行的線程池就會調用方法執行:
public class MonitorJob { private static final Logger LOGGER = LoggerFactory.getLogger(MonitorJob.class); public static void doJob(Map<String,Object> map){ MonitorPushTypeEnum mPushType = (MonitorPushTypeEnum) map.get(Constants.MONITOR_PUSH_TYPE_NAME); AbstractProtocol protocol = (AbstractProtocol) map.get(Constants.MONITOR_PROTOCOL_NAME); String host = (String) map.get(Constants.MONITOR_HOST_NAME); Integer port = (Integer) map.get(Constants.MONITOR_PORT_NAME); List<Serializable> datas = null; if(null != protocol && null != (datas = AbstractProtocol.buildLocalData(protocol)) && !datas.isEmpty()){ String result = null; try { Gson gson = new Gson(); switch (mPushType){ case HTTP_ASYN: result = HttpUtil.doPostAsyn(host,gson.toJson(datas)); //HTTP異步 break; case HTTP_SYNC: result = HttpUtil.doPostSync(host, gson.toJson(datas)); //HTTP同步 break; default: } } catch (Exception e){ e.printStackTrace(); LOGGER.error(e.getMessage()); } LOGGER.info("post "+ host + port +",result is " +result); } // 清除本地內存數據 AbstractProtocol.clearLocalData(); } }
那么觸發數據記錄的地方是哪兒呢:
<bean id="monitorInterceptor" class="bat.ke.qq.com.flasher.monitor.interceptor.MonitorInterceptor" /> <bean id="autoProxyCreator" class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator"> <!-- 設置目標對象 --> <property name="beanNames"> <list> <value>redisCluster</value> </list> </property> <!-- 代理對象所使用的攔截器 --> <property name="interceptorNames"> <list> <value>monitorInterceptor</value> </list> </property> </bean>
這個攔截器就進行了監控數據的准備:
public class MonitorInterceptor implements MethodInterceptor { @Override public Object invoke(MethodInvocation methodInvocation) throws Throwable { //調用目標方法之前執行的動作 // System.out.println("調用方法之前: invocation對象:[" + methodInvocation + "]"); long beginTime = System.currentTimeMillis(); //調用目標方法 Object rval = methodInvocation.proceed(); long endTime = System.currentTimeMillis(); StringBuilder methodFullName = new StringBuilder("["); methodFullName.append(methodInvocation.getMethod()); methodFullName.append("]"); String usedNumName = methodFullName + Constants.DEFAULT_SEPARATOR + Constants.MONITOR_GEDIS_USED_NUM_NAME; String usedTimeName = methodFullName + Constants.DEFAULT_SEPARATOR + Constants.MONITOR_GEDIS_USED_TIME_NAME; Integer usedNum = Constants.MONITOR_MAP.get(usedNumName); if(usedNum == null){ usedNum = 0; } usedNum +=1; //調用計數 Integer usedTime = Constants.MONITOR_MAP.get(usedTimeName); if(usedTime == null){ usedTime = 0; } usedTime += Long.bitCount(endTime - beginTime); //調用計時 Constants.MONITOR_MAP.put(usedNumName, usedNum); Constants.MONITOR_MAP.put(usedTimeName, usedTime); // System.out.println("調用方法: invocation對象:[" + methodInvocation.getMethod() + "]" // + " Run time is " + usedTime +" ms"); // System.out.println(Constants.MONITOR_MAP); //調用目標方法之后執行的動作 // System.out.println("調用結束..."); return rval; } }
以上就是監控流程,如果對接Open-Falcon開源項目工具,就可以實現圖形化動態展示監控。
四、緩存雪崩與緩存穿透
關於緩存雪崩和緩存穿透,是這樣兩種情況:
- 緩存擊穿:請求一些故意系統沒有緩存的數據,於是只能走數據庫查詢,數據庫承受不了掛了。
- 緩存雪崩:緩存都是有超時時間的,如果大量緩存同時失效,那么會在訪問時大量數據只能走數據庫查詢再次緩存,數據庫承受不了掛了。
關於解決方案:
- 事先:保證redis高可用,進行redis cluster部署,防止redis掛掉。
- 發生:本地cache+hystrix限流保證防止大量惡意訪問,避免數據庫mysql掛掉。
- 事后:Redis持久化,緩存掛掉可以快速恢復。
同時緩存還可以采用布隆過濾器,這樣能提高緩存訪問性能,提高未緩存數據的訪問性能。
關於第四部分內容后續會進行項目實戰補充,到時再進行詳細說明。