Redis緩存架構設計
對於下面兩個架構圖,有如下想法:
1)redis主從復制模式,為了解決master讀寫壓力,對master進行寫操作,對slave進行讀操作。
2)而在分片集群中,如果對部分分片進行寫,部分分片進行讀,那么會導致寫入后無法get指定key的情況。
3)二級緩存有必要嗎?二級緩存最主要的問題解決存儲介質由磁盤存儲轉變為內存存儲,而redis本身就作為內存數據庫,最主要的只能夠解決網絡問題。
其次多個應用對同一條key-value做修改,是否能保證各個二級緩存數據的一致性呢。
主從讀寫分離設計
//定義統一連接接口 public interface IRedisConnection { public String connection_name = ""; /** * @return the connection_name */ public String getConnection_name() ; /** * @param connectionName the connection_name to set */ public void setConnection_name(String connectionName) ; /** * @param key value * @param value */ public void write(String key,String value)throws Exception; /** * @param key * @return */ public Object read(String key)throws Exception; // public boolean isValiate() ; } //讀連接 public class ReadConnection extends Jedis implements IRedisConnection{ public String connection_name = ""; /** * @param host */ public ReadConnection(String host,long port) { super(host,(int)port); // TODO Auto-generated constructor stub } /** * @return the connection_name */ public String getConnection_name() { return connection_name; } /** * @param connectionName the connection_name to set */ public void setConnection_name(String connectionName) { connection_name = connectionName; } public Object read(String key){ return this.get(key); } public void write(String key,String value) throws Exception{ throw new Exception("未定義的方法"); } } //寫連接 public class WriteConnection extends Jedis implements IRedisConnection{ public String connection_name = ""; /** * @param host */ public WriteConnection(String host,long port) { super(host,(int)port); // TODO Auto-generated constructor stub } /** * @return the connection_name */ public String getConnection_name() { return connection_name; } /** * @param connectionName the connection_name to set */ public void setConnection_name(String connectionName) { connection_name = connectionName; } public void write(String key,String value){ this.set(key, value); } public Object read(String key)throws Exception{ throw new Exception("未錕斤拷錕斤拷姆錕斤拷錕?"); } } //獲取一個指定讀/寫的連接 當然可以,故障檢測 ,故障隔離 ,使用 JedisPool保存在內存中 public static IRedisConnection getConnection(String code, int connectionType) throws Exception { IRedisBaseSV sv = (IRedisBaseSV) ServiceFactory .getService(IRedisBaseSV.class); CFG_REDIS_SERVERBean server = sv.getServerFromCode(code); if (server == null) { log.error("根據編碼:" + code + "不能找到對應的redis服務器資源"); throw new Exception("根據編碼:" + code + "不能找到對應的redis服務器資源"); } IRedisConnection conn; if (connectionType == RedisConstants.WRITE_ONLY_CONNECTION) { conn = new WriteConnection(server.getServerIp(), server .getServerPort()); } else { conn = new ReadConnection(server.getServerIp(), server .getServerPort()); } conn.setConnection_name(code); return conn; }
初始化切片池
/** * 初始化切片池 */ private static void initialShardedPool() throws Exception { long start = System.nanoTime(); IRedisBaseSV sv = (IRedisBaseSV) ServiceFactory.getService(IRedisBaseSV.class); // 初始化JedisPoolConfig CFG_REDIS_PARAMETERBean[] para = sv.getRedisConfig("DEFAULT"); for (int i = 0; i < para.length; i++) { if (para[i].getParameterName().equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXACTIVE)) config.setMaxActive(Integer.parseInt(para[i].getParameterValue())); if (para[i].getParameterName().equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXIDLE)) config.setMaxIdle(Integer.parseInt(para[i].getParameterValue())); if (para[i].getParameterName().equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_MAXWAIT)) config.setMaxWait(Long.parseLong(para[i].getParameterValue())); if (para[i].getParameterName().equalsIgnoreCase( RedisConstants.REDIS_SERVER_SHARED_TESTONBORROW)) config.setTestOnBorrow(Boolean.getBoolean(para[i].getParameterValue())); if (para[i].getParameterName().equalsIgnoreCase(RedisConstants.REDIS_SERVER_SHARED_NEED_SYN)) NEED_WRITE_SYNCHRONIZE = Boolean.parseBoolean(para[i].getParameterValue()); // Boolean.getBoolean不對 } CFG_REDIS_SERVERBean[] servers = sv.getRedisServer(); HashMap map = new HashMap(); for (int i = 0; i < servers.length; i++) { map.put(servers[i].getBelongGroup().toUpperCase(), servers[i].getBelongGroup().toUpperCase()); } serverGroupCodeList = new ArrayList(map.values()); ShardedJedisPool[] readConnectionPools; ShardedJedisPool[] writeConnectionPools; ShardedJedisPool[] persistConnectionPools; readConnectionPools = new ShardedJedisPool[serverGroupCodeList.size()]; writeConnectionPools = new ShardedJedisPool[serverGroupCodeList.size()]; persistConnectionPools = new ShardedJedisPool[serverGroupCodeList.size()]; for (int i = 0; i < serverGroupCodeList.size(); i++) { List<JedisShardInfo> readShards = new ArrayList<JedisShardInfo>(); List<JedisShardInfo> writeShards = new ArrayList<JedisShardInfo>(); List<JedisShardInfo> persistShards = new ArrayList<JedisShardInfo>(); //遍歷所有的redis server實例 for (int j = 0; j < servers.length; j++) { if (servers[j].getBelongGroup() .equalsIgnoreCase(String.valueOf(serverGroupCodeList.get(i))) && servers[j].getUseType().equalsIgnoreCase( String.valueOf(RedisConstants.READ_ONLY_CONNECTION))) { // 先測試該連接是否可用 readServersAll.add(servers[j]); Boolean connectionTest = TestConnection(servers[j].getServerIp(), (int) servers[j].getServerPort(), decryption(servers[j].getRequirepass()), servers[j].getBelongGroup()); if (connectionTest) { JedisShardInfo jds = new JedisShardInfo(servers[j].getServerIp(), (int) servers[j].getServerPort()); if (null != servers[j].getRequirepass() && !"".equals(servers[j].getRequirepass().trim())) jds.setPassword(decryption(servers[j].getRequirepass())); readShards.add(jds); } } else if (servers[j].getBelongGroup().equalsIgnoreCase( String.valueOf(serverGroupCodeList.get(i))) && servers[j].getUseType().equalsIgnoreCase( String.valueOf(RedisConstants.WRITE_ONLY_CONNECTION))) { Boolean connectionTest = TestConnection(servers[j].getServerIp(), (int) servers[j].getServerPort(), decryption(servers[j].getRequirepass()), servers[j].getBelongGroup()); if (connectionTest) { JedisShardInfo jds = new JedisShardInfo(servers[j].getServerIp(), (int) servers[j].getServerPort()); if (null != servers[j].getRequirepass() && !"".equals(servers[j].getRequirepass().trim())) jds.setPassword(decryption(servers[j].getRequirepass())); writeShards.add(jds); } } else if (servers[j].getBelongGroup().equalsIgnoreCase( String.valueOf(serverGroupCodeList.get(i))) && servers[j].getUseType().equalsIgnoreCase( String.valueOf(RedisConstants.PERSIST_ONLYL_CONNECTION))) { Boolean connectionTest = TestConnection(servers[j].getServerIp(), (int) servers[j].getServerPort(), decryption(servers[j].getRequirepass()), servers[j].getBelongGroup()); if (connectionTest) { JedisShardInfo jds = new JedisShardInfo(servers[j].getServerIp(), (int) servers[j].getServerPort()); if (null != servers[j].getRequirepass() && !"".equals(servers[j].getRequirepass().trim())) jds.setPassword(decryption(servers[j].getRequirepass())); persistShards.add(jds); } } } // 構造池 每個組分別對應三個分片池,可讀分片池,可寫分片池,持久化分片池 readConnectionPools[i] = new ShardedJedisPool(config, readShards, Hashing.MURMUR_HASH, Sharded.DEFAULT_KEY_TAG_PATTERN); writeConnectionPools[i] = new ShardedJedisPool(config, writeShards, Hashing.MURMUR_HASH, Sharded.DEFAULT_KEY_TAG_PATTERN); persistConnectionPools[i] = new ShardedJedisPool(config, persistShards, Hashing.MURMUR_HASH, Sharded.DEFAULT_KEY_TAG_PATTERN); //按照組名 分別存可讀池,可寫池,持久化池於對應的map中【此處如果拿到一個可讀池,依然可以做寫操作】 readPoolMap.put(serverGroupCodeList.get(i), readConnectionPools[i]); writePoolMap.put(serverGroupCodeList.get(i), writeConnectionPools[i]); persistPoolMap.put(serverGroupCodeList.get(i), persistConnectionPools[i]); } long end = System.nanoTime(); log.debug("初始化連接池用時:" + (end - start)); }
釋放回連接池和銷毀連接
/** * 根據組將連接釋放,重新放回連接池,解決多個線程頻繁取獲連接導致連接池連接數不夠用的問題 * * @para gourp 組名 * @para connectionType 連接類型:1.讀2.寫 */ public static void releaseConnection(ShardedJedis jedis, String group, int connectionType) throws Exception { try { ShardedJedisPool rp = null; if (connectionType == RedisConstants.READ_ONLY_CONNECTION) { rp = (ShardedJedisPool) readPoolMap.get(group.toUpperCase()); } else if (connectionType == RedisConstants.WRITE_ONLY_CONNECTION) { rp = (ShardedJedisPool) writePoolMap.get(group.toUpperCase()); } else if (connectionType == RedisConstants.PERSIST_ONLYL_CONNECTION) { rp = (ShardedJedisPool) persistPoolMap.get(group.toUpperCase()); } rp.returnResource(jedis); } catch (Exception ex) { log.error("釋放連接debug"); } } /** * 銷毀連接 連接超時以后不能只是釋放連接,需要銷毀,否則下次使用會取到上次的結果,出現類型轉換出錯的問題。 * * @param jedis * @param group * @param connectionType * @throws Exception */ public static void destoryConnection(ShardedJedis jedis, String group, int connectionType) throws Exception { try { ShardedJedisPool rp = null; if (connectionType == RedisConstants.READ_ONLY_CONNECTION) { rp = (ShardedJedisPool) readPoolMap.get(group.toUpperCase()); } else if (connectionType == RedisConstants.WRITE_ONLY_CONNECTION) { rp = (ShardedJedisPool) writePoolMap.get(group.toUpperCase()); } else if (connectionType == RedisConstants.PERSIST_ONLYL_CONNECTION) { rp = (ShardedJedisPool) persistPoolMap.get(group.toUpperCase()); } rp.returnBrokenResource(jedis); } catch (Exception ex) { log.error("銷毀連接debug"); } }
測試連接,可用於故障隔離
/** * 初始化時調用,用於初始化set初始值,並測試連通性 * @param host * @param port * @param passwd * @param group * @return */ private static Boolean TestConnection(String host, int port, String passwd, String group) { Boolean rtn = Boolean.FALSE; JedisShardInfo jsd = new JedisShardInfo(host, port); if (null != passwd && !"".equals(passwd.trim())) jsd.setPassword(passwd); Jedis jd = null; try { jd = new Jedis(jsd); jd.set(group + "AILK_REDIS_CONNECT_TEST", "TRUE"); rtn = Boolean.TRUE; } catch (Exception ex) { if (log.isDebugEnabled()) { log.error("【調試】," + host + ":" + port + "拒絕連接!" + ex.getMessage()); } } finally { if (null != jd) jd.disconnect(); } return rtn; } /** * 故障隔離調用,通過是否能夠get值來判斷 * @param host * @param port * @param passwd * @param group * @return */ private static Boolean isConnectioned(String host, int port, String passwd, String group) { Boolean rtn = Boolean.FALSE; JedisShardInfo jsd = new JedisShardInfo(host, port); if (null != passwd && !"".equals(passwd.trim())) jsd.setPassword(passwd); Jedis jd = null; try { jd = new Jedis(jsd); String ailkRedisConnectTest = jd.get(group + "AILK_REDIS_CONNECT_TEST"); if (ailkRedisConnectTest != null && ailkRedisConnectTest.equalsIgnoreCase("true")) rtn = Boolean.TRUE; } catch (Exception ex) { if (log.isDebugEnabled()) { log.error("【調試】," + host + ":" + port + "拒絕連接!" + ex.getMessage()); } } finally { if (null != jd) jd.disconnect(); } return rtn; }