zookeeper是Apache Hadoop的子項目,是一個樹形的目錄服務。
在官網下載想要的版本: http://zookeeper.apache.org/releases.html 如這里使用的是 apache-zookeeper-3.5.9-bin.tar.gz(該版本是windows和Linux通用的版本)
*安裝 (溫馨提示,zookeeper是java語言編寫的,它依賴於JDK7及以上版本).
1, 上傳到Linux服務器並解壓到指定路徑
> mkdir zookeeper
> tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz
2, zookeeper的啟動必須有zoo.cfg配置文件,默認是沒有的
> cp zoo_sample.cfg zoo.cfg
> vi zoo.cfg 編輯該文件,主要是指定zookeeper的數據保存路徑
dataDir=/app/zookeeper/zkdata
3, 啟動 (默認是 2181端口 )
> cd bin
> ./zkServer.sh start 啟動,,,當然還有 status stop restart 參數可用.
zookeeper中每個節點被稱為 ZNode,每個節點上都會保存自己的數據和節點信息, 節點下可以有子節點;
通常節點分為 持久化節點、臨時節點 -e 、 持久化順序節點 -s 臨時順序節點 -es
*客戶端命令
> ./zkCli.sh [-server localhost:2181] 如果客戶端和服務端在一台,客戶端就是要連這一台可以省略參數 連接成功后,退出用 quit
*JavaApi調用zkServer
Curator,它是Apache提供的Java客戶端庫連接zookeeper,最初是Netflix研發的捐獻給Apache基金會,目前是Apache的頂級項目。
官網: http://curator.apache.org
0. 准備工程 pom.xml
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!--curator start--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <!--curator end--> <!--日志--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.30</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
1. 連接到zkServer
2. 創建節點
3. 查詢節點
4. 修改節點
5. 刪除節點
6. Watcher監聽
•ZooKeeper 中引入了Watcher機制來實現了發布/訂閱功能,能夠讓多個訂閱者同時監聽某一個對象,當一個對象自身狀態變化時,會通知所有訂閱者。
•ZooKeeper 原生支持通過注冊Watcher來進行事件監聽,但是其使用並不是特別方便 需要開發人員自己反復注冊Watcher,比較繁瑣。
•Curator引入了 Cache 來實現對 ZooKeeper 服務端事件的監聽。
•ZooKeeper提供了三種Watcher:
•NodeCache : 只是監聽某一個特定的節點
•PathChildrenCache : 監控一個ZNode的子節點.
6.1 NodeCache 示例
6.2 PathChildrenCache 示例
6.3 TreeCache 示例
7. zookeeper分布式鎖原理
•核心思想:當客戶端要獲取鎖,則創建節點,使用完鎖,則刪除該節點。
1.客戶端獲取鎖時,在lock節點下創建臨時順序節點。(lock節點是舉例的節點名)
臨時:保證任何時候哪怕是宕機節點也能被刪除, 順序: 因為要找序號最小的節點獲取到鎖,其它節點依次監聽比它小的那一個節點的刪除事件
2.然后獲取lock下面的所有子節點,客戶端獲取到所有的子節點之后,如果發現自己創建的子節點序號最小,那么就認為該客戶端獲取到了鎖。使用完鎖后,將該節點刪除。
3.如果發現自己創建的節點並非lock所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,同時對其注冊事件監聽器,監聽刪除事件。
Watcher會收到相應通知,此時再次判斷自己創建的節點是否是lock子節點中序號最小的,如果是則獲取到了鎖,如果不是則重復以上步驟繼續獲取到比自己小的一個節點
-
在Curator中有五種鎖方案:
-
InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)
-
InterProcessMutex:分布式可重入排它鎖
-
InterProcessReadWriteLock:分布式讀寫鎖
-
InterProcessMultiLock:將多個鎖作為單個實體管理的容器
-
資源在誰手上,分布式鎖就加到誰手上。
package com.laoyang.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.TimeUnit; //這里用多線程的方式模擬多個12306的服務 public class Ticket12306 implements Runnable { private int tickets = 10; //假定有10張票 //這里使用curator提供的5種鎖當中的InterProcessMutex InterProcessMutex lock ; CuratorFramework client = null; public Ticket12306(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,5); //調用CuratorFrameworkFactory工廠的builder()方法創建客戶端對象 client = CuratorFrameworkFactory.builder() .connectString("192.168.239.135:2181") //設置連接串 ip:port,ip:port... .sessionTimeoutMs(60000) //設置會話超時毫秒數,默認值是60*1000 .connectionTimeoutMs(15000) //設置連接超時毫秒數,默認值是 15*1000 .retryPolicy(retryPolicy) //重試策略 .build(); client.start(); //開啟連接 lock = new InterProcessMutex(client,"/lock"); } @Override public void run() { while (true){ //獲取鎖 try { lock.acquire(1, TimeUnit.SECONDS); if (tickets >0){ System.out.println(Thread.currentThread()+":"+tickets--); Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } finally { //釋放鎖 try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.laoyang.curator; import org.junit.Test; public class LockTest { public static void main(String[] args){ Ticket12306 ticket12306 = new Ticket12306(); Thread t1 = new Thread(ticket12306,"飛豬"); Thread t2 = new Thread(ticket12306,"攜程"); t1.start(); t2.start(); } }