zookeeper framework 之 Netflix curator


zookeeper framework 之 Netflix curator(完美支持永久監聽)

介紹

curator是Netflix公司開源的zookeeper client library
官方地址:https://github.com/Netflix/curator/wiki/Recipes
詳細介紹1:http://macrochen.iteye.com/blog/1366136/
詳細介紹2:http://blog.csdn.net/alivetime/article/details/7101014
Curator主要解決了三類問題:

  • 封裝ZooKeeper client與ZooKeeper server之間的連接處理;
  • 提供了一套Fluent風格的操作API;
  • 提供ZooKeeper各種應用場景(recipe, 比如共享鎖服務, 集群領導選舉機制)的抽象封裝.

Curator幾個組成部分

  • Client: 是ZooKeeper客戶端的一個替代品, 提供了一些底層處理和相關的工具方法.
  • Framework: 用來簡化ZooKeeper高級功能的使用, 並增加了一些新的功能, 比如管理到ZooKeeper集群的連接, 重試處理
  • Recipes: 實現了通用ZooKeeper的recipe, 該組件建立在Framework的基礎之上
  • Utilities:各種ZooKeeper的工具類
  • Errors: 異常處理, 連接, 恢復等.
  • Extensions: recipe擴展

maven dependency

<dependency>
    <groupId>com.netflix.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>1.3.3</version>
</dependency>

curator framework 使用

String path = "/test_path";

CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("test:2181").namespace("/test1")
        .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
        .connectionTimeoutMs(5000).build();

// start
client.start();

// create a node
client.create().forPath("/head", new byte[0]);

// delete a node in background
client.delete().inBackground().forPath("/head");

// create a EPHEMERAL_SEQUENTIAL
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child", new byte[0]);

// get the data 
client.getData().watched().inBackground().forPath("/test");

// check the path exits
client.checkExists().forPath(path);

InterProcessMutex(進程間互斥鎖)

String lockName = "/lock1";
InterProcessLock lock1 = new InterProcessMutex(this.curator, lockName);
InterProcessLock lock2 = new InterProcessMutex(this.curator, lockName);
lock1.acquire();
boolean result = lock2.acquire(1, TimeUnit.SECONDS);
assertFalse(result);
lock1.release();
result = lock2.acquire(1, TimeUnit.SECONDS);
assertTrue(result);

原理:每次調用acquire在lock1節點下使用createMode.EPHEMERAL_SEQUENTIAL創建的ephemeral節點,然后getChildren獲取所有的children,判斷剛剛創建的臨時節點是否為第一個,如果是,則獲取鎖成功;如果不是則刪除剛剛創建的臨時節點。
注意:每次accquire操作,成功則請求zkserver 2次,一次寫,一次getChildren;如果失敗則請求zkserver三次(一次寫,一次getChildren,一次刪除)

InterProcessReadWriteLock(節點讀寫鎖)

示例

	@Test
	public void testReadWriteLock() throws Exception{
		String readWriteLockPath = "/RWLock";
		InterProcessReadWriteLock readWriteLock1 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
		InterProcessMutex writeLock1 = readWriteLock1.writeLock();
		InterProcessMutex readLock1 = readWriteLock1.readLock();
		
		InterProcessReadWriteLock readWriteLock2 = new InterProcessReadWriteLock(this.curator, readWriteLockPath);
		InterProcessMutex writeLock2 = readWriteLock2.writeLock();
		InterProcessMutex readLock2 = readWriteLock2.readLock();
		writeLock1.acquire();
		
		// same with WriteLock, can read
		assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
		
		// different lock, can't read while writting
		assertFalse(readLock2.acquire(1, TimeUnit.SECONDS));
		
		// different write lock, can't write
		assertFalse(writeLock2.acquire(1, TimeUnit.SECONDS));
		
		// release the write lock
		writeLock1.release();
		
		//both read lock can read
		assertTrue(readLock1.acquire(1, TimeUnit.SECONDS));
		assertTrue(readLock2.acquire(1, TimeUnit.SECONDS));
	}

原理: 同InterProcessMutext,在ephemeral node的排序算法上做trick,write lock的排序在前。
注意: 同一個InterProcessReadWriteLock如果已經獲取了write lock,則獲取read lock也會成功

LeaderSelector(leader 選舉)

	@Test
	public void testLeader() throws Exception{
		LeaderSelectorListener listener = new LeaderSelectorListener(){


			@Override
			public void takeLeadership(CuratorFramework client)
					throws Exception {
				System.out.println("i'm leader");
			}

			@Override
			public void handleException(CuratorFramework client,
					Exception exception) {
				
			}

			@Override
			public void notifyClientClosing(CuratorFramework client) {
				
			}};
		String leaderPath = "/leader";
		LeaderSelector selector1 = new LeaderSelector(this.curator, leaderPath, listener);
		selector1.start();
		LeaderSelector selector2 = new LeaderSelector(this.curator, leaderPath, listener);
		selector2.start();
		assertFalse(selector2.hasLeadership());
	}

原理:內部基於InterProcessMutex實現

NodeCache(監聽節點數據)

/**
 * 在注冊監聽器的時候,如果傳入此參數,當事件觸發時,邏輯由線程池處理
 */
ExecutorService pool = Executors.newFixedThreadPool(2);

/**
 * 監聽數據節點的變化情況
 */
final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);
nodeCache.start(true);
nodeCache.getListenable().addListener(
    new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("Node data is changed, new data: " + 
                new String(nodeCache.getCurrentData().getData()));
        }
    }, 
    pool
);

PathChildrenCache(監聽子節點目錄變化)

/**
 * 監聽子節點的變化情況
 */
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(
    new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED: " + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                    break;
                default:
                    break;
            }
        }
    },
    pool
);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM