(原) 2.1 Zookeeper原生API使用


本文為原創文章,轉載請注明出處,謝謝

Zookeeper原生API使用

1、jar包引入,演示版本為3.4.6,非maven項目,可以下載jar包導入到項目中

  

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
</dependency>

 

2、創建zookeeper連接

  ZooKeeper(java.lang.String connectString, int sessionTimeout, org.apache.zookeeper.Watcher watcher) 

    • connectString:zookeeper服務地址,例如“192.168.117.128:2181”
    • sessionTimeout :超時時間,單位為毫秒
    • watcher:實現org.apache.zookeeper.Watcher接口的實現類,需實現process(WatchedEvent watchedEvent) 方法

  ZooKeeper zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new MyWatcher());

示例代碼:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;

/**
 *
 * zookeeper連接
 */
public class CreateSession implements Watcher{
    private static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent watchedEvent) {

        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            doBus();
        }
        System.out.println("接收內容:"+watchedEvent.toString());
    }

    private void doBus() {
        System.out.println("做業務!");
    }

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSession());
            System.out.println(zooKeeper.getState());
            Thread.sleep(5000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  ps:此處沒有新創建新的java類實現Watcher,而是直接在本類中實現Watcher接口並重寫process方法

 

3、同步創建

  create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl, org.apache.zookeeper.CreateMode createMode)

    • path:創建節點路徑,需保證父節點已存在
    • data:節點數據
    • acl:權限列表
      • 提供默認的權限OPEN_ACL_UNSAFE、CREATOR_ALL_ACL、READ_ACL_UNSAFE
        • OPEN_ACL_UNSAFE:完全開放
        • CREATOR_ALL_ACL:創建該znode的連接擁有所有權限
        • READ_ACL_UNSAFE:所有的客戶端都可讀
      • 自定義權限  
        ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1"));
                        ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE,
                                new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass")));
      • session設置權限 
        zk.addAuthInfo("digest", "id:pass".getBytes());  
    • createMode:節點類型
      • PERSISTENT:持久化節點
      • PERSISTENT_SEQUENTIAL:持久化有序節點
      • EPHEMERAL:臨時節點(連接斷開自動刪除)
      • EPHEMERAL_SEQUENTIAL:臨時有序節點(連接斷開自動刪除)

示例代碼:  

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;

/**
 * 創建節點(同步)
 * Created by scot on 2016/6/8.
 */
public class CreateSessionSync implements Watcher{
    private static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent watchedEvent) {

        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            doBus();
        }
        System.out.println("接收內容:"+watchedEvent.toString());
    }

    private void doBus() {
        try {
            if(null != zooKeeper.exists("/note_scot/note_scot_a",false)) {
                System.out.println("/note_scot/note_scot_a 節點已存在");
                return;
            }
            String path = zooKeeper.create("/note_scot/note_scot_a","aa".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


           /*權限相關
            try {
                ACL aclIp = new ACL(ZooDefs.Perms.READ,new Id("ip","127.0.0.1"));
                ACL aclDigest = new ACL(ZooDefs.Perms.READ| ZooDefs.Perms.WRITE,
                        new Id("digest", DigestAuthenticationProvider.generateDigest("id:pass")));
                zooKeeper.addAuthInfo("digest", "id:pass".getBytes()); 
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }*/

            System.out.println("zookeeper return:" + path);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionSync());
            System.out.println(zooKeeper.getState());
            Thread.sleep(5000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

4、異步創建

  create(java.lang.String path, byte[] data, java.util.List<org.apache.zookeeper.data.ACL> acl,

      org.apache.zookeeper.CreateMode createMode, org.apache.zookeeper.AsyncCallback.StringCallback cb, java.lang.Object ctx) 

    • StringCallback cb:回調接口,執行創建操作后,結果以及數據發送到此接口的實現類中
    • Object ctx:自定義回調數據,在回調實現類可以獲取此數據

示例代碼:

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * 創建節點(異步)
 * Created by scot on 2016/6/8.
 */
public class CreateSessionASync implements Watcher{
    private static ZooKeeper zooKeeper;

    @Override
    public void process(WatchedEvent watchedEvent) {

        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            doBus();
        }
        System.out.println("接收內容:"+watchedEvent.toString());
    }

    private void doBus() {
            zooKeeper.create("/note_scot/note_scot_b","aa".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"testAsync");
    }

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("192.168.117.128:2181",5000, new CreateSessionASync());
            System.out.println(zooKeeper.getState());
            Thread.sleep(5000);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class IStringCallBack implements AsyncCallback.StringCallback {

        @Override
        public void processResult(int i, String s, Object o, String s2) {
            System.out.println("i="+i);//創建成功返回0
            System.out.println("s="+s);//自定義節點名稱
            System.out.println("o="+o);//自定義回調數據
            System.out.println("s2="+s2);//最終節點名稱(順序節點最終名稱與自定義名稱不同)

        }
    }
}

 

常用方法列表:

String create(final String path, byte data[], List acl, CreateMode createMode)

參數: 路徑、 znode內容,ACL(訪問控制列表)、 znode創建類型;

用途:創建znode節點

void delete(final String path, int version)

參數: 路徑、版本號;如果版本號與znode的版本號不一致,將無法刪除,是一種樂觀加鎖機制;如果將版本號設置為-1,不會去檢測版本,直接刪除;

用途:刪除節點

Stat exists(final String path, Watcher watcher)

參數: 路徑、Watcher(監視器);當這個znode節點被改變時,將會觸發當前Watcher

用途:判斷znode節點是否存在

Stat exists(String path, boolean watch)

參數: 路徑、並設置是否監控這個目錄節點,這里的 watcher 是在創建 ZooKeeper 實例時指定的 watcher;

判斷znode節點是否存在

Stat setData(final String path, byte data[], int version)

參數: 路徑、數據、版本號;如果為-1,跳過版本檢查

用途:設置znode上的數據

byte[] getData(final String path, Watcher watcher, Stat stat)

參數: 路徑、監視器、數據版本等信息

用途:獲取znode上的數據

List getChildren(final String path, Watcher watcher)

參數: 路徑、監視器;該方法有多個重載

用途:獲取節點下的所有子節點

 下一節:2.2ZkClient使用  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM