-----------------------------目錄-----------------------------------
第一部分:zookeeper簡介
第二部分:zookeeper環境搭建
1、單機環境
2、集群環境
第三部分:zookeeper基本使用
1、java原生zk客戶端api操作
2、zkClient客戶端操作(推薦)
3、curator客戶端操作(推薦)
第四部分:zookeeper應用場景
第五部分:zookeeper深入進階
第六部分:zookeeper源碼分析
-----------------------------目錄-----------------------------------
第一部分:zookeeper簡介
1、 zookeeper基本概念
zookeeper是一個開源的分布式協調服務,其設計目標是將那些復雜並且容易出差錯的分布式一致性服務封裝起來,構成一個高效可靠的原語集,並提供給用戶一些簡單的接口,zookeeper是一個典型的分布式一致性的解決方案(CP模式),分布式應用程序可以基於它實現數據訂閱/發布、負載均衡,命名服務、集群管理、分布式鎖和分布式隊列等功能。
2、基本概念
@1、集群角色
@2、會話session
@3、數據節點znode
@4、版本
@5、ACL
Zookeeper采用ACL(Access Control Lists)策略來進行權限控制,定義一下五種權限:
CREATE:創建子節點權限
READ:獲取節點數據和子節點列表的權限
WRITE:更新節點數據的權限
DELETE:刪除子節點的權限
ADMIN:設置節點ACL的權限
注意的是create和delete兩盒權限是針對子節點的權限控制
第二部分:zookeeper環境搭建
zookeeper安裝有一下三種方式
1、單機模式:zk只運行在一台服務器上,適用測試環境
2、集群模式:zk運行在一個集群環境中,適用於線上生產環境
3、偽集群模式:一台服務器上運行多個zk實例監聽不同的端口
這里的環境搭建適用了兩個模式,單機模式和偽集群模式
一、單機模式(Linux)
1、下載穩定版本https://zookeeper.apache.org/releases.html
下面是下載命令(注意版本號),也可以登錄上面地址進行本地下載,在上傳服務器
wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz
2、解壓
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
3、進入apache-zookeeper-3.6.1目錄創建data文件,用於持久話數據的文件夾
cd apache-zookeeper-3.6.1 mkdir data
4、修改配置文件名稱
zk默認讀取zoo.cfg文件,提供參考文件zoo_sample.cfg文件。
cd conf cp zoo_sample.cfg zoo.cfg
5、修改zoo.cfg文件中的dataDir屬性,定位到剛創建的data文件
dataDir=/user/apache-zookeeper-3.6.1/data
6、啟動服務
cd ../bin ./zkServer.sh start
上面start 命令為啟動,stop為停止,status為查看zk啟動狀態以及身份
看到下圖提示啟動成功:
二、偽集群模式
點擊--->>> ZK安裝、ZK配置、ZK集群部署
第三部分:zookeeper基本使用
java代碼調用zookeeper原生Api進行增刪改查節點以及數據的代碼
一、java調用原生zk客戶端
引入pom文件
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </dependency>
1、創建zk的客戶端連接
主要是獲取zk對象,並進行操作監聽。等同於zk開啟客戶端命令./zkCli.sh。
需要注意的是監聽類實現Watcher接口,重寫process 方法
package city.albert.api; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/29 6:39 PM */ public class OpenSession implements Watcher { /** * 應為zk的監聽為獨立線程,需要保持主線程main方法不會直接結束使用CountDownLatch,設置線程數為1 */ private static CountDownLatch countDownLatch = new CountDownLatch(1); /** * 建立會話 * 客戶端創建一個連接鏈接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new OpenSession()); System.out.println(zooKeeper.getState()); countDownLatch.await(); } /** * 處理來自服務器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 執行了。。。"); countDownLatch.countDown(); } } }
2、創建zk的數據節點
由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!
類似在客戶端中執行: create /my_persist2 123
重點代碼:
private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { /** * path 創建節點路徑 * data[] 節點值的字節數組 * acl 節點權限4中類型 * ANYONE_ID_UNSAFE 標識任何人 * AUTH_IDS 僅僅這個id才可以設置ACL,他將被客戶端驗證ID替換 * OPEN_ACL_UNSAFE 開放的ACL(常用) * CREATOR_ALL_ACL 此ACL授予創建者身份驗證ID的所有權限 * createMode 創建節點的4中類型 * PERSISTENT 創建持久節點 * PERSISTENT_SEQUENTIAL 創建持久順序節點 * EPHEMERAL 創建臨時幾點 * EPHEMERAL_SEQUENTIAL 創建臨時順序節點 * */ String persistent = zooKeeper.create("/my_persistent", "創建持久節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String persistentSequential = zooKeeper.create("/my_persistent_sequential", "創建持久順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); String ephemeral = zooKeeper.create("/my_ephemeral", "創建臨時節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "創建臨時順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("持久節點 "+persistent); System.out.println("持久順序節點 "+persistentSequential); System.out.println("臨時節點 "+ephemeral); System.out.println("臨時順序節點 "+ephemeralSequential); }
完整的test代碼

package city.albert.api; import org.apache.zookeeper.*; import java.io.IOException; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/30 3:30 PM */ public class CreateZNodeBySession implements Watcher { private static ZooKeeper zooKeeper; /** * 建立會話 * 客戶端創建一個連接鏈接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //獲取zk鏈接會話對象 ip+端口,超時時間 watcher回調函數 zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new CreateZNodeBySession()); System.out.println(zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 創建節點 * * @param zooKeeper */ private static void createZNode(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { /** * path 創建節點路徑 * data[] 節點值的字節數組 * acl 節點權限4中類型 * ANYONE_ID_UNSAFE 標識任何人 * AUTH_IDS 僅僅這個id才可以設置ACL,他將被客戶端驗證ID替換 * OPEN_ACL_UNSAFE 開放的ACL(常用) * CREATOR_ALL_ACL 此ACL授予創建者身份驗證ID的所有權限 * createMode 創建節點的4中類型 * PERSISTENT 創建持久節點 * PERSISTENT_SEQUENTIAL 創建持久順序節點 * EPHEMERAL 創建臨時幾點 * EPHEMERAL_SEQUENTIAL 創建臨時順序節點 * */ String persistent = zooKeeper.create("/my_persistent", "創建持久節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String persistentSequential = zooKeeper.create("/my_persistent_sequential", "創建持久順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); String ephemeral = zooKeeper.create("/my_ephemeral", "創建臨時節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); String ephemeralSequential= zooKeeper.create("/my_ephemeral_sequential", "創建臨時順序節點".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("持久節點 "+persistent); System.out.println("持久順序節點 "+persistentSequential); System.out.println("臨時節點 "+ephemeral); System.out.println("臨時順序節點 "+ephemeralSequential); } /** * 處理來自服務器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 執行了。。。"); //創建節點 try { createZNode(zooKeeper); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3、獲取zk的數據節點以及子節點
由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!
類似命令:
ls / 獲取當前節點多有節點
ls /my_persist2 獲取當前節點所有子節點(下面getChildren方法)
getAllChildrenNumber 獲取當前節點下子節點個數
get /my_persist2 獲取節點數據值(下面getData方法)
重點代碼:
public void getData() throws KeeperException, InterruptedException { /** * 獲取某個節點數據api * path, 獲取數據的節點路徑 * watcher, 是否開啟監聽, * stat 節點狀態 null為最新版本數據 */ byte[] data = zooKeeper.getData("/my_persistent", false, null); System.out.println(new String(data)); /** * 獲取某個節點的子節點方法 * path, 獲取數據的節點路徑 * watcher, 是否開啟監聽, * 當開啟監聽在節點變更的時候Watcher.process會返回: watchedEvent.getType()==Event.EventType.NodeChildrenChanged 類型, * 監聽通知成功監聽失效(因為通知是1次性有效,需要反復注冊) */ List<String> children = zooKeeper.getChildren("/", true, null); System.out.println(children); }
下面為測試源碼獲取節點數據的

package city.albert.api; import org.apache.zookeeper.*; import java.io.IOException; import java.util.List; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/30 3:30 PM */ public class GetZNodeBySession implements Watcher { private static ZooKeeper zooKeeper; /** * 建立會話 * 客戶端創建一個連接鏈接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //獲取zk鏈接會話對象 ip+端口,超時時間 watcher回調函數 zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new GetZNodeBySession()); System.out.println(zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 處理來自服務器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { try { getData(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 執行了。。。"); //獲取zk節點 try { getData(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } public void getData() throws KeeperException, InterruptedException { /** * 獲取某個節點數據api * path, 獲取數據的節點路徑 * watcher, 是否開啟監聽, * stat 節點狀態 null為最新版本數據 */ byte[] data = zooKeeper.getData("/my_persistent", false, null); System.out.println(new String(data)); /** * 獲取某個節點的子節點方法 * path, 獲取數據的節點路徑 * watcher, 是否開啟監聽, * 當開啟監聽在節點變更的時候Watcher.process會返回: watchedEvent.getType()==Event.EventType.NodeChildrenChanged 類型, * 監聽通知成功監聽失效(因為通知是1次性有效,需要反復注冊) */ List<String> children = zooKeeper.getChildren("/", true, null); System.out.println(children); } }
4、更新zk的數據節點
由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!
類似操作:set /my_persist2 678
重點代碼:
private void updateData() throws KeeperException, InterruptedException { /** * 更新節點內容 * path, 節點路徑 * data, 更新節點數據內容 * version 更新版本 -1默認最新版本 */ zooKeeper.setData("/my_persist2","687".getBytes(),-1); }
下面為獲取節點數據的測試源碼

package city.albert.api; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/30 3:30 PM */ public class UpdateZNodeBySession implements Watcher { private static ZooKeeper zooKeeper; /** * 建立會話 * 客戶端創建一個連接鏈接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //獲取zk鏈接會話對象 ip+端口,超時時間 watcher回調函數 zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new UpdateZNodeBySession()); System.out.println(zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 處理來自服務器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 執行了。。。"); //獲取zk節點 try { updateData(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void updateData() throws KeeperException, InterruptedException { /** * 更新節點內容 * path, 節點路徑 * data, 更新節點數據內容 * version 更新版本 -1默認最新版本 */ zooKeeper.setData("/my_persist2","687".getBytes(),-1); } }
5、刪除zk的數據節點
由於代碼篇幅問題,下面先介紹重點代碼,然后附帶測試的源代碼~~注意哦!
類似操作:delete /my_persist2
private void deleteData() throws KeeperException, InterruptedException { /** * 刪除節點內容 * path, 節點路徑 * version 更新版本 -1默認最新版本 */ zooKeeper.delete("/my_persist2", -1); }
下面為獲取節點數據的測試源碼

package city.albert.api; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/7/30 3:30 PM */ public class DeleteZNodeBySession implements Watcher { private static ZooKeeper zooKeeper; /** * 建立會話 * 客戶端創建一個連接鏈接zk * * @param args */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { //獲取zk鏈接會話對象 ip+端口,超時時間 watcher回調函數 zooKeeper = new ZooKeeper("127.0.0.1:2181", 10000, new DeleteZNodeBySession()); System.out.println(zooKeeper.getState()); Thread.sleep(Integer.MAX_VALUE); } /** * 處理來自服務器端的watcher * * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.toString()); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println("process 執行了。。。"); //獲取zk節點 try { deleteData(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void deleteData() throws KeeperException, InterruptedException { /** * 刪除節點內容 * path, 節點路徑 * version 更新版本 -1默認最新版本 */ zooKeeper.delete("/my_persist2", -1); } }
二、使用zkClient客戶端連接
1、引入pom文件
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
2、創建zkClient對象以及操作
操作包括:
創建節點、順序節點、臨時節點、臨時順序節點、遞歸創建節點。
獲取子節點列表,以及注冊監聽。
修改節點值
判斷節點是否存在
讀取節點值
刪除節點、遞歸刪除節點
package city.albert.api; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.concurrent.TimeUnit; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/8/5 5:17 PM */ public class ZKSession { public static void main(String[] args) throws InterruptedException { //借助zkClient創建會話 ,ZkClient借助zk原生spi的異步建立連接進行了實現為同步 ZkClient zkClient = new ZkClient("127.0.0.1:2181"); //創建節點可以實現原生的方法遞歸調用創建createPersistent(path,true)方法非遞歸,先創建父節點在創建子節點 //創建節點 zkClient.create("/temp1", "11111", CreateMode.PERSISTENT); zkClient.create("/temp1/z1", "11111", CreateMode.PERSISTENT); //(遞歸)創建持久節點 zkClient.createPersistent("/zkClient/zk1", true); //創建持久順序節點 zkClient.createPersistentSequential("/zkClient/zk2", "z2"); //創建臨時節點 zkClient.createEphemeral("/zkClient/zk3", "z3"); //創建臨時順序節點 zkClient.createEphemeralSequential("/zkClient/zk4", "z4"); //獲取子節點,同時注冊監聽 List<String> temp1 = zkClient.getChildren("/temp1"); System.out.println("temp1 =" + temp1); List<String> zc = zkClient.getChildren("/zkClient"); System.out.println("zkClient =" + zc); //為節點注冊監聽 zkClient.subscribeChildChanges("/zkClient", new IZkChildListener() { @Override public void handleChildChange(String s, List<String> list) throws Exception { System.out.println("監聽節點:" + s); System.out.println("監聽剩余節點:" + list); } }); //修改節點 zkClient.writeData("/zkClient/zk3", "zzzz"); //修改節點為異步事件 TimeUnit.SECONDS.sleep(2); //判斷節點是否存在 boolean exists = zkClient.exists("/zkClient/zk3"); System.out.println("是否存在:" + exists); //讀取節點值 Object o = zkClient.readData("/zkClient/zk3"); System.out.println("讀取值:" + o); //刪除節點 zkClient.delete("/temp1/z1"); zkClient.delete("/temp1"); //(遞歸) 刪除節點 ,父節點為/zkClient zkClient.deleteRecursive("/zkClient"); } }
3、創curator對象以及操作
curator客戶端使用fluent風格編程,是netflix公司開源
1、引入pom文件
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency>
2
package city.albert.api; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * @author niunafei * @function * @email niunafei0315@163.com * @date 2020/8/5 6:24 PM */ public class CuratorSession { public static void main(String[] args) throws Exception { //重試策略 ExponentialBackoffRetry基於ackoff重試,RetryNTimes重連n次策略,RetryForever永遠重試策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //創建客戶端對象 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //鏈接超時時間 .connectionTimeoutMs(30000) //創建session鏈接時間 .sessionTimeoutMs(50000) //重試策略 .retryPolicy(retryPolicy) //獨立的命名空間,父節點為/base .namespace("base").build(); //創建鏈接 client.start(); //創建節點 ,默認創建持久節點 //1、創建初始空節點 client.create().forPath("/temp"); //2、創建有內容節點 client.create().forPath("/temp/tm", "內容".getBytes()); client.create().forPath("/temp/tm/t1", "內容".getBytes()); //3、遞歸創建父節點並且指定創建類型,EPHEMERAL是臨時節點,還有持久節點,持久、臨時順序節點。 client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/temp2/tm2", "內容2".getBytes()); //獲取節點值與狀態信息 byte[] bytes = client.getData().forPath("/temp/tm/t1"); System.out.println("節點值:"+new String(bytes)); //獲取狀態信息 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/temp/tm/t1"); System.out.println("狀態信息:" + stat); //更新節點數據 //1、普通更新 Stat stat1 = client.setData().forPath("/temp/tm/t1", "更新666".getBytes()); System.out.println("更新后狀態信息:"+stat1); System.out.println("更新后的值:"+new String(client.getData().forPath("/temp/tm/t1"))); //2、指定版本更新,-1為最新版本 client.setData().withVersion(-1).forPath("/temp/tm/t1","更新8888".getBytes()); System.out.println("版本更新后的值:"+new String(client.getData().forPath("/temp/tm/t1"))); //刪除節點 //1、刪除節點 client.delete().forPath("/temp/tm/t1"); //2、刪除並遞歸刪除子節點 client.delete().deletingChildrenIfNeeded().forPath("/temp2"); //3、指定版本刪除,版本不一致會拋出異常 client.delete().withVersion(-1).forPath("/temp/tm"); //4、保證強制刪除 client.delete().guaranteed().forPath("/temp"); } }
第四部分:zookeeper應用場景
一、數據發布/訂閱
基於節點和注冊監聽機制實現
二、命名服務
給應用,主機命名,也可以使用持久順序節點(臨時順序節點)生成全局id,格式是job_000000001
三、日志收集
日志源機器和日志收集器機器的管理
四、分布式鎖
共享鎖(讀鎖):在固定節點下創建臨時順序節點(主機名-讀/寫(R/w)-0000000001),
排它鎖: 在固定節點下創建臨時節點,創建鎖(zk保證多線程創建唯一節點),使用監聽機制來確定鎖被釋放
五、master選舉
場景:一個廣告推送服務,需要大量計算才可以獲取到某個類型的推廣id,使用master-slave方式
應用技術點:選舉master方式,通過創建臨時節點選擇(節點不可以重復創建),利用監聽節點檢測master狀態(或者添加state節點,使用心跳機制)
結果:master負責計算,計算結束寫入某個節點,salve和master進行業務讀取即可。避免了大量的重復計算
六、分布式隊列
先進先出隊列(FIFO):創建queue_fifo
Barrier分布式屏障,類似多線程計算,計算結束進行結果匯總

第五部分:zookeeper深入進階
第六部分:zookeeper源碼分析