redis讀寫分離及可用性設計


Redis緩存架構設計

對於下面兩個架構圖,有如下想法:

1)redis主從復制模式,為了解決master讀寫壓力,對master進行寫操作,對slave進行讀操作。

2)而在分片集群中,如果對部分分片進行寫、對另一部分分片進行讀,那麼會出現寫入後無法get到指定key的情況。

3)二級緩存有必要嗎?二級緩存最主要解決的問題,是把存儲介質由磁盤轉變為內存;而redis本身就是內存數據庫,在它之上再加二級緩存,最主要只能解決網絡開銷問題。

其次,多個應用對同一條key-value做修改時,是否能保證各個二級緩存數據的一致性呢?

 

主從讀寫分離設計

/**
 * Common contract implemented by the read-side and write-side Redis connection classes.
 */
public interface IRedisConnection {

    /**
     * Interface fields are implicitly {@code public static final}, so this is a shared
     * empty-string constant rather than per-connection state.
     */
    String connection_name = "";

    /**
     * @return the name assigned to this connection
     */
    String getConnection_name();

    /**
     * @param connectionName the name to assign to this connection
     */
    void setConnection_name(String connectionName);

    /**
     * Stores {@code value} under {@code key}.
     *
     * @param key   the key to write
     * @param value the value to store
     * @throws Exception if the implementation cannot perform the write
     */
    void write(String key, String value) throws Exception;

    /**
     * Looks up the value stored under {@code key}.
     *
     * @param key the key to read
     * @return the value the implementation finds for the key
     * @throws Exception if the implementation cannot perform the read
     */
    Object read(String key) throws Exception;
}

//讀連接
public class ReadConnection  extends Jedis implements IRedisConnection{
    public String connection_name = "";
    /**
     * @param host
     */
    public ReadConnection(String host,long port) {
        super(host,(int)port);
        // TODO Auto-generated constructor stub
    }
    /**
     * @return the connection_name
     */
    public String getConnection_name() {
        return connection_name;
    }
    /**
     * @param connectionName the connection_name to set
     */
    public void setConnection_name(String connectionName) {
        connection_name = connectionName;
    }
    
    public Object read(String key){
        return this.get(key);
    }
    public void write(String key,String value) throws Exception{
        throw new Exception("未定義的方法");
    }
}

//寫連接
public class WriteConnection  extends Jedis implements IRedisConnection{
    public String connection_name = "";
    /**
     * @param host
     */
    public WriteConnection(String host,long port) {
        super(host,(int)port);
        // TODO Auto-generated constructor stub
    }
    /**
     * @return the connection_name
     */
    public String getConnection_name() {
        return connection_name;
    }
    /**
     * @param connectionName the connection_name to set
     */
    public void setConnection_name(String connectionName) {
        connection_name = connectionName;
    }
    
    public void write(String key,String value){
        this.set(key, value);
    }
    public Object read(String key)throws Exception{
        throw new Exception("未錕斤拷錕斤拷姆錕斤拷錕?");
    }
    
}

/**
 * Creates a connection of the requested kind for the server registered under {@code code}.
 *
 * <p>A new connection object is constructed on every call. Original author's note: fault
 * detection, fault isolation and keeping a JedisPool in memory could also be done here.
 *
 * @param code           server code passed to {@code getServerFromCode}; also becomes the
 *                       connection name
 * @param connectionType {@code RedisConstants.WRITE_ONLY_CONNECTION} yields a
 *                       {@code WriteConnection}; any other value yields a {@code ReadConnection}
 * @return the newly created connection
 * @throws Exception if no server is found for {@code code}
 */
public static IRedisConnection getConnection(String code,
            int connectionType) throws Exception {
        IRedisBaseSV redisService =
                (IRedisBaseSV) ServiceFactory.getService(IRedisBaseSV.class);
        CFG_REDIS_SERVERBean target = redisService.getServerFromCode(code);

        // Guard clause: an unknown code is logged and then reported to the caller.
        if (null == target) {
            String notFound = "根據編碼:" + code + "不能找到對應的redis服務器資源";
            log.error(notFound);
            throw new Exception(notFound);
        }

        boolean writeRequested = RedisConstants.WRITE_ONLY_CONNECTION == connectionType;
        IRedisConnection connection = writeRequested
                ? new WriteConnection(target.getServerIp(), target.getServerPort())
                : new ReadConnection(target.getServerIp(), target.getServerPort());

        connection.setConnection_name(code);
        return connection;
    }

 初始化切片池

/**
   * Initializes the sharded connection pools.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Applies the "DEFAULT" parameter set to the shared pool {@code config}.</li>
   *   <li>Collects the distinct, upper-cased group codes of all configured servers into
   *       {@code serverGroupCodeList}.</li>
   *   <li>For every group, probes each server of that group and builds three
   *       {@code ShardedJedisPool}s (read, write, persist) from the servers that answered,
   *       registering them in {@code readPoolMap}, {@code writePoolMap} and
   *       {@code persistPoolMap} under the group code.</li>
   * </ol>
   *
   * @throws Exception if loading the configuration or building the pools fails
   */
  private static void initialShardedPool() throws Exception {
    long start = System.nanoTime();

    IRedisBaseSV sv = (IRedisBaseSV) ServiceFactory.getService(IRedisBaseSV.class);

    // Initialize the pool config from the stored parameters.
    CFG_REDIS_PARAMETERBean[] para = sv.getRedisConfig("DEFAULT");
    for (CFG_REDIS_PARAMETERBean parameter : para) {
      String name = parameter.getParameterName();
      String value = parameter.getParameterValue();
      if (name.equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXACTIVE)) {
        config.setMaxActive(Integer.parseInt(value));
      }
      if (name.equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXIDLE)) {
        config.setMaxIdle(Integer.parseInt(value));
      }
      if (name.equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXWAIT)) {
        config.setMaxWait(Long.parseLong(value));
      }
      if (name.equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_TESTONBORROW)) {
        // parseBoolean parses the configured text. Boolean.getBoolean would instead look up
        // a JVM system property named by the value, which is practically always false.
        config.setTestOnBorrow(Boolean.parseBoolean(value));
      }
      if (name.equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_NEED_SYN)) {
        NEED_WRITE_SYNCHRONIZE = Boolean.parseBoolean(value);
      }
    }

    CFG_REDIS_SERVERBean[] servers = sv.getRedisServer();
    // The map is used as a set: one entry per distinct upper-cased group code.
    HashMap<String, String> groupCodes = new HashMap<String, String>();
    for (CFG_REDIS_SERVERBean server : servers) {
      String upperGroup = server.getBelongGroup().toUpperCase();
      groupCodes.put(upperGroup, upperGroup);
    }
    // Raw ArrayList kept on purpose: the declared type of serverGroupCodeList is not visible here.
    serverGroupCodeList = new ArrayList(groupCodes.values());

    for (int i = 0; i < serverGroupCodeList.size(); i++) {
      String groupName = String.valueOf(serverGroupCodeList.get(i));
      List<JedisShardInfo> readShards = new ArrayList<JedisShardInfo>();
      List<JedisShardInfo> writeShards = new ArrayList<JedisShardInfo>();
      List<JedisShardInfo> persistShards = new ArrayList<JedisShardInfo>();

      // Walk every configured redis server instance and sort the members of this group
      // into the read / write / persist shard lists by their use type.
      for (CFG_REDIS_SERVERBean server : servers) {
        if (!server.getBelongGroup().equalsIgnoreCase(groupName)) {
          continue;
        }
        String useType = server.getUseType();
        if (useType.equalsIgnoreCase(String.valueOf(RedisConstants.READ_ONLY_CONNECTION))) {
          // Read servers are recorded before the probe, whether or not they answer.
          readServersAll.add(server);
          addShardIfReachable(server, readShards);
        } else if (useType.equalsIgnoreCase(
            String.valueOf(RedisConstants.WRITE_ONLY_CONNECTION))) {
          addShardIfReachable(server, writeShards);
        } else if (useType.equalsIgnoreCase(
            String.valueOf(RedisConstants.PERSIST_ONLYL_CONNECTION))) {
          addShardIfReachable(server, persistShards);
        }
      }

      // Build the pools: each group gets a readable, a writable and a persist shard pool.
      ShardedJedisPool readPool =
          new ShardedJedisPool(config, readShards, Hashing.MURMUR_HASH,
              Sharded.DEFAULT_KEY_TAG_PATTERN);
      ShardedJedisPool writePool =
          new ShardedJedisPool(config, writeShards, Hashing.MURMUR_HASH,
              Sharded.DEFAULT_KEY_TAG_PATTERN);
      ShardedJedisPool persistPool =
          new ShardedJedisPool(config, persistShards, Hashing.MURMUR_HASH,
              Sharded.DEFAULT_KEY_TAG_PATTERN);

      // Register the pools by group name. Original author's note: a connection taken from
      // the read pool can still perform write operations.
      readPoolMap.put(serverGroupCodeList.get(i), readPool);
      writePoolMap.put(serverGroupCodeList.get(i), writePool);
      persistPoolMap.put(serverGroupCodeList.get(i), persistPool);
    }
    long end = System.nanoTime();
    log.debug("初始化連接池用時:" + (end - start));

  }

  /** Probes one server and, when the probe succeeds, appends its shard info to {@code shards}. */
  private static void addShardIfReachable(CFG_REDIS_SERVERBean server,
      List<JedisShardInfo> shards) throws Exception {
    String host = server.getServerIp();
    int port = (int) server.getServerPort();
    String encrypted = server.getRequirepass();

    Boolean reachable = TestConnection(host, port, decryption(encrypted), server.getBelongGroup());
    if (!reachable) {
      return;
    }
    JedisShardInfo shard = new JedisShardInfo(host, port);
    // A password is only set when the stored (encrypted) value is non-blank.
    if (null != encrypted && !"".equals(encrypted.trim())) {
      shard.setPassword(decryption(encrypted));
    }
    shards.add(shard);
  }

釋放回連接池和銷毀連接

  /**
   * Returns a borrowed connection to the pool of its group so other threads can reuse it.
   * The original author added this to stop threads that borrow frequently from exhausting
   * the pool.
   *
   * <p>This is best-effort: failures are logged and never propagated to the caller.
   *
   * @param jedis          the connection to hand back
   * @param group          group name; upper-cased before the pool lookup
   * @param connectionType {@code RedisConstants.READ_ONLY_CONNECTION},
   *                       {@code WRITE_ONLY_CONNECTION} or {@code PERSIST_ONLYL_CONNECTION}
   *                       (the original comment describes these as 1 = read, 2 = write)
   * @throws Exception declared for compatibility with existing callers
   */
  public static void releaseConnection(ShardedJedis jedis, String group, int connectionType)
      throws Exception {
    try {
      ShardedJedisPool pool = null;
      if (group != null) {
        String groupKey = group.toUpperCase();
        if (connectionType == RedisConstants.READ_ONLY_CONNECTION) {
          pool = (ShardedJedisPool) readPoolMap.get(groupKey);
        } else if (connectionType == RedisConstants.WRITE_ONLY_CONNECTION) {
          pool = (ShardedJedisPool) writePoolMap.get(groupKey);
        } else if (connectionType == RedisConstants.PERSIST_ONLYL_CONNECTION) {
          pool = (ShardedJedisPool) persistPoolMap.get(groupKey);
        }
      }
      if (pool == null) {
        // Unknown group or connection type: say so explicitly instead of relying on a
        // NullPointerException being caught below.
        log.error("釋放連接debug: no pool for group=" + group
            + ", connectionType=" + connectionType);
        return;
      }
      pool.returnResource(jedis);
    } catch (Exception ex) {
      // Keep the best-effort contract, but record the cause.
      log.error("釋放連接debug", ex);
    }
  }

  /**
   * Hands a connection back to the pool of its group as broken, so the pool discards it.
   *
   * <p>Original author's rationale: after a timeout the connection must be destroyed rather
   * than merely released, otherwise the next user may receive the previous call's result and
   * fail with type-conversion errors.
   *
   * <p>This is best-effort: failures are logged and never propagated to the caller.
   *
   * @param jedis          the connection to discard
   * @param group          group name; upper-cased before the pool lookup
   * @param connectionType {@code RedisConstants.READ_ONLY_CONNECTION},
   *                       {@code WRITE_ONLY_CONNECTION} or {@code PERSIST_ONLYL_CONNECTION}
   * @throws Exception declared for compatibility with existing callers
   */
  public static void destoryConnection(ShardedJedis jedis, String group, int connectionType)
      throws Exception {
    try {
      ShardedJedisPool pool = null;
      if (group != null) {
        String groupKey = group.toUpperCase();
        if (connectionType == RedisConstants.READ_ONLY_CONNECTION) {
          pool = (ShardedJedisPool) readPoolMap.get(groupKey);
        } else if (connectionType == RedisConstants.WRITE_ONLY_CONNECTION) {
          pool = (ShardedJedisPool) writePoolMap.get(groupKey);
        } else if (connectionType == RedisConstants.PERSIST_ONLYL_CONNECTION) {
          pool = (ShardedJedisPool) persistPoolMap.get(groupKey);
        }
      }
      if (pool == null) {
        // Unknown group or connection type: say so explicitly instead of relying on a
        // NullPointerException being caught below.
        log.error("銷毀連接debug: no pool for group=" + group
            + ", connectionType=" + connectionType);
        return;
      }
      pool.returnBrokenResource(jedis);
    } catch (Exception ex) {
      // Keep the best-effort contract, but record the cause.
      log.error("銷毀連接debug", ex);
    }
  }

 測試連接,可用於故障隔離

  /**
   * Called during initialization: writes the per-group probe key and reports whether the
   * write went through, which doubles as a connectivity test.
   *
   * @param host   server host
   * @param port   server port
   * @param passwd password; ignored when null or blank
   * @param group  group name, used as the prefix of the probe key
   * @return {@code Boolean.TRUE} if the probe key was written, {@code Boolean.FALSE} if any
   *         exception occurred
   */
  private static Boolean TestConnection(String host, int port, String passwd, String group) {
    JedisShardInfo shardInfo = new JedisShardInfo(host, port);
    boolean hasPassword = passwd != null && !"".equals(passwd.trim());
    if (hasPassword) {
      shardInfo.setPassword(passwd);
    }

    Jedis probe = null;
    try {
      probe = new Jedis(shardInfo);
      probe.set(group + "AILK_REDIS_CONNECT_TEST", "TRUE");
      return Boolean.TRUE;
    } catch (Exception ex) {
      // Only reported when debug logging is on, but emitted at error level.
      if (log.isDebugEnabled()) {
        log.error("【調試】," + host + ":" + port + "拒絕連接!" + ex.getMessage());
      }
      return Boolean.FALSE;
    } finally {
      // Runs on both the success and the failure path.
      if (probe != null) {
        probe.disconnect();
      }
    }
  }

  /**
   * Called for fault isolation: decides whether a server is usable by trying to read the
   * per-group probe key back.
   *
   * @param host   server host
   * @param port   server port
   * @param passwd password; ignored when null or blank
   * @param group  group name, used as the prefix of the probe key
   * @return {@code Boolean.TRUE} only if the probe key exists and equals "true" ignoring
   *         case; {@code Boolean.FALSE} otherwise, including when any exception occurred
   */
  private static Boolean isConnectioned(String host, int port, String passwd, String group) {
    JedisShardInfo shardInfo = new JedisShardInfo(host, port);
    boolean hasPassword = passwd != null && !"".equals(passwd.trim());
    if (hasPassword) {
      shardInfo.setPassword(passwd);
    }

    Jedis probe = null;
    try {
      probe = new Jedis(shardInfo);
      String marker = probe.get(group + "AILK_REDIS_CONNECT_TEST");
      // A missing key (null) yields false here as well.
      return Boolean.valueOf("true".equalsIgnoreCase(marker));
    } catch (Exception ex) {
      // Only reported when debug logging is on, but emitted at error level.
      if (log.isDebugEnabled()) {
        log.error("【調試】," + host + ":" + port + "拒絕連接!" + ex.getMessage());
      }
      return Boolean.FALSE;
    } finally {
      // Runs on both the success and the failure path.
      if (probe != null) {
        probe.disconnect();
      }
    }
  }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM