1. 概述
項目中,創建的活動內容存入redis,然后需要用到活動內容的地方,從redis去取,然后參與計算。
活動數據的一個特點是更新不頻繁、數據量不大。因為項目部署一般是多機器、多實例,除了redis,有沒有其他實現呢?
Guava的 loading cache是本地緩存,數據量不是很大時 可以適用(如果有大量的key-value數據緩存本地,本機也吃不消啊),
然后多機器多實例怎么同步呢?想到了zookeeper....
2. 代碼
2.1 模擬多實例
package TestZK; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * @Author: rocky * @Date: Created in 2018/5/20. */ public class InstanceOne { private static final String ADDRESS = "xxx:2181"; private static final String PATH = "/strategy"; private static CuratorFramework client; static { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) // .namespace(BASE) .build(); client.start(); } public static void main(String[] args) throws Exception { startCache(); Thread.sleep(Integer.MAX_VALUE); } private static void startCache() throws Exception { PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true); childrenCache.start(); childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { System.out.println("catch that: the path of changed node "+ event.getData().getPath() + ", the data of changed node is " + new String(event.getData().getData())); //load data to loading cache(guava) doSomething(); } }); } //load data to loading cache private static void doSomething() { // LoadingCache<String, Map<Integer, DriverIntegral>> driverIntegralCache = CacheBuilder.newBuilder().maximumSize(500) // .expireAfterWrite(5, TimeUnit.MINUTES).build(new CacheLoader<String, Map<Integer, DriverIntegral>>() { // @Override // public Map<Integer, DriverIntegral> load(String key) throws Exception { // Map<Integer, DriverIntegral> integerDriverIntegralMap = ..; // logger.info("guava load ::driverIntegralMap"+integerDriverIntegralMap!=null?integerDriverIntegralMap.toString():""); // return integerDriverIntegralMap; // } // }); // // Map<Integer, DriverIntegral> driverIntegralMap = driverIntegralCache.get(INTEGRAL); } } package TestZK; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * @Author: rocky * @Date: Created in 2018/5/20. */ public class InstanceTwo { private static final String ADDRESS = "xxx:2181"; private static final String PATH = "/strategy"; private static CuratorFramework client; static { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) // .namespace(BASE) .build(); client.start(); } public static void main(String[] args) throws Exception { startCache(); Thread.sleep(Integer.MAX_VALUE); } private static void startCache() throws Exception { PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true); childrenCache.start(); childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { System.out.println("catch that: the path of changed node "+ event.getData().getPath() + ", the data of changed node is " + new String(event.getData().getData())); } }); } } package TestZK; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; /** * @Author: rocky * @Date: Created in 2018/5/20. */ public class InstanceThree { private static final String ADDRESS = "xxx:2181"; private static final String PATH = "/strategy"; private static CuratorFramework client; static { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) // .namespace(BASE) .build(); client.start(); } public static void main(String[] args) throws Exception { startCache(); Thread.sleep(Integer.MAX_VALUE); } private static void startCache() throws Exception { PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true); childrenCache.start(); childrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception { System.out.println("catch that: the path of changed node is "+ event.getData().getPath() + ", the data of changed node is " + new String(event.getData().getData())); } }); } }
監聽zk孩子節點,有變化時(創建、更新、刪除),重新從DB加載活動數據,緩存到loading cache,用到活動數據的地方,從cache中取。
2.2 模擬客戶端
package TestZK; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; /** * @Author: rocky * @Date: Created in 2018/5/20. */ public class NodeChangeTest { private static final String ADDRESS = "xxx:2181"; private static final String PATH = "/strategy"; private static CuratorFramework client; static { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) // .namespace(BASE) .build(); client.start(); } public static void main(String[] args) throws Exception { String path_node_1 = PATH + "/node_1"; //create client.create().creatingParentsIfNeeded().forPath(path_node_1); Thread.sleep(2000); //change client.setData().forPath(path_node_1, "yahahaha".getBytes()); Thread.sleep(2000); //delete client.delete().forPath(path_node_1); Thread.sleep(Integer.MAX_VALUE); } }
客戶端實際上可能是某個后台,在后台創建、修改、或刪除某活動,然后對應操作zookeeper節點,其他監聽到節點變化的地方,做相應操作(如查詢DB並緩存數據)
控制台

3. 說明
Zookeeper原生的API只能實現一次監聽,這里用到的Curator的封裝jar包,免去了手動重復注冊。另外Curator的NodeCache和TreeCache分別監聽本節點及分支所有節點,
該實例演示的PathChildrenCache只能監控一級子節點(即兒子節點、孫子節點也不能監控),根據需要選擇相應NodeCache.
很low的實現,請多指教^_^
Zookeeper Curator API 使用 www.cnblogs.com/rocky-fang/p/9037509.html
Zookeeper JAVA API的使用 http://www.cnblogs.com/rocky-fang/p/9030438.html
