Operating ZooKeeper from Java Code


The following walks through a complete Java project that operates ZooKeeper, step by step:

 

1. Add the required dependencies to the project's pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
</dependencies>

 

2. Add a log4j.properties file under resources to configure log output

log4j.rootLogger=DEBUG,myConsole
log4j.appender.myConsole=org.apache.log4j.ConsoleAppender
log4j.appender.myConsole.ImmediateFlush=true
log4j.appender.myConsole.Target=System.out
log4j.appender.myConsole.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsole.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

 

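 3. Operate ZooKeeper from Java code

  This covers creating a node, setting a node's value, getting a node's value, and checking whether a node exists.

  A node can be created in one of four modes (the mode is chosen inside the createZKNode method):

    1. CreateMode.PERSISTENT: persistent node; once created it is saved to disk and stays until it is explicitly deleted

    2. CreateMode.PERSISTENT_SEQUENTIAL: persistent sequential node; the server appends an increasing sequence number to the node name

    3. CreateMode.EPHEMERAL: ephemeral node; it is deleted automatically when the session of the client that created it ends

    4. CreateMode.EPHEMERAL_SEQUENTIAL: ephemeral sequential node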
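
  To make the two sequential modes more concrete, here is a minimal sketch of how the final node name is formed: the server appends a 10-digit, zero-padded counter to the path the client asked for. The class and method names are made up for illustration, and the code only imitates the naming rule. It does not talk to ZooKeeper.

```java
import java.util.Locale;

// Illustration only: imitates the naming rule for sequential znodes.
// SequentialNameSketch and sequentialName are hypothetical names, not ZooKeeper API.
class SequentialNameSketch {

    // Appends a 10-digit, zero-padded counter to the requested path.
    static String sequentialName(String requestedPath, int counter) {
        return String.format(Locale.ROOT, "%s%010d", requestedPath, counter);
    }

    public static void main(String[] args) {
        System.out.println(sequentialName("/address/member-", 0)); // /address/member-0000000000
        System.out.println(sequentialName("/address/member-", 1)); // /address/member-0000000001
    }
}
```

  With a real client, the actual name is the String returned by zooKeeper.create(...), which is why createZKNode returns that value.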

  Create the class ZKOperaDemo.java:

package com.hxc.zookeeperDemo;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs.Ids;

/**
 * 
 * @author sun_flower
 * 
 */
public class ZKOperaDemo {
    
    private static String connectString = "192.168.202.132:2181";
    private static int sessionTimeout = 50 * 1000;
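    /**
     * Connects to the ZooKeeper server.
     * @return the ZooKeeper client handle (the session may still be connecting when this returns)
     * @throws IOException
     */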
    public ZooKeeper connectionZooKeeper() throws IOException {
        
        ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            
            public void process(WatchedEvent event) {
                //Other work can go here (for example, reacting to connection or node events)
            }
        });
        return zooKeeper;
    }
    
    /**
     * Creates a node.
     * 1. CreateMode.PERSISTENT: persistent node; once created it is saved to disk and stays until it is explicitly deleted
     * 2. CreateMode.PERSISTENT_SEQUENTIAL: persistent sequential node
     * 3. CreateMode.EPHEMERAL: ephemeral node; deleted automatically when the creating client's session ends
     * 4. CreateMode.EPHEMERAL_SEQUENTIAL: ephemeral sequential node
     * @param zooKeeper a connected ZooKeeper client
     * @param path the path of the node to create
     * @param data the data to store on the node
     * @return the path of the created node
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String createZKNode(ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException {
        byte[] bytesData = data.getBytes();
        //Access control list (OPEN_ACL_UNSAFE gives every client full access)
        ArrayList<ACL> openAclUnsafe = Ids.OPEN_ACL_UNSAFE;
        //Create mode
        CreateMode mode = CreateMode.PERSISTENT;
        String result = zooKeeper.create(path, bytesData, openAclUnsafe, mode);
        System.out.println("Node created: " + result);
        return result;
    }
    
    /**
     * Gets the data stored on a node.
     * @param zooKeeper a connected ZooKeeper client
     * @param path the node path
     * @return the data stored on the node
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getZKNodeData(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        byte[] data = zooKeeper.getData(path, false, new Stat());
//        System.out.println("Data on node " + path + ": " + new String(data));
        return new String(data);
    }
    
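    /**
     * Sets the data on a node.
     * @param zooKeeper a connected ZooKeeper client
     * @param path the node path
     * @param data the new data for the node
     * @return the node's Stat after the update
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Stat setZKNodeData(ZooKeeper zooKeeper, String path, String data) throws KeeperException, InterruptedException {
        //Version -1 matches any version of the node, so the update is unconditional
        return zooKeeper.setData(path, data.getBytes(), -1);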
    }
    
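    /**
     * Checks whether a node exists.
     * @param zooKeeper a connected ZooKeeper client
     * @param path the node path
     * @return the node's Stat, or null if the node does not exist
     * @throws KeeperException
     * @throws InterruptedException
     */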
    public Stat isExitZKPath(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path, false);
        return stat;
    }
}
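
connectionZooKeeper() returns as soon as the client object is constructed, while the session is still being established in the background (the test run further down prints State:CONNECTING). A common way to deal with this is to block on a CountDownLatch until the connection callback fires. The sketch below shows only that waiting pattern with JDK classes: ConnectionListener and connectAsync are hypothetical stand-ins for the ZooKeeper client and its Watcher, and the 50 ms delay simulates the handshake.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: the "wait until connected" pattern, without the ZooKeeper library.
class ConnectLatchSketch {

    /** Hypothetical stand-in for the callback role played by ZooKeeper's Watcher. */
    interface ConnectionListener {
        void onConnected();
    }

    /** Hypothetical stand-in for a client that connects on a background thread. */
    static void connectAsync(ConnectionListener listener) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated handshake time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            listener.onConnected();
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Starts connecting, then blocks until connected or until the timeout elapses. */
    static boolean connectAndWait(long timeoutMillis) {
        CountDownLatch connected = new CountDownLatch(1);
        connectAsync(connected::countDown);
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectAndWait(2000));
    }
}
```

With the real client, the latch would be counted down inside Watcher.process when event.getState() is Watcher.Event.KeeperState.SyncConnected, and the method would return the ZooKeeper object only after await succeeds.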

 

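4. Test code: TestZK.java (the tests use JUnit 4, which is not among the pom.xml dependencies above; in the run below it is picked up from Eclipse's bundled JUnit plugin, as the classpath in the log shows)

  1) Test whether the connection succeeds:

  //1. Test whether the connection succeeds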
    @Test
    public void testConnection() throws IOException, InterruptedException {
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    }

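 After running it, the console prints the output below. The last two lines show the client pinging the ZooKeeper service, and these messages keep repeating (for example: Got ping response for sessionid: 0x16c9968e7d0000e after 2ms). Two sessions appear in this log, apparently because the full TestZK class shown later also opens a connection in its instance initializer.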

[INFO ] 2019-08-21 09:42:56,068(0) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:host.name=LAPTOP-L6EGT293  
[INFO ] 2019-08-21 09:42:56,079(11) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.version=1.8.0_60  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.vendor=Oracle Corporation  
[INFO ] 2019-08-21 09:42:56,080(12) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.home=D:\ProgramFiles\JRE1.8  
[INFO ] 2019-08-21 09:42:56,081(13) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.class.path=D:\eclipse_mySpaces\zookeeperDemo\target\test-classes;D:\eclipse_mySpaces\zookeeperDemo\target\classes;D:\ProgramFiles\eclipse\plugins\org.junit_4.12.0.v201504281640\junit.jar;D:\ProgramFiles\eclipse\plugins\org.hamcrest.core_1.3.0.v20180420-1519.jar;C:\Users\sun_flower\.m2\repository\org\apache\zookeeper\zookeeper\3.4.9\zookeeper-3.4.9.jar;C:\Users\sun_flower\.m2\repository\org\slf4j\slf4j-api\1.6.1\slf4j-api-1.6.1.jar;C:\Users\sun_flower\.m2\repository\org\slf4j\slf4j-log4j12\1.6.1\slf4j-log4j12-1.6.1.jar;C:\Users\sun_flower\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;C:\Users\sun_flower\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\sun_flower\.m2\repository\io\netty\netty\3.10.5.Final\netty-3.10.5.Final.jar;C:\Users\sun_flower\.m2\repository\com\101tec\zkclient\0.10\zkclient-0.10.jar;D:\ProgramFiles\eclipse\configuration\org.eclipse.osgi\407\0\.cp;D:\ProgramFiles\eclipse\configuration\org.eclipse.osgi\406\0\.cp  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.library.path=D:\ProgramFiles\JRE1.8\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Windows\System32\OpenSSH\;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files\NVIDIA Corporation\NVIDIA NvDLISR;D:\ProgramFiles\JDK1.8\bin;D:\ProgramFiles\JDK1.8\jre\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;D:\ProgramFiles\apache-maven-3.3.9\bin;D:\ProgramFiles\mysql5.0\bin;C:\Users\sun_flower\AppData\Local\Microsoft\WindowsApps;;D:\ProgramFiles\IntelliJIDEA2018.3.5\bin;;.  
[INFO ] 2019-08-21 09:42:56,082(14) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.io.tmpdir=C:\Users\SUN_FL~1\AppData\Local\Temp\  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:java.compiler=<NA>  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.name=Windows 10  
[INFO ] 2019-08-21 09:42:56,083(15) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.arch=amd64  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:os.version=10.0  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.name=sun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.home=C:\Users\sun_flower  
[INFO ] 2019-08-21 09:42:56,084(16) --> [main] org.apache.zookeeper.Environment.logEnv(Environment.java:100): Client environment:user.dir=D:\eclipse_mySpaces\zookeeperDemo  
[INFO ] 2019-08-21 09:42:56,087(19) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection, connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6073f712  
[DEBUG] 2019-08-21 09:42:56,094(26) --> [main] org.apache.zookeeper.ClientCnxn.<clinit>(ClientCnxn.java:117): zookeeper.disableAutoWatchReset is false  
[INFO ] 2019-08-21 09:42:56,296(228) --> [main] org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:438): Initiating client connection, connectString=192.168.202.132:2181 sessionTimeout=50000 watcher=com.hxc.zookeeperDemo.ZKOperaDemo$1@6ea6d14e  
====================
State:CONNECTING sessionid:0x0 local:null remoteserver:null lastZxid:0 xid:1 sent:0 recv:0 queuedpkts:0 pendingresp:0 queuedevents:0
====================
[INFO ] 2019-08-21 09:42:56,317(249) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:1032): Opening socket connection to server 192.168.202.132/192.168.202.132:2181. Will not attempt to authenticate using SASL (unknown error)  
[INFO ] 2019-08-21 09:42:56,317(249) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.logStartConnect(ClientCnxn.java:1032): Opening socket connection to server 192.168.202.132/192.168.202.132:2181. Will not attempt to authenticate using SASL (unknown error)  
[INFO ] 2019-08-21 09:42:56,325(257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181, initiating session  
[DEBUG] 2019-08-21 09:42:56,327(259) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:56,333(265) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181, sessionid = 0x16c9968e7d0000d, negotiated timeout = 40000  
[INFO ] 2019-08-21 09:42:57,320(1252) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:876): Socket connection established to 192.168.202.132/192.168.202.132:2181, initiating session  
[DEBUG] 2019-08-21 09:42:57,321(1253) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:949): Session establishment request sent on 192.168.202.132/192.168.202.132:2181  
[INFO ] 2019-08-21 09:42:57,325(1257) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.onConnected(ClientCnxn.java:1299): Session establishment complete on server 192.168.202.132/192.168.202.132:2181, sessionid = 0x16c9968e7d0000e, negotiated timeout = 40000  
[DEBUG] 2019-08-21 09:43:09,671(13603) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000d after 6ms  
[DEBUG] 2019-08-21 09:43:10,656(14588) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16c9968e7d0000e after 2ms  
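
 The log above requests sessionTimeout=50000 but reports negotiated timeout = 40000. That is consistent with the server clamping the requested value to the range from 2 x tickTime to 20 x tickTime, which is the default when minSessionTimeout and maxSessionTimeout are not configured. The sketch below only reproduces that arithmetic. The tickTime of 2000 ms is an assumption (the server configuration is not shown in this article), and the class and method names are made up for illustration.

```java
// Illustration only: the default session-timeout clamping rule, as plain arithmetic.
// SessionTimeoutSketch and negotiate are hypothetical names, not ZooKeeper API.
class SessionTimeoutSketch {

    // Clamps the requested timeout to [2 * tickTime, 20 * tickTime].
    static int negotiate(int requestedMillis, int tickTimeMillis) {
        int min = 2 * tickTimeMillis;
        int max = 20 * tickTimeMillis;
        return Math.max(min, Math.min(max, requestedMillis));
    }

    public static void main(String[] args) {
        int assumedTickTime = 2000; // assumption: not taken from the article's server
        System.out.println(negotiate(50 * 1000, assumedTickTime)); // 40000
    }
}
```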

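 If the console instead reports the error below (java.net.ConnectException: Connection refused: no further information), the likely causes are that the ZooKeeper service is not running, or that a firewall on the server is blocking port 2181 (either turn the firewall off or open the port).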

[WARN ] 2019-08-21 09:59:42,274(2080) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1162): Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect  
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
[DEBUG] 2019-08-21 09:59:42,276(2082) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:203): Ignoring exception during shutdown input  
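
 When diagnosing this error it can help to check, independently of the ZooKeeper client, whether a TCP connection to the server port can be opened at all. The following is a minimal sketch using only java.net. The class and method names are made up for illustration, and the host and port in main are the ones used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Illustration only: a plain TCP reachability probe; it does not speak the ZooKeeper protocol.
class PortProbeSketch {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or unreachable
        }
    }

    // Sanity check: probe a temporary listener opened on a free local port.
    static boolean selfCheck() {
        try (ServerSocket listener = new ServerSocket(0)) {
            return canConnect("127.0.0.1", listener.getLocalPort(), 1000);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("local self-check: " + selfCheck());
        System.out.println("192.168.202.132:2181 reachable: " + canConnect("192.168.202.132", 2181, 2000));
    }
}
```

 A false result for the server address points to the service or the firewall rather than to the Java code.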

 

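  2) Once the connection to the ZooKeeper service works, the remaining operations can be tested.

  The complete test code (it tests creating a node, setting a node's value, getting a node's value, and checking whether a node exists):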

package com.hxc.zookeeperDemo;

import java.io.IOException;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

public class TestZK {
    
    //1. Test whether the connection succeeds
    @Test
    public void testConnection() throws IOException, InterruptedException {
        ZKOperaDemo zkOperaDemo = new ZKOperaDemo();
        ZooKeeper zooKeeper = zkOperaDemo.connectionZooKeeper();
        System.out.println("====================");
        System.out.println(zooKeeper);
        System.out.println("====================");
        Thread.sleep(Long.MAX_VALUE);
    }
    private ZKOperaDemo nodeOperation = new ZKOperaDemo();
    private ZooKeeper zooKeeper = null;
    {
        try {
            zooKeeper = nodeOperation.connectionZooKeeper();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //Test creating a node
    @Test
    public void testCreateZKNode() throws KeeperException, InterruptedException {
        String result = nodeOperation.createZKNode(zooKeeper, "/address", "ShenZhen");
        System.out.println(result);
    }
    //Test getting node data
    @Test
    public void testGetZKNodeData() throws KeeperException, InterruptedException {
        String result = nodeOperation.getZKNodeData(zooKeeper, "/address");
        System.out.println(result);
    }
    //Test setting node data
    @Test
    public void testSetZKNodeData() throws KeeperException, InterruptedException {
        Stat stat = nodeOperation.setZKNodeData(zooKeeper, "/address", "Shen Zhen update");
        System.out.println(stat);    //prints the Stat fields (node metadata, not the node data)
        if (stat != null)
            System.out.println(stat.getCversion());
    }
    //Test whether a node exists
    @Test
    public void testIsExitZKPath() throws KeeperException, InterruptedException {
        Stat stat = nodeOperation.isExitZKPath(zooKeeper, "/addressaa");
        System.out.println(stat);    //prints the Stat fields, or null if the node does not exist
        if (stat != null)
            System.out.println(stat.getCversion());
    }
    
    
}

  3) After the tests run, the results are visible on the ZooKeeper server.
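
  The tests above print a Stat object directly. Its text form should be a single comma-separated line of numbers, the same layout that appears inside s{...} in the client's DEBUG log later in this article. The sketch below labels those numbers, assuming the field order of ZooKeeper's Stat record (czxid, mzxid, ctime, mtime, version, cversion, aversion, ephemeralOwner, dataLength, numChildren, pzxid). The class name is made up for illustration, and the sample string is copied from that later log.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: labels the comma-separated fields of a printed Stat.
// StatFieldsSketch is a hypothetical helper, not part of the ZooKeeper API.
class StatFieldsSketch {

    // Assumed field order, following ZooKeeper's Stat record definition.
    private static final String[] FIELD_NAMES = {
        "czxid", "mzxid", "ctime", "mtime", "version", "cversion",
        "aversion", "ephemeralOwner", "dataLength", "numChildren", "pzxid"
    };

    // Parses "448,511,..." into an ordered map from field name to value.
    static Map<String, Long> label(String csv) {
        String[] parts = csv.trim().split(",");
        if (parts.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException("expected " + FIELD_NAMES.length + " fields, got " + parts.length);
        }
        Map<String, Long> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], Long.parseLong(parts[i].trim()));
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Long> stat = label("448,511,1566270677948,1566374882666,19,0,0,0,16,0,448");
        System.out.println("version=" + stat.get("version"));       // version=19
        System.out.println("cversion=" + stat.get("cversion"));     // cversion=0
        System.out.println("dataLength=" + stat.get("dataLength")); // dataLength=16
    }
}
```

  Note that getCversion(), which the tests print, is the version of the node's children and stays at 0 here. The counter that increases on every setData is the data version, available through getVersion().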

            

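 5. ZooKeeper's notification mechanism

  A watcher (listener) can be set both when the connection is first established and when a node is read, for example with getData. Each watch is one-shot: once it has delivered a notification it is removed, so observing the next change requires registering a new watcher. For that reason, the notification callback should register another Watcher after it has handled the event.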
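
  The one-shot behavior can be illustrated without a server. The sketch below models a single node in memory: watchers registered on a read are fired and discarded on the next write, so only a callback that re-registers itself sees every change. FakeNode and the two watch methods are hypothetical names for this illustration and are not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: an in-memory model of one-shot watches.
class OneShotWatchSketch {

    /** Hypothetical stand-in for a single znode. */
    static class FakeNode {
        private String data;
        private List<Runnable> watchers = new ArrayList<>();

        FakeNode(String data) {
            this.data = data;
        }

        /** Returns the data and, if a watcher is given, registers it for one notification. */
        String getData(Runnable watcher) {
            if (watcher != null) {
                watchers.add(watcher);
            }
            return data;
        }

        /** Updates the data, then fires and discards every registered watcher. */
        void setData(String newData) {
            data = newData;
            List<Runnable> toFire = watchers;
            watchers = new ArrayList<>(); // watchers re-added during callbacks wait for the next write
            for (Runnable watcher : toFire) {
                watcher.run();
            }
        }
    }

    /** Registers a watcher once and never again: only the first change is seen. */
    static List<String> watchOnce(FakeNode node) {
        List<String> seen = new ArrayList<>();
        node.getData(() -> seen.add(node.getData(null)));
        return seen;
    }

    /** Re-registers the same watcher inside the callback: every change is seen. */
    static List<String> watchContinuously(FakeNode node) {
        List<String> seen = new ArrayList<>();
        Runnable[] self = new Runnable[1];
        self[0] = () -> seen.add(node.getData(self[0]));
        node.getData(self[0]);
        return seen;
    }

    public static void main(String[] args) {
        FakeNode node = new FakeNode("Shen Zhen update");
        List<String> once = watchOnce(node);
        List<String> continuous = watchContinuously(node);
        node.setData("beijing");
        node.setData("shanghai");
        System.out.println(once);       // [beijing]
        System.out.println(continuous); // [beijing, shanghai]
    }
}
```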

  Code that implements continuous watching (a recursive callback):

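  //Notification mechanism
    /**
     * Watches a node and prints its data. This method blocks forever so that the watcher keeps receiving notifications.
     * @param zooKeeper a connected ZooKeeper client
     * @param path the node path
     * @throws KeeperException
     * @throws InterruptedException
     */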
    public void getZKNodeData2(final ZooKeeper zooKeeper, final String path) throws KeeperException, InterruptedException {
        byte[] data = zooKeeper.getData(path, new Watcher() {
            
            public void process(WatchedEvent event) {
                try {
                    String data2 = ZKOperaDemo.process(zooKeeper, path);
                    System.out.println("First call============= " + data2 + " =================");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, new Stat());
        System.out.println("Data on node " + path + ": " + new String(data));
        Thread.sleep(Long.MAX_VALUE);
    }
    
    public static String process(final ZooKeeper zooKeeper, final String path) throws KeeperException, InterruptedException {
        byte[] data = zooKeeper.getData(path, new Watcher() {
            public void process(WatchedEvent event) {
                try {
                    String data = ZKOperaDemo.process(zooKeeper, path);
                    System.out.println("============= " + data + " =================");
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, new Stat());
        return new String(data);
    }
    

 Test code:

   @Test
    public void testGetZKNodeData2() throws KeeperException, InterruptedException {
        nodeOperation.getZKNodeData2(zooKeeper, "/address");
    }

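Once this test is running, setting a new value on the node causes the corresponding watch output to be printed on the console:

  1) On first start, the relevant part of the console output: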

[DEBUG] 2019-08-21 16:08:16,230(95) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 1,4  replyHeader:: 1,512,0  request:: '/address,T  response:: #5368656e205a68656e20757064617465,s{448,511,1566270677948,1566374882666,19,0,0,0,16,0,448}   
Data on node /address: Shen Zhen update
[DEBUG] 2019-08-21 16:08:29,564(13429) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 2ms  

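  2) Set a new value, either on the server or through the set-node-data test (here it was set on the server, which makes the effect easier to see).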

            

  The console then immediately prints the corresponding watcher output:

[DEBUG] 2019-08-21 16:10:27,104(130969) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 2,4  replyHeader:: 2,515,0  request:: '/address,T  response:: #6265696a696e67,s{448,515,1566270677948,1566375027091,20,0,0,0,7,0,448}   
First call============= beijing =================
[DEBUG] 2019-08-21 16:10:40,437(144302) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  

 

   3) Set the value again.

         

  Console output (the watch keeps firing for every later change):

[DEBUG] 2019-08-21 16:12:36,498(260363) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:843): Reading reply sessionid:0x16cb1e90922000e, packet:: clientPath:null serverPath:null finished:false header:: 3,4  replyHeader:: 3,516,0  request:: '/address,T  response:: #7368616e67686169,s{448,516,1566270677948,1566375156485,21,0,0,0,8,0,448}   
============= shanghai =================
[DEBUG] 2019-08-21 16:12:49,832(273697) --> [main-SendThread(192.168.202.132:2181)] org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:742): Got ping response for sessionid: 0x16cb1e90922000e after 1ms  
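
In the DEBUG lines above, the response:: field carries the node data as hexadecimal digits after the # sign. A minimal sketch that turns such a payload back into text is shown below. The class name is made up for illustration, and decoding as UTF-8 is an assumption (the article's code relies on the platform default charset, which gives the same result for ASCII data like this).

```java
import java.nio.charset.StandardCharsets;

// Illustration only: decodes the "#<hex>" payload printed in the client's DEBUG log.
// HexPayloadSketch is a hypothetical helper, not part of the ZooKeeper API.
class HexPayloadSketch {

    // Converts "#6265696a696e67" (or the same string without '#') into text.
    static String decode(String hex) {
        if (hex.startsWith("#")) {
            hex = hex.substring(1);
        }
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("odd number of hex digits");
        }
        byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.UTF_8); // assumption: data is UTF-8 text
    }

    public static void main(String[] args) {
        System.out.println(decode("#5368656e205a68656e20757064617465")); // Shen Zhen update
        System.out.println(decode("#6265696a696e67"));                   // beijing
        System.out.println(decode("#7368616e67686169"));                 // shanghai
    }
}
```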

  

 

That covers a simple way to operate ZooKeeper from Java.

 Comments and suggestions are very welcome!

