Zookeeepr實現分布式集群監控
Zookeeper中節點有兩種:臨時節點和永久節點
從類型上看節點又可以分為四種節點類型:PERSIST,PERSIST_SEQUENTIAL,EPHEMERAL,EPHEMERAL_SEQUENTIAL
臨時節點有一個特點:當創建臨時節點的程序停掉之后,這個臨時節點就會消失。
監視器的特點:可以給zk中的節點注冊監視器,見識這個節點的變化情況。
監視器注冊一次,只能使用一次,多次使用就要多次注冊。
我們利用這個Zookeeper的臨時節點特性+監視器(Watch)來實現分布式集群監控
我們在/monitor(永久節點)下創建臨時節點。
實際上,zookeeper的sdk不是特別好用,很多邊界情況需要用戶自己處理。curator是對Zookeeper的sdk進行了封裝,所以說使用curator操作Zookeeper更加方便
在maven官網找到curator的依賴
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework,支持zookeeper3.4.6--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.10.0</version> </dependency>
我們通過curator來使用zookeeper,那么我們就必須知道如何使用curator來連上zookeeper,下面代碼是官方文檔所給出
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
開始我們的代碼
TestCurator.java ,實現功能:創建Zookeeper臨時節點
1 package zkdemo; 2 3 import java.net.InetAddress; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFrameworkFactory; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 10 import org.apache.zookeeper.CreateMode; 11 import org.apache.zookeeper.ZooDefs.Ids; 12 import org.apache.zookeeper.ZooKeeper; 13 import org.junit.Test; 14 15 16 public class TestCurator { 17 18 @Test 19 public void test1() throws Exception{ 20 //1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 21 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 22 String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181"; 23 int sessionTimeoutMs = 5000;//這個值只能在4000-40000ms之間表示鏈接斷開后多長時間臨時節點會小時 24 int connectionTimeoutMs = 3000;//獲取鏈接的超時時間 25 //創建一個zk連接 26 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs 27 ,connectionTimeoutMs,retryPolicy); 28 29 client.start(); 30 31 InetAddress localHost = InetAddress.getLocalHost(); 32 String ip = localHost.getHostAddress(); 33 34 client.create().creatingParentsIfNeeded() 35 .withMode(CreateMode.EPHEMERAL)//指定節點類型 36 .withACL(Ids.OPEN_ACL_UNSAFE)//指定設置節點權限信息 37 .forPath("/monitor/"+ip);//指定節點名稱 38 39 while(true) 40 { 41 ; 42 } 43 } 44 }
ZkNodeWatcher.java 實現功能:注冊watcher,監視節點的變化
1 package zk; 2 3 import java.util.List; 4 import java.util.ArrayList; 5 6 import org.apache.curator.RetryPolicy; 7 import org.apache.curator.framework.CuratorFramework; 8 import org.apache.curator.framework.CuratorFrameworkFactory; 9 import org.apache.curator.retry.ExponentialBackoffRetry; 10 import org.apache.zookeeper.WatchedEvent; 11 import org.apache.zookeeper.Watcher; 12 13 14 15 public class ZkNodeWatcher implements Watcher{ 16 CuratorFramework client; 17 List<String> childrenList = new ArrayList<String>(); 18 List<String> newChildrenList = new ArrayList<String>(); 19 20 public ZkNodeWatcher(){ 21 //1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 23 String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181"; 24 int sessionTimeoutMs = 5000;//這個值只能在4000-40000ms之間表示鏈接斷開后多長時間臨時節點會消失 25 int connectionTimeoutMs = 3000;//獲取鏈接的超時時間 26 //創建一個zk連接 27 client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs 28 ,connectionTimeoutMs,retryPolicy); 29 client.start(); 30 31 //監視monitor節點,獲取下面的所有子節點的變化情況 32 try { 33 childrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); 34 } catch (Exception e) { 35 // TODO Auto-generated catch block 36 e.printStackTrace(); 37 } 38 } 39 40 41 42 /** 43 * 實現一個zk監視器,監視某個節點的變化情況 44 * 45 * 這個監視程序需要一直運行 46 * @CPH 47 */ 48 49 public void process(WatchedEvent event) { 50 System.out.println("我被調用了"); 51 try { 52 newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); 53 for(String ip : childrenList) 54 { 55 if(!newChildrenList.contains(ip)){ 56 System.out.println("節點消失了"+ip); 57 //TODO 給管理員發送短信什么的 58 59 } 60 } 61 62 for(String ip : newChildrenList){ 63 if(!childrenList.contains(ip)){ 64 System.out.println("節點新增"+ip); 65 } 66 } 67 //重要 68 childrenList = newChildrenList; 69 } catch (Exception e) { 70 // TODO Auto-generated catch block 71 e.printStackTrace(); 72 } 73 74 75 76 } 77 78 public void start(){ 79 while (true){;} 80 } 81 82 public static void main(String[] args) { 83 ZkNodeWatcher watcher = new ZkNodeWatcher(); 84 watcher.start(); 85 } 86 }
我們先開啟Zookeeper集群,啟動/bin/zkCli.sh,然后啟動這2個集群,我們可以看到由對應的ip目錄,這個ip不是虛擬機的ip,而是本地的ip,同時我們console下看到
然后暫停TestCurator.java,不一會兒,就會看到
這樣,通過Zookeeper實現分布式集群監控的功能就完成了!