Zookeeper + Guava LoadingCache: implementing a distributed cache


1. Overview

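In our project, activity content is written to Redis when it is created; any code that needs it reads it back from Redis and uses it in its calculations.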

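Activity data is updated infrequently and is small in volume. Since the project is usually deployed on multiple machines with multiple instances, is there an alternative to Redis?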

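Guava's LoadingCache is a local cache, so it fits when the data volume is modest (caching a huge number of key-value pairs locally would exhaust the machine's memory).

But how do multiple machines and instances stay in sync? That is where Zookeeper comes in.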

 

2. Code

2.1 Simulating multiple instances

package TestZK;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceOne {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;
    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
//                .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

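    private static void startCache() throws Exception {
        // cacheData = true: keep each child's data in the cache so events carry it
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // register the listener before starting so no early event is missed
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // connection-state events (e.g. CONNECTION_LOST) carry no node data
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node is " + event.getData().getPath()
                        + ", the data of changed node is " + (data == null ? "" : new String(data)));
                //load data to loading cache(guava)
                doSomething();
            }
        });
        childrenCache.start();
    }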
    //load data to loading cache
    private static void doSomething() {
//        LoadingCache<String, Map<Integer, DriverIntegral>> driverIntegralCache = CacheBuilder.newBuilder().maximumSize(500)
//                .expireAfterWrite(5, TimeUnit.MINUTES).build(new CacheLoader<String, Map<Integer, DriverIntegral>>() {
//                    @Override
//                    public Map<Integer, DriverIntegral> load(String key) throws Exception {
//                        Map<Integer, DriverIntegral> integerDriverIntegralMap = ..;
//                        logger.info("guava load ::driverIntegralMap" + (integerDriverIntegralMap != null ? integerDriverIntegralMap.toString() : ""));
//                        return integerDriverIntegralMap;
//                    }
//                });
//
//        Map<Integer, DriverIntegral> driverIntegralMap = driverIntegralCache.get(INTEGRAL);
    }
}

package TestZK;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceTwo {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;
    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
//                .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

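    private static void startCache() throws Exception {
        // cacheData = true: keep each child's data in the cache so events carry it
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // register the listener before starting so no early event is missed
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // connection-state events (e.g. CONNECTION_LOST) carry no node data
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node is " + event.getData().getPath()
                        + ", the data of changed node is " + (data == null ? "" : new String(data)));
            }
        });
        childrenCache.start();
    }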
}

package TestZK;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class InstanceThree {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;
    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
//                .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        startCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

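    private static void startCache() throws Exception {
        // cacheData = true: keep each child's data in the cache so events carry it
        PathChildrenCache childrenCache = new PathChildrenCache(client, PATH, true);
        // register the listener before starting so no early event is missed
        childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                // connection-state events (e.g. CONNECTION_LOST) carry no node data
                if (event.getData() == null) {
                    return;
                }
                byte[] data = event.getData().getData();
                System.out.println("catch that: the path of changed node is " + event.getData().getPath()
                        + ", the data of changed node is " + (data == null ? "" : new String(data)));
            }
        });
        childrenCache.start();
    }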
}

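Each instance watches the child nodes under the Zookeeper path. When a child changes (is created, updated, or deleted), the instance reloads the activity data from the DB into the LoadingCache, and any code that needs activity data reads it from the cache.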
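The reload-on-change flow described above can be sketched without any dependencies. This is only an illustration: `ReloadingCache` is a hypothetical stand-in for Guava's LoadingCache, the loader lambda stands in for the DB query, and the call to `invalidateAll()` stands in for what `childEvent(...)` would trigger in the listings above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, dependency-free stand-in for a Guava LoadingCache:
// loads on a miss and loads again after an invalidation.
class ReloadingCache<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // assumed to wrap the DB lookup

    ReloadingCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Return the cached value, loading it through the loader on a miss.
    V get(K key) {
        return entries.computeIfAbsent(key, loader);
    }

    // Intended to be called from the Zookeeper listener:
    // drop every entry so the next get() reloads from the source.
    void invalidateAll() {
        entries.clear();
    }
}

class ReloadOnChangeDemo {
    public static void main(String[] args) {
        AtomicInteger dbVersion = new AtomicInteger(1); // simulates the activity table
        ReloadingCache<String, String> cache =
                new ReloadingCache<>(key -> key + "@v" + dbVersion.get());

        System.out.println(cache.get("activity")); // first read loads from the "DB"
        dbVersion.set(2);                           // back office updates the activity
        System.out.println(cache.get("activity")); // still served from the local cache
        cache.invalidateAll();                      // what the node-change event would trigger
        System.out.println(cache.get("activity")); // reloaded after the invalidation
    }
}
```

With a real Guava LoadingCache the listener would most likely call `invalidateAll()` or `refresh(key)` on the cache instead; which one fits depends on whether stale reads during the reload are acceptable.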

2.2 Simulating the client

package TestZK;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @Author: rocky
 * @Date: Created in 2018/5/20.
 */
public class NodeChangeTest {
    private static final String ADDRESS = "xxx:2181";
    private static final String PATH = "/strategy";
    private static CuratorFramework client;
    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
//                .namespace(BASE)
                .build();
        client.start();
    }

    public static void main(String[] args) throws Exception {
        String path_node_1 = PATH + "/node_1";
        //create
        client.create().creatingParentsIfNeeded().forPath(path_node_1);
        Thread.sleep(2000);
        //change
        client.setData().forPath(path_node_1, "yahahaha".getBytes());
        Thread.sleep(2000);
        //delete
        client.delete().forPath(path_node_1);
        //all changes have been sent; release the connection instead of sleeping forever
        client.close();
    }
}

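In practice the client is likely to be a back-office system: an activity is created, modified, or deleted there, the corresponding Zookeeper node operation is performed, and every instance that observes the node change reacts accordingly (for example, by querying the DB and caching the data).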

[Screenshot: console output]

 

3. Notes

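A watch registered through the native Zookeeper API fires only once. The Curator library used here wraps that API and removes the need to re-register the watch by hand. Curator also provides NodeCache, which watches a single node, and TreeCache, which watches a node together with every node beneath it.

The PathChildrenCache used in this example only watches first-level children (direct children; grandchildren are not watched). Choose NodeCache, PathChildrenCache, or TreeCache according to what you need to watch.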
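To make the one-shot behaviour concrete, here is a toy model in plain Java. It does not use the Zookeeper or Curator APIs: `OneShotNode` and `watchForever` are hypothetical names invented for this sketch, which only imitates the idea that a watch is consumed when it fires and has to be registered again to see the next change.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one-shot watches: each registered watcher fires once and is then dropped.
class OneShotNode {
    private List<Runnable> watchers = new ArrayList<>();

    void watch(Runnable watcher) {
        watchers.add(watcher);
    }

    void change() {
        List<Runnable> toFire = watchers;
        watchers = new ArrayList<>(); // the change consumes all current watches
        toFire.forEach(Runnable::run);
    }
}

class OneShotWatchDemo {
    // Registers again on every notification; this is the chore a wrapper library can take over.
    static void watchForever(OneShotNode node, Runnable callback) {
        node.watch(() -> {
            watchForever(node, callback); // re-register first so the next change is seen
            callback.run();
        });
    }

    public static void main(String[] args) {
        int[] once = {0};
        int[] forever = {0};
        OneShotNode node = new OneShotNode();
        node.watch(() -> once[0]++);            // plain one-shot watch
        watchForever(node, () -> forever[0]++); // self-re-registering watch
        for (int i = 0; i < 3; i++) {
            node.change();
        }
        // prints "one-shot=1, re-registered=3"
        System.out.println("one-shot=" + once[0] + ", re-registered=" + forever[0]);
    }
}
```

The plain watcher sees only the first of the three changes, while the self-re-registering one sees all of them.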

This is a fairly crude implementation; feedback is welcome ^_^

 

 

Using the Zookeeper Curator API: www.cnblogs.com/rocky-fang/p/9037509.html

Using the Zookeeper Java API: http://www.cnblogs.com/rocky-fang/p/9030438.html

