[版權申明:本文系作者原創,轉載請注明出處]
文章出處:http://www.cnblogs.com/sdksdk0/p/5585192.html
作者: 朱培 ID:sdksdk0
今天分享的是大數據實踐的zookeeper。
zookeeper內部就是一個集群,主節點是選舉出來的,外部看起來就像只有一台一樣,保存的是一份狀態數據。 做分布式應用協調的時候,可以降低開發難度。
具有高可用性,松耦合交互方式。
關於zookeeper的配置可以查看我的文章:http://blog.csdn.net/sdksdk0/article/details/51517460,在這里不再重復講解安裝配置。
一、zookeeper的API接口
String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) Stat exists(String path, boolean watch) void delete(String path, int version) List<String> getChildren(String path, boolean watch) List<String> getChildren(String path, boolean watch) Stat setData(String path, byte[] data, int version) byte[] getData(String path, boolean watch, Stat stat) void addAuthInfo(String scheme, byte[] auth) Stat setACL(String path, List<ACL> acl, int version) List<ACL> getACL(String path, Stat stat)
zookeeper一般來說保管的數據不超過1M.主要是保存一些配置信息,主要特點是監聽數據實時更新。
二、主要應用:
1、集群管理:規定編號最小的為master,所以當我們對SERVERS節點做監控的時候,得到服務器列表,只要所有集群機器邏輯認為最小編號節點為master,那么master就被選出,而這個master宕機的時候,相應的znode會消失,然后新的服務器列表就被推送到客戶端,然后每個節點邏輯認為最小編號節點為master,這樣就做到動態master選舉。
2、配置的管理:在分布式應用環境中很常見,例如同一個應用系統需要多台 PC Server 運行,但是它們運行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那么就必須同時修改每台運行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。 將配置信息保存在 Zookeeper 的某個目錄節點中,然后將所有需要修改的應用機器監控配置信息的狀態,一旦配置信息發生變化,每台應用機器就會收到 Zookeeper 的通知,然后從 Zookeeper 獲取新的配置信息應用到系統中。
3、共享鎖:在同一個進程中很容易實現,但是在跨進程或者在不同 Server 之間就不好實現了。Zookeeper 卻很容易實現這個功能,實現方式也是需要獲得鎖的 Server 創建一個 EPHEMERAL_SEQUENTIAL 目錄節點,然后調用 getChildren方法獲取當前的目錄節點列表中最小的目錄節點是不是就是自己創建的目錄節點,如果正是自己創建的,那么它就獲得了這個鎖,如果不是那么它就調用 exists(String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到自己創建的節點是列表中最小編號的目錄節點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所創建的目錄節點就行了。
4、隊列管理:Zookeeper 可以處理兩種類型的隊列:當一個隊列的成員都聚齊時,這個隊列才可用,否則一直等待所有成員到達,這種是同步隊列;隊列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型
三、使用eclipse連接zookeeper
在eclipse中,我們可以導入需要的包,然后對節點進行增刪改查操作。 當連接的時候可以這樣,這里我是采用了3台zookeeper來進行操作的。ubuntu1,2,3都分別是主機名
ZooKeeper zk = null; @Before public void init() throws Exception{ zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() { //監聽事件發生時的回調方法 @Override public void process(WatchedEvent event) { System.out.println(event.getPath()); System.out.println(event.getType()); } }); }
創建節點: 這里我是采用創建一個永久節點,在zookeeper節點中有臨時節點和永久節點之分。在跟目錄下創建一個eclipse節點,內容的編碼格式是utf-8,Ids是指權限控制,我這里采用的是開放ACL權限控制。最后需要把流關閉。
@Test public void testZkNode() throws Exception { String path = zk.create("/eclipse", "指令匯科技".getBytes("utf-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("創建了一個永久節點: " + path); zk.close(); }
然后是注冊監聽器,畢竟zookeeper有一個很重要的功能就是是用來監聽整個服務的狀態。
@Test public void testGet() throws Exception { //監聽器的注冊只能生效一次 byte[] data = zk.getData("/eclipse", true, new Stat()); System.out.println(new String(data,"utf-8")); Thread.sleep(Long.MAX_VALUE); }
在main方法中調用執行。
@Test public void testSet() throws UnsupportedEncodingException, KeeperException, InterruptedException{ zk.setData("/eclipse", "誰是英雄".getBytes("utf-8"), -1); zk.close(); }
四、動態服務器
因為我主要分享的是如何在客戶端上動態的監聽服務器的上線和離線,所以我們先來寫一個服務器的進程。
首先我們需要把后面需要的節點信息定義一下,先去zookeeper的客戶端上面運行一下,創建一個grpnode節點,以便我們的后續操作。
private ZooKeeper zk; private String groupNode = "grpnode"; private String subNode = "sub"; // 向zookeeper注冊信息 public void connectZK(String name) throws KeeperException, InterruptedException, IOException { zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() { //監聽事件發生時的回調方法 @Override public void process(WatchedEvent event) { } }); String path = zk.create("/" + groupNode + "/" + subNode, name.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("服務器上線,創建了一個子節點: " + path); }
接下來就是zookeeper默認的業務邏輯的處理,最后在主方法中調用。當然也可以把這個打成一個jar包放到hadoop上面去運行。
// 業務處理邏輯 public void handle() throws Exception { Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { if(args.length==0){ System.err.println("參數個數不對,請附加服務器名作為參數來啟動....."); System.exit(1); } // 去向zookeeper注冊本服務器信息 AppServer server = new AppServer(); server.connectZK(args[0]); server.handle(); }

五、動態客戶端
服務器寫好之后,我們就需要一個客戶端來監聽這個服務器的上下線操作了。同樣使用一個zookeeper的監聽回調方法。一旦服務器發生變化,這里就可以動態監聽到。主要是監聽父子節點的變化情況。
private volatile List<String> servers; private ZooKeeper zk; //使用zk的監聽器功能觸發服務器更新的動作 public void connectZK() throws IOException, KeeperException, InterruptedException{ zk = new ZooKeeper("ubuntu2:2181,ubuntu1:2181,ubuntu3:2181", 5000, new Watcher() { //監聽事件發生時的回調方法 @Override public void process(WatchedEvent event) { if("/grpnode".equals(event.getPath()) && event.getType()==EventType.NodeChildrenChanged ){ //觸發更新服務器列表的動作 try { updateServerList(); } catch (Exception e) { e.printStackTrace(); } } } }); updateServerList(); }
動態獲取服務器列表,這里主要就是監聽父子節點的變化。
//動態獲取服務器列表 public void updateServerList() throws KeeperException, InterruptedException, UnsupportedEncodingException{ ArrayList<String> serverList=new ArrayList<String>(); //監聽子節點,並且對父節點注冊監聽器 List<String> childer=zk.getChildren("/grpnode", true); //遍歷子節點 for(String child:childer){ byte[] data=zk.getData("/grpnode/"+child,false, new Stat()); String server=new String(data,"utf-8"); //將獲取到的服務器名稱存入list serverList.add(server); } //把暫存的list放到全局的list中 servers=serverList; System.out.println("最新的在線服務器是:"+serverList); }
最后就是我們最熟悉的main方法了
//客戶端的業務功能 public void handle() throws InterruptedException{ Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException{ AppClient client=new AppClient(); client.connectZK(); client.handle(); }
在hadoop中運行結果如下:
Zookeeper 作為 Hadoop 項目中的一個子項目,是 Hadoop 集群管理的一個必不可少的模塊,它主要用來控制集群中的數據,如它管理 Hadoop 集群中的 NameNode,還有 Hbase 中 Master Election、Server 之間狀態同步等。 Zoopkeeper 提供了一套很好的分布式集群管理的機制,就是它這種基於層次型的目錄樹的數據結構,並對樹中的節點進行有效管理,從而可以設計出多種多樣的分布式的數據管理模型