Redis學習之Redis集群模式缺陷及其處理


一、Redis使用有哪些常見問題

  在我們已經有了Jedis客戶端、集群模式支持后,Redis基本使用已經沒有大的問題了。關於Jedis以及集群模式請參考博文:

  但是這樣依舊有很多缺陷,比如:

  • 動態擴容不方便,擴容需要重啟才能生效
  • 對於集群配置,有客戶端代碼侵入
  • 集群中沒有業務進行隔離,單業務沖高可能影響其余業務運行
  • 沒有監控,無法提前預估風險
  • 不能解決緩存穿透以及緩存雪崩問題

          

二、Redis解決方案項目簡述

  在上面的問題中,有一個開源項目有很好的參考作用,項目地址:https://gitee.com/ym-monkey/flasher

  1、這個主要解決了什么

  • 1、基於Jedis Cluster開發的客戶端支持Redis Cluster集群。
  • 2、對被調用方(客戶端)侵入極少,上手極快。
  • 3、支持動態增加節點,客戶端自動感知。(zookeeper)
  • 4、支持客戶端驗證與攔截。 (token)
  • 5、異步監控調用數據,支持異步上報。
  • 6、方便管理有效的區分業務系統。會員(memmber) 商品(goods)
  • 7、支持Falcon協議. 監控系統。(Open-Falcon小米開源項目:監控展示)
  • 8、國內一線互聯網公司上線項目

   2、項目結構是怎樣的

  結構圖如下:

            

  3、項目目錄結構及入口

  包結構如下:

  • Enums:枚舉類型
  • Impl:實現功能,具體實現了rediscluster和redis的命令
  • Jedis:封裝jedis操作
  • Monitor:方法攔截、數據存儲、數據上傳
  • Spring:對redis集群實列管理
  • Utils:工具類
  • Zookeeper:動態感應集群變動

  關鍵入口如下:

  • 使用調用:IRedis>RedisImpl>RedisClusterImpl(業務隔離)
  • 收集數據:bat.ke.qq.com.flasher.monitor.interceptor.MonitorInterceptor(監控數據整理)、bat.ke.qq.com.flasher.monitor.job.MonitorJob(監控數據上報)
  • 定時任務(上報內容):MonitorQuartzJob>ScheduledExecutor>ScheduledQuartz(監控定時上報管理)
  • 高可用集群管理:bat.ke.qq.com.flasher.spring.RedisClusterConnectionVHFactory

三、部分代碼思想詳解

  1、業務隔離

  在這個項目里是實現了業務隔離的,其隔離方式就是對Key進行格式化,如 com.tl.flasher.impl.RedisClusterImpl 以及 com.tl.flasher.impl.RedisImpl 這兩個基於集群、非集群的二次包裝的Jedis API:

    @Override
    public String hget(String business, String key, String field) {
        return getJedisCluster().hget(RedisUtil.buildKey(getBusiness(business), key), field);
    }

    @Override
    public Map<String, String> hgetAll(String business, String key) {
        return getJedisCluster().hgetAll(RedisUtil.buildKey(getBusiness(business), key));
    }

  其ID都進行了二次build,其二次build實現如下:

    public static String buildKey(String business,String key){
        return new StringBuilder(business).append(Constants.DEFAULT_SEPARATOR).append(key).toString();
    }

    public static String buildKeys(String business,String ... keys){
        StringBuilder sb = new StringBuilder(business).append(Constants.DEFAULT_SEPARATOR);
        for(String key : keys){
            sb.append(key);
        }
        return sb.toString();
    }

    public static byte[] buildKey(String business,byte[] key){
        byte[] busi = (business + Constants.DEFAULT_SEPARATOR).getBytes();
        byte[] bytes = new byte[busi.length + key.length];
        System.arraycopy(busi, 0, bytes, 0, busi.length);
        System.arraycopy(key, 0, bytes, busi.length, key.length);
        return bytes;
    }

  因此就實現了業務簡單隔離。

  2、高可用集群動態管理

  這一部分就用到了zookeeper,以及InitializingBean。首先看加載bean的InitializingBean,如代碼其獲取集群或者非集群Redis客戶端的時候繼承了InitializingBean:

public interface IRedisClusterConnection extends InitializingBean, DisposableBean {
    JedisCluster getJedisCluster();
    String getBusiness();
}

  在具體的實現類,提供了三個方法,分別是首次初始化 afterPropertiesSet方法,二次更新的refresh方法以及銷毀的destory方法:

  @Override
    public void afterPropertiesSet() throws Exception {      //繼承自InitializingBean
       Assert.hasText(hostPort);
       Assert.hasText(business);
       
       hostGroups = new GedisGroups(hostPort,getBusiness());
       hostGroups.addChangeListner(new DataChangeListener());        
       List<String> hostPs = hostGroups.getValues();

       Set<HostAndPort> hostAndPorts = Sets.newHashSet();
       for(String s : hostPs){
           String[] ss = s.split(":");
           hostAndPorts.add(new HostAndPort(ss[0], Integer.valueOf(ss[1])));
       }

        if(null == jedisCluster){
            if(null == jedisPoolConfig){
                jedisPoolConfig = new JedisPoolConfig();
            }
            jedisCluster = new JedisCluster(hostAndPorts,timeout,maxRedirections,jedisPoolConfig);
        }

        LOGGER.info("RedisClusterConnectionVHFactory is running!");
    }

  private void refresh(){              //refresh:刷新配置
        List<String> hostPs = hostGroups.getValues();
        Set<HostAndPort> hostAndPorts = Sets.newHashSet();
        for(String s : hostPs){
            String[] ss = s.split(":");
            hostAndPorts.add(new HostAndPort(ss[0], Integer.valueOf(ss[1])));
        }

        JedisCluster newjedisCluster = new JedisCluster(hostAndPorts,timeout,maxRedirections,jedisPoolConfig);
        jedisCluster = newjedisCluster;

        LOGGER.info("RedisClusterConnectionVHFactory.refresh() running!");
    }

    @Override
    public void destroy() throws Exception {      //繼承自DisposableBean if(null != jedisCluster){
            jedisCluster.close();
        }
        jedisCluster = null;

        LOGGER.info("RedisClusterConnectionVHFactory.destroy() is running!");
    }

  那么刷新配置在什么時候用呢?

  private class DataChangeListener implements ZkListener{
        @Override
        public void dataEvent(WatchedEvent event) {
            // TODO Auto-generated method stub
            if(event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeDataChanged){
                refresh();
            }
        }
    }

  這個監聽是在初始化的時候就添加了的,當zookeeper檢測到節點變動時,就會自動更新集群信息,實現高可用。然后zookeeper的管理就在目錄zookeeper下,需要自行查看。

  3、監控數據整理與上報

  ScheduledService接口定義了兩個方法:startJob和shutdown。

public interface ScheduledService {
    void startJob(Map<String,Object> context,int intervalInSeconds);
    void shutdown();
}

  然后具體實現如下:

public class ScheduledExecutor implements ScheduledService {
    private ScheduledExecutorService service = Executors.newScheduledThreadPool(2);

    @Override
    public void startJob(Map<String,Object> context,int intervalInSeconds) {
        MonitorExecutorJob job = new MonitorExecutorJob(context);
        service.scheduleAtFixedRate(job,0,intervalInSeconds,TimeUnit.SECONDS);//start run task
    }

    @Override
    public void shutdown() {
        if(null != service){
            service.shutdown();
        }
        service = null;
    }
}

  然后會調用這個定時的執行的線程池就會調用方法執行:

public class MonitorJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(MonitorJob.class);

    public static void doJob(Map<String,Object> map){
        MonitorPushTypeEnum mPushType = (MonitorPushTypeEnum) map.get(Constants.MONITOR_PUSH_TYPE_NAME);
        AbstractProtocol protocol = (AbstractProtocol) map.get(Constants.MONITOR_PROTOCOL_NAME);
        String host = (String) map.get(Constants.MONITOR_HOST_NAME);
        Integer port = (Integer) map.get(Constants.MONITOR_PORT_NAME);
        List<Serializable> datas = null;
        if(null != protocol && null != (datas = AbstractProtocol.buildLocalData(protocol)) && !datas.isEmpty()){
            String result = null;
            try {
                Gson gson = new Gson();
                switch (mPushType){
                    case HTTP_ASYN:
                        result = HttpUtil.doPostAsyn(host,gson.toJson(datas));    //HTTP異步 break;
                    case HTTP_SYNC:
                        result = HttpUtil.doPostSync(host, gson.toJson(datas));    //HTTP同步 break;
                    default:

                }
            } catch (Exception e){
                e.printStackTrace();
                LOGGER.error(e.getMessage());
            }
            LOGGER.info("post "+ host + port +",result is " +result);
        }
        // 清除本地內存數據
        AbstractProtocol.clearLocalData();
    }

}

  那么觸發數據記錄的地方是哪兒呢:

    <bean id="monitorInterceptor" class="bat.ke.qq.com.flasher.monitor.interceptor.MonitorInterceptor" />
    <bean id="autoProxyCreator" class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
        <!-- 設置目標對象 -->
        <property name="beanNames">
            <list>
                <value>redisCluster</value>
            </list>
        </property>
        <!-- 代理對象所使用的攔截器 -->
        <property name="interceptorNames">
            <list>
                <value>monitorInterceptor</value>
            </list>
        </property>
    </bean>

  這個攔截器就進行了監控數據的准備:

public class MonitorInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        //調用目標方法之前執行的動作
//        System.out.println("調用方法之前: invocation對象:[" + methodInvocation + "]");
        long beginTime = System.currentTimeMillis();
        //調用目標方法
        Object rval = methodInvocation.proceed();
        long endTime = System.currentTimeMillis();
        StringBuilder methodFullName = new StringBuilder("[");
        methodFullName.append(methodInvocation.getMethod());
        methodFullName.append("]");
        String usedNumName = methodFullName + Constants.DEFAULT_SEPARATOR + Constants.MONITOR_GEDIS_USED_NUM_NAME;
        String usedTimeName = methodFullName + Constants.DEFAULT_SEPARATOR + Constants.MONITOR_GEDIS_USED_TIME_NAME;
        Integer usedNum = Constants.MONITOR_MAP.get(usedNumName);
        if(usedNum == null){
            usedNum = 0;
        }
        usedNum +=1;        //調用計數
        Integer usedTime = Constants.MONITOR_MAP.get(usedTimeName);
        if(usedTime == null){
            usedTime = 0;
        }
        usedTime += Long.bitCount(endTime - beginTime);    //調用計時
        Constants.MONITOR_MAP.put(usedNumName, usedNum);
        Constants.MONITOR_MAP.put(usedTimeName, usedTime);
//        System.out.println("調用方法: invocation對象:[" + methodInvocation.getMethod() + "]"
//                + " Run time is " + usedTime +" ms");
//        System.out.println(Constants.MONITOR_MAP);
        //調用目標方法之后執行的動作
//        System.out.println("調用結束...");
        return rval;
    }
}

  以上就是監控流程,如果對接Open-Falcon開源項目工具,就可以實現圖形化動態展示監控。

四、緩存雪崩與緩存穿透

  關於緩存雪崩和緩存穿透,是這樣兩種情況:

  • 緩存擊穿:請求一些故意系統沒有緩存的數據,於是只能走數據庫查詢,數據庫承受不了掛了。
  • 緩存雪崩:緩存都是有超時時間的,如果大量緩存同時失效,那么會在訪問時大量數據只能走數據庫查詢再次緩存,數據庫承受不了掛了。

  關於解決方案:

  • 事先:保證redis高可用,進行redis cluster部署,防止redis掛掉。
  • 發生:本地cache+hystrix限流保證防止大量惡意訪問,避免數據庫mysql掛掉。
  • 事后:Redis持久化,緩存掛掉可以快速恢復。

   同時緩存還可以采用布隆過濾器,這樣能提高緩存訪問性能,提高未緩存數據的訪問性能。

  關於第四部分內容后續會進行項目實戰補充,到時再進行詳細說明。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM