Zookeeper 掃盲 :disappointed_relieved:
配置文件詳解:
- tickTime:基本事件單元,以毫秒為單位,這個時間作為 Zookeeper 服務器之間或客戶端之間維持心跳的時間間隔
- dataDir:存儲內存中數據庫快照的位置,顧名思義就是 Zookeeper 保存數據的目錄,默認情況下,Zookeeper 將寫數據的日志文件也保存到這個目錄里
- clientPort:這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求
- initLimit:這個配置項是用來配置 Zookeeper 接受客戶端初始化連接時最長能忍受多少個心跳時間間隔
- 當已經超過 10 個心跳的時間也就是(ticktime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗,總的時間長度就是:10*2000 = 20s
- syncLimit:這個配置項表示 Leader 與 Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是:5*2000 = 10s
- server.A = B:C:D
- A:表示這是第幾號服務器
- B:服務器的 IP 地址
- C:服務器與集群中的 Leader 服務器交換信息的端口
- D:一旦集群中的 Leader 服務器掛了,需要一個端口重新進行選舉,選出一個新的 Leader
- 2181:對外提供端口
- 2888:內部同步端口
- 3888:節點掛了,選舉端口
ZK 設計目標
- 簡單的數據結構(樹形名字空間)
- 構建集群(一般奇數,超過半數以上就可以正常訪問)
- 順序訪問
- 高性能(12w QPS)
集群搭建
Java 連接 ZK
客戶端通過創建一個 ZK 實例來連接 ZK 服務器
Zookeeper(Arguments) 方法
- connectString:連接服務器列表,用“,”分隔
- sessionTimeout:心跳檢測時間周期(毫秒)
- wather:時間處理通知器
- canBeReadOnly:標識當前會話是否支持只讀
- SessionId 和 SessionPasswd:提供連接 Zookeeper 的sessionId 和 密碼,通過這倆個確定唯一一台客戶端,目的是可以提供重復會話
- ZK 客戶端和服務器會話的建立是一個異步的過程,我們程序方法在處理完客戶端初始化后立即返回,
- 也就是程序往下執行代碼,這樣,大多數情況下我們並沒有真正構建好一個可用會話,在會話生命周期處於“CONNECTING”時,才算建立完畢
Zookeeper 組成
- 根據其身份的特征分為三種:Leader、Follower、Observer,其中 Follower 和 Observer 統稱為 Learner (學習者)
- leader:負責客戶端的 writer 類型請求
- Follower:負責客戶端 reader 類型請求,參與 leader 選舉
- Observer:特殊的“Follower”,其可以接收客戶端 reader 請求,但不參與選舉。(擴容系統支撐能力,提高讀取速度)因為他不接受任何同步的寫入請求,只負責 leader 同步數據
Java 操作方法
創建節點(znode)方法:create
- 提供了兩套創建節點的方法,同步和異步創建節點方法
- 參數1:節點路徑:/nodeName (不允許遞歸創建節點,也就是在父節點不存在的情況下,不允許創建子節點)
- 參數2:節點內容:要求類型是字節數組(不支持序列化方式,如果需要實現程序化,可使用 Java 相關序列化框架,如 Hession、Kryo 框架)
- 參數3:節點權限:使用 Ids.OPEN_ACL_UNSAFE 開放權限
- 參數4:節點類型:創建節點的類型: CreateMode.*
- persistent(持久節點)
- persistent_sequential(持久順序節點)
- ephemeral(臨時節點)
- ephemeral_sequential(臨時順序節點)
- 參數5:注冊一個異步回調函數,要實現 AsynCallBack.StringCallBack 接口,重寫 processResult(int rc, String path, Object ctx, String name) 方法,當節點創建完畢后執行此方法。
- rc:為服務端相應碼 0 表示調用成功、-4 表示端口連接、-110 表示指定節點存在、-112 表示會話已經過期
- path:接口調用時傳入 API 的數據節點的路徑參數
- ctx:為調用接口傳入 API 的 ctx 值
- name:實際在服務器端創建節點的名稱
- 參數6:傳遞給回調函數的參數,一般為上下文(Context)信息
使用了 CountDownLatch 中的 countDown 只要是要確保我們的zk 連接成功再繼續往下進行。
對於 Zookeeper 中存在節點時,我們添加相同節點時,我們不能創建成功。
在創建臨時節點時,在本次回話有效,當本次回話結束時,我們的臨時節點就會失效
單一視圖,三個節點上數據是一致的,消息廣播,臨時的 temp
分布式鎖原理:對於臨時節點,同一時間只能有一個 Client 操作一個節點,同時貌似加了一把鎖的形式,可以對於相同的業務邏輯,不同的 Tomcat 操作,就確保了操作的唯一性。存在內存中,效率高 12WQPS
zk.create("/app/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
原生的API中 Zookeeper 不允許遞歸創建節點
public class ZookeeperBase {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 2000;//ms
/** 信號量,阻塞程序執行,用於等待zookeeper連接成功,發送成功信號 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
@Override
public void process(WatchedEvent event) {
//獲取事件的狀態
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
//如果是建立連接
if(KeeperState.SyncConnected == keeperState){
if(EventType.None == eventType){
//如果建立連接成功,則發送信號量,讓后續阻塞程序向下執行
connectedSemaphore.countDown();
System.out.println("zk 建立連接");
}
}
}
});
//進行阻塞
connectedSemaphore.await();
System.out.println("..");
//創建父節點
// zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//創建子節點
// zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//獲取節點洗信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));
//修改節點的值
// zk.setData("/testRoot", "modify data root".getBytes(), -1);
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
//判斷節點是否存在
// System.out.println(zk.exists("/testRoot/children", false));
//刪除節點
// zk.delete("/testRoot/children", -1);
// System.out.println(zk.exists("/testRoot/children", false));
zk.close();
}
}
getChildren 只可以取下面直接的一層,
使用 -1 是跳過版本檢查,如果再刪除的時候,會檢查本地版本和遠程版本若相同則會刪除,否則不刪除。同時不支持遞歸的刪除
getChildren 讀取數據方法:包括子節點列表的獲取和子節點數據的獲取
- 參數1:path:獲取指定節點的下的數據(獲取子節點列表)
- 參數2:watcher:注冊的 watcher ,一旦在本次子節點獲取后,子節點列表發生變化的話,那么就會向客戶端發送通知,該參數允許為 null。
- 參數3:表明是否需要注冊一個 watcher:如果為 true,則會使用到 zookeeper 客戶端上下文中提到的那個默認 watcher ,如果為 false,則表明不需要注冊 watcher。
- 參數4:cb:回調函數
- 參數5:ctx:上下文信息對象
- 參數6:stat:指定數據節點的節點狀態信息
注意:
- 當我們獲取指定節點的子節點列表后,還需要訂閱這個子節點列表的變化通知,這時候就可以注冊一個 watcher 來實現
- 當子節點被添加或刪除時,服務器端就會觸發一個“NodeChildrenChanged“類型的時間通知,
- 服務器端發送給客戶端收到這個事件通知中,是不包含最新的節點列表的,客戶端必須主動從新進行獲取,通常在客戶端收到這個事件通知后,就可以再次主動獲取最新的子節點列表了
- ZK 服務端在向客戶端發送 watcher “NodeChildrenChanged”事件通知的時候,僅僅只發了一個通知,不會節點把節點的節點的變化情況發送給客戶端,需要客戶端自己重新獲取
- watcher 通知是一次性的,即觸發后失效,因此客戶端需要反復注冊 watcher 才行。
getData 方法:獲取指定節點的數據內容
- path:路徑
- watcher:注冊的 watcher 對象,一旦之后節點內容有變更,則會向客戶端發送通知,該參數允許為 null,觸發事件為“NodeDataChanged”事件通知
- stat:指定節點的狀態信息
- watch:是否使用 watcher,如果為 true 則使用默認上下文中的 watcher, false 則不使用 watcher
- cb:回調函數
- ctx:傳遞上下文信息對象
setData 方法:修改指定節點的數據內容
- path:路徑
- data:數據內容
- 版本號:(-1 覆蓋之前的所有的版本)
- cb:回調函數
- ctx:用於傳遞的下文信息對象
exists 方法:檢測節點是否存在
- path:路徑
- watcher:注冊的 watcher 對象,用於三類時間監聽(節點的創建,刪除,更新)
- cb:回調函數
- ctx:傳遞的上下文信息對象
- exits 方法的意義在於無論節點是否存在,都可以進行注冊 watcher,能夠對節點的創建,刪除和修改進行監聽,但是其子節點發送各種變化,都不會通知客戶端。
事件類型
ZK 有 watch 事件,是一次性觸發的,當 watch 監視的數據發生變化時,通知設置了該watch 的 client ,即 watcher
watcher 是監聽數據發送了某些變化,那就一定會有對應的事件類型和狀態類型
- 事件類型:(znode節點相關)
- EventType.NodeCreated
- EventType.NodeDataChanged
- EventType.NodeChildrenChaged
- EventType.NodeDeleted
狀態類型:(客戶端實例相關)
- KeeperState.Disconnected
- KeeperState.SyncConnected
- KeeperState.AuthFailed
- KeeperState.Expired
watch事件是一次性的,watcher 表示 client
package bjsxt.zookeeper.watcher;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* Zookeeper Wathcher
* 本類就是一個Watcher類(實現了org.apache.zookeeper.Watcher類)
* @author(alienware)
* @since 2015-6-14
*/
public class ZooKeeperWatcher implements Watcher {
/** 定義原子變量 */
AtomicInteger seq = new AtomicInteger();
/** 定義session失效時間 */
private static final int SESSION_TIMEOUT = 10000;
/** zookeeper服務器地址 */
private static final String CONNECTION_ADDR = "192.168.80.88:2181";
/** zk父路徑設置 */
private static final String PARENT_PATH = "/testWatch";
/** zk子路徑設置 */
private static final String CHILDREN_PATH = "/testWatch/children";
/** 進入標識 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
/** zk變量 */
private ZooKeeper zk = null;
/** 信號量設置,用於等待zookeeper連接建立之后 通知阻塞程序繼續向下執行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 創建ZK連接
* @param connectAddr ZK服務器地址列表
* @param sessionTimeout Session超時時間
*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務器");
connectedSemaphore.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 關閉ZK連接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 創建節點
* @param path 節點路徑
* @param data 數據內容
* @return
*/
public boolean createPath(String path, String data) {
try {
//設置監控(由於zookeeper的監控都是一次性的所以 每次必須設置監控)
this.zk.exists(path, true);
System.out.println(LOG_PREFIX_OF_MAIN + "節點創建成功, Path: " +
this.zk.create( /**路徑*/
path,
/**數據*/
data.getBytes(),
/**所有可見*/
Ids.OPEN_ACL_UNSAFE,
/**永久存儲*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 讀取指定節點數據內容
* @param path 節點路徑
* @return
*/
public String readData(String path, boolean needWatch) {
try {
return new String(this.zk.getData(path, needWatch, null));
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
/**
* 更新指定節點數據內容
* @param path 節點路徑
* @param data 數據內容
* @return
*/
public boolean writeData(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新數據成功,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 刪除指定節點
*
* @param path
* 節點path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println(LOG_PREFIX_OF_MAIN + "刪除節點成功,path:" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 判斷指定節點是否存在
* @param path 節點路徑
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取子節點
* @param path 節點路徑
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 刪除所有節點
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}
/**
* 收到來自Server的Watcher通知后的處理。
*/
@Override
public void process(WatchedEvent event) {
System.out.println("進入 process 。。。。。event = " + event);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (event == null) {
return;
}
// 連接狀態
KeeperState keeperState = event.getState();
// 事件類型
EventType eventType = event.getType();
// 受影響的path
String path = event.getPath();
String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "連接狀態:\t" + keeperState.toString());
System.out.println(logPrefix + "事件類型:\t" + eventType.toString());
if (KeeperState.SyncConnected == keeperState) {
// 成功連接上ZK服務器
if (EventType.None == eventType) {
System.out.println(logPrefix + "成功連接上ZK服務器");
connectedSemaphore.countDown();
}
//創建節點
else if (EventType.NodeCreated == eventType) {
System.out.println(logPrefix + "節點創建");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.exists(path, true);
}
//更新節點
else if (EventType.NodeDataChanged == eventType) {
System.out.println(logPrefix + "節點數據更新");
System.out.println("我看看走不走這里……..");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "數據內容: " + this.readData(PARENT_PATH, true));
}
//更新子節點
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println(logPrefix + "子節點變更");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "子節點列表:" + this.getChildren(PARENT_PATH, true));
}
//刪除節點
else if (EventType.NodeDeleted == eventType) {
System.out.println(logPrefix + "節點 " + path + " 被刪除");
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "與ZK服務器斷開連接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "權限檢查失敗");
}
else if (KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "會話失效");
}
else ;
System.out.println("--------------------------------------------");
}
/**
* <B>方法名稱:</B>測試zookeeper監控<BR>
* <B>概要說明:</B>主要測試watch功能<BR>
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//建立watcher
ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
//創建連接
zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
Thread.sleep(1000);
// 清理節點
zkWatch.deleteAllTestPath();
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
Thread.sleep(1000);
// 讀取數據
System.out.println("---------------------- read parent ----------------------------");
//zkWatch.readData(PARENT_PATH, true);
// 讀取子節點
System.out.println("---------------------- read children path ----------------------------");
zkWatch.getChildren(PARENT_PATH, true);
// 更新數據
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
// 創建子節點
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(50000);
// 清理節點
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}
Zookeeper 的 ACL (AUTH)
ACL(Access Control List),Zookeeper 作為一個分布式協調框架,其內部存儲的都是一些關乎分布式系統運行時狀態的元數據,尤其是涉及到一些分布式鎖、Master選舉和協調等應用場景。我們需要有效地保障 Zookeeper 中的數據安全,Zookeeper 提供一套完善的 ACL 權限控制機制來保障數據的安全。
ZK 提供了三種模式:權限模式、授權對象、權限
權限模式:Scheme 開發人員最多使用的如下四種權限模式:
- IP :IP 模式通過 IP 地址粒度來進行控制權限,例如配置了:IP:192.168.1.107 即表示權限控制都是針對這個 IP 地址的,同時也支持按網段分配,比如 192.168.1.*
- Digest:digest 是最常用的權限控制模式,更符合我們對權限控制的認識,起類似於 username:password 形式的權限標識進行權限配置,ZK 會對形成的權限標識先后進行兩次編碼處理,分別是 SHA-1 加密算法、BASE64 編碼
- world:world 是一直最開放的權限控制模式,這種模式可以看做為特殊的 Disgest,它僅僅是一個標識而已
- Super:超級用戶模式,在超級用戶模式下可以對 ZK 任意進行控制
權限對象:指的是權限賦予用戶或者一個指定的實體,例如 IP 地址或機器等,在不同的模式下,授權對象是不同的,這種模式和權限對象一一對應。
權限:權限就是指那些通過權限檢測后可以被允許執行的操作,在 ZK 中,對數據的操作權限分為五大類:CREATE、DELETE、READ、WRITE、ADMIN
認證只是針對某一個節點。
zkClient 的使用
創建客戶端方法:ZKClient(Arguments)
- arg1:zkServers zookeeper服務器的地址,用“,”分隔
- arg2:sessionTimeout 超時會話,為毫秒,默認為 30 000ms
- arg3:connectionTimeout 連接超時會話
- arg4:IZkConnection 接口的實現類
- arg5:zkSerializer 自定義序列化實現
可以使用遞歸創建,每個節點沒法指定 value,可以遞歸刪除
zkclient 最大的一個優點就是: 把重復watch的那個事情去掉,不需要再寫數據時watch
readeata 時,直接讀取的就是字符串,不是byte字節流
創建節點方法:create、createEphemeral、createEphemeralSequential、createPersistent、createPersistentSequential
- arg1:path,路徑
- arg2:data,數據內容,可以傳入 null
- arg3:mode,節點類型,為一個枚舉類型,四種形式
- arg4:acl 策略
- arg5:callback 回調函數
- arg6:context 上下文對象
- arg7:createParents 是否創建父節點
刪除節點方法:delete、deleteRecursive
- arg1:Path,路徑
- arg2:callback,回調函數
- arg3:context,上下文對象
讀取子節點數據方法:getChildren
- arg1:path,路徑
讀取節點數據方法:readData
- arg1:path,路徑
- arg2:returnNullIFPathNotExists (避免為空節點拋出異常,直接返回 null)
- arg3:節點狀態
更新數據方法:writeData
- arg1:path,路徑
- arg2:data,數據信息
- arg3:version 版本號
檢測節點是否存在方法:exists
- arg:path,路徑
我們發現,ZkClient 里面並沒有類似的 watcher、watch 參數,這也就是我們說開發人員無需關心反復注冊 Watcher 的問題,ZkClient 給我們提供了一套監聽方式,我們可以使用監聽節點的方式進行操作,剔除了繁瑣的反復 watcher 操作,簡化了代碼復雜程度
subscribeChildChanges 方法
- arg1:path,路徑
- 實現 IZkChildListener 接口的類,只需要重寫其
handleChaildChanges(String parentPath,List<String> currentChilds)
方法,其中參數 parentPath 為所監聽節點全路徑,currentChilds 為最新的子節點列表(相對路徑) - IZkChildListener 事件說明針對於下面三個事件觸發:
- 新增子節點
- 減少子節點
- 刪除節點
IZkChildListener 有以下特點:
客戶端可以對一個不存在的節點進行變更的監聽
一旦客戶端對一個節點注冊了子節點列表變更監聽后,那么當前節點的子節點雷彪發送變更的時候,服務端都會通知客戶端,並將最新的自己誒單列表發送給客戶端
該節點本身創建或刪除也會通知到客戶端
另外最重要的是這個監聽是一直存在的,不是單次監聽,相比較原生 API 提供的要簡單的多
subscribeChildChange("/super",new IZkChildListener() {})
對於 Zkclient 會告訴你變化之后的數據是多少,對於節點的當前和子級的狀態
當對於一個節點的update 的時候,並不會監聽,只會監聽當前節點或子節點的添加和刪除
subscribeDataChange("/super",new IZkDataListener() {})
- 對於變更的狀態包括:刪除和修改,其中的狀態分開走
原先zk設計的狀況:
event 只會告訴你變化的事件,觸發什么事件,自己根據path,讀取數據,並且還是一次,
通知,狀態,節點路徑,對於變化的之后的數據沒有告訴,設計的理念就是輕量,敏捷。
Curator 封裝
是否可以監控一個節點下面所有節點的狀態,分布式鎖,原子統計
Curator 框架中使用了鏈式編程風格,易讀性更強,使用工程方法創建連接對象
使用 CuratorFrameworkFactory 的兩個靜態工廠方法(參數不同)來實現
- arg1:connectString,連接串
- arg2:retryPolicy,重試連接策略,有四種實現分別為:
- ExponentialBackoffRetry
- RetryNTimes
- RetryOneTimes
- RetryUntilElapsed
- arg3:sessionTimeoutMs 會話超時時間 默認為 60 000ms
- arg4:connectionTimeoutMs 連接超時時間,默認為15 000ms
- 對於 retryPolicy 策略通過一個接口來讓用戶自定義實現
創建節點 create 方法,可選鏈式項:
- creatingParentsIfNeeded
- withMode
- forPath
- withACL
刪除節點 delete 方法,可選鏈式項
- deletingChildrenIfNedded
- guaranteed
- withVersion
- forPath
讀取和修改數據
- getData
- setData
異步綁定回調方法
- 創建節點時綁定一個回調函數,該回調函數可以輸出服務器的狀態碼以及服務器事件類型
- 可以加入線程池進行優化
讀取子節點方法
- getChildren
判斷子節點是否存在
- checkExists
回調函數為什么會使用線程池:
- 一次性批量創建多個節點,不必要每次都回調
- ThreadPoolExcutor 底層自定義線程
- 批量節點操作,自己規划 callback ,多余的緩沖在隊列中
Watcher 監聽功能
依賴 maven jar包
<dependency> <groupid>org.apache.curator</groupid> <artifactid>curator-recipes</artifactid> <version>2.4.2</version> </dependency>
我們使用 NodeCache 的方式去客戶端實例中注冊一個監聽緩存,然后實現對應的監聽方法即可,
- 主要的監聽方式:
- NodeCacheListener:監聽節點的新增,修改操作
- PathchildrenCacheListener:監聽子節點的新增、修改、刪除操作
在客戶端使用緩存,在服務端進行變化時,和本地的進行對比,有差異數據同步,空間換取時間,替換了原來的注冊的思路。
只能監聽一級節點,再下一級就不能實現,對於刪除也不能迭代刪除
分布式鎖
分布式鎖就是在共享的一段代碼中,一個服務器使用,其他的服務器不允許訪問,分布式鎖
實現分布式鎖,對於Java程序寫出花來也沒用,就是只針對同一個JVM,當多個JVM如何同步
分布式計數器:DistributedAtomicInteger
使用了distribute,
重試的時間次數,
barrier 同時開始,同時結束,代碼,
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
其中的 5 代表五個客戶端的連接,當五個連接上就可以同時開始,我們使用
barrier.enter();
此時就同時開始,
barrier.leave();
同時結束
在聲明 Barrier 的時候也可以不設置程序的數量
同時還有另外一種寫法:
實現聲明 barrier
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 5000;//ms
static DistributedBarrier barrier = null;
public static void main(String[] args) throws Exception {
for(int i = 0; i < 5; i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR)
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
cf.start();
barrier = new DistributedBarrier(cf, "/super");
System.out.println(Thread.currentThread().getName() + "設置barrier!");
barrier.setBarrier(); //設置
barrier.waitOnBarrier(); //等待
System.out.println("---------開始執行程序----------");
} catch (Exception e) {
e.printStackTrace();
}
}
},"t" + i).start();
}
Thread.sleep(5000);
barrier.removeBarrier(); //釋放
}
其中 barrier 作為第六人就是吹哨的人,一般我們自己設定的 const 包,用於設置的吹哨的公共
使用 curator 的分布式鎖,對於監聽的相同節點,若之前發生了變更,之后連接還會貌似數據恢復一點,數據同步,相當於重復注冊,當前 /super 節點,持續訂閱服務,
curator 人性化操作:
- 對於一個節點的CRUD監控
- 實現分布式鎖
- 實現barrier、原子計數器
- 實現隊列