本博文的主要內容有
一、zookeeper編程入門系列之利用zookeeper的臨時節點的特性來監控程序是否還在運行
二、zookeeper編程入門系列之zookeeper實現分布式進程監控
三、zookeeper編程入門系列之zookeeper實現分布式共享鎖
我這里采用的是maven項目,這個很簡單,不會的博友,見我下面寫的這篇博客
Zookeeper項目開發環境搭建(Eclipse\MyEclipse + Maven)
這里,推薦用下面的eclipse版本(當然你若也有myeclipse,請忽視我這句話)



Group Id:zhouls.bigdata
Artifact Id:zkDemo
Package:zhouls.bigdata.zkDemo

將默認的jdk,修改為jdk1.7

修改默認的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>zhouls.bigdata</groupId> <artifactId>zkDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>zkDemo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> </dependencies> </project>
修改后的pom.xml為

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>zhouls.bigdata</groupId> <artifactId>zkDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>zkDemo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!-- 此版本的curator操作的zk是3.4.6版本 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.10.0</version> </dependency> </dependencies> </project>

junit是單元測試。

也許,大家的jdk1.7會報錯誤,那就改為jdk1.8。

一、zookeeper編程入門系列之利用zookeeper的臨時節點的特性來監控程序是否還在運行
寫一個TestCurator.java
怎么通過Curator連接到zookeeper官網,其實是有固定的。


這打開需要好幾分鍾的時間,里面會有示范代碼,教我們怎么連接zookeeper。

我這里的zookeeper集群是master(192.168.80.145)、slave1(192.168.80.146)和slave2(192.168.80.147)。


WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1]

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1]

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1]
現在,我想通過Curator來連接到zookeeper集群,並在里面創建臨時節點。
這里,永久節點為monitor、臨時節點為test123。(當然,大家可以自行去命名)(同時,大家也可以通過命令行方式來創建,我這里就是以代碼api形式來創建了)



比如,這樣,monitor是父節點(作為永久節點),test123是臨時節點。
而現在,是monitor都沒有,它不會給我們一次性創建完。

除非,大家在命令行里先創建好monitor節點,之后,然后上述代碼可以操作成功。否則,就需如下修改代碼。


package zhouls.bigdata.zkDemo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void testName() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/test123");//指定節點名稱 } }





可以看到成功monitor生成,其實啊,/monitor/test123節點也是有的。(只是中間又消失了)
為什么會中間消失了呢?是因為,test123是臨時節點。創建完之后,它就會消失了。

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor []
那么,我想看,怎么用代碼來實現呢?
增加以下代碼

此時的代碼是TestCurator.java
package zhouls.bigdata.zkDemo; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void testName() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/test123");//指定節點名稱 while (true) { ; } } }




WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor [test123] [zk: localhost:2181(CONNECTED) 3]
然后,我這邊,把代碼,來停掉,則它就會消失了。



WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor [] [zk: localhost:2181(CONNECTED) 3] ls /monitor [test123] [zk: localhost:2181(CONNECTED) 4] ls /monitor [] [zk: localhost:2181(CONNECTED) 5]
好的,那么,現在,又有一個疑問出來了,在往monitor節點里,注冊節點如test123,那么,我怎么知道是哪一台的呢?則此時,需要做如下修改

此刻的代碼如下TestCurator.java
package zhouls.bigdata.zkDemo; import java.net.InetAddress; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void testName() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/" + ip);//指定節點名稱 while (true) { ; } } }

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor [] [zk: localhost:2181(CONNECTED) 3] ls /monitor [test123] [zk: localhost:2181(CONNECTED) 4] ls /monitor [] [zk: localhost:2181(CONNECTED) 5] ls /monitor [169.254.28.160] [zk: localhost:2181(CONNECTED) 6]

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor [test123] [zk: localhost:2181(CONNECTED) 3] ls /monitor [169.254.28.160] [zk: localhost:2181(CONNECTED) 4]

WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0] ls / [hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 1] ls / [monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 2] ls /monitor [test123] [zk: localhost:2181(CONNECTED) 3] ls /monitor [169.254.28.160] [zk: localhost:2181(CONNECTED) 4]
這個ip怎么不是我集群里的ip呢?是哪里的???
原來是這里的

因為,我是在test測試,所以,拿到的是windows本地的ip地址。
如果,放在mian去測試,則就是拿到集群里的ip地址了。



至此,我們是用臨時節點的這個特性,來監控程序有沒有運行的。並不是說臨時節點就是來只做這個事!!!
package zhouls.bigdata.zkDemo; import java.net.InetAddress; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void testName() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/" + ip);//指定節點名稱 while (true) { ; } //或者 // client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 // .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面 // .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 // .forPath("/monitor/ + ip");// 指定節點名稱 // while (true) { // ; // } } }
可以將這個,寫入到入口類或構造函數里。每次開始前都調用執行。以此來監控程序是否還在運行,非常重要!
二、zookeeper編程入門系列之zookeeper實現分布式進程監控

思路: 即在/下,先注冊一個監視器,即monitor節點(為永久節點)
然后,監視monitor節點下面的所有子節點(為臨時節點)
概念見
Zookeeper概念學習系列之zookeeper實現分布式進程監控
先執行

然后執行

ZkNodeWacter.java
package zhouls.bigdata.zkDemo; import java.util.ArrayList; import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * 這個監視器需要一直在后台運行,所以相當於是一個死循環的進程 * @author zhouls * */ public class ZkNodeWacter implements Watcher { CuratorFramework client; List<String> childrenList = new ArrayList<String>(); public ZkNodeWacter() { //在啟動監視器的時候,鏈接到zk RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000; int connectionTimeoutMs = 3000; client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 //監視monitor節點下面的所有子節點(為臨時節點) try { //在monitor目錄上注冊一個監視器,這個監視器只能使用一次 childrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); } catch (Exception e) { e.printStackTrace(); } } /** * 當monitor節點下面的子節點發生變化的時候,這個方法會被調用到 */ public void process(WatchedEvent event) { System.out.println("我被調用了:"+event); try { //重復注冊監視器 List<String> newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); //先遍歷原始的子節點list for (String ip : childrenList) { if(!newChildrenList.contains(ip)){ System.out.println("節點消失:"+ip); } } for (String ip : newChildrenList) { if(!childrenList.contains(ip)){ System.out.println("新增節點:"+ip); } } childrenList = newChildrenList; } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ZkNodeWacter spiderWacter = new ZkNodeWacter(); spiderWacter.start();//表示需要開啟一個監視器 } private void start() { while(true){ ; } } }
TestCurator.java
package zhouls.bigdata.zkDemo; import java.net.InetAddress; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void testName() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/" + ip);//指定節點名稱 while (true) { ; } // client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 // .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面 // .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 // .forPath("/monitor/ + ip");// 指定節點名稱 // while (true) { // ; // } } }
三、zookeeper編程入門系列之zookeeper實現分布式共享鎖
這里,一般,都是創建臨時有序子節點,怎么來創建,不難
說到協調,我首先想到的是北京很多十字路口的交通協管,他們手握着小紅旗,指揮車輛和行人是不是可以通行。如果我們把車輛和行人比喻成運行在計算機中的單元(線程),那么這個協管是干什么的?很多人都會想到,這不就是鎖么?對,在一個並發的環境里,我們為了避免多個運行單元對共享數據同時進行修改,造成數據損壞的情況出現,我們就必須依賴像鎖這樣的協調機制,讓有的線程可以先操作這些資源,然后其他線程等待。對於進程內的鎖來講,我們使用的各種語言平台都已經給我們准備很多種選擇。


TestCurator.java
package zhouls.bigdata.zkDemo; import java.net.InetAddress; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void test1() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/" + ip);//指定節點名稱 while (true) { ; } } @Test public void test2() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/");// 指定節點名稱 while (true) { ; } } }
DistributedLock.java
package zhouls.bigdata.zkDemo; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2181","test"); lock.lock(); //do something... } catch (Exception e) { e.printStackTrace(); } finally { if(lock != null) lock.unlock(); } //lock.closeZk();//在cleanup方法中添加 * */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//競爭資源的標志 private String waitNode;//等待前一個鎖 private String myZnode;//當前鎖 private CountDownLatch latch;//計數器 private int sessionTimeout = 30000;//30秒 private int waitTimeout = 30000;//等待節點失效最大時間 30秒 private List<Exception> exception = new ArrayList<Exception>(); /** * 創建分布式鎖,使用前請確認zkConnString配置的zookeeper服務可用 * @param zkConnString 127.0.0.1:2181 * @param lockName 競爭資源標志,lockName中不能包含單詞lock */ public DistributedLock(String zkConnString, String lockName){ this.lockName = lockName; // 創建一個與服務器的連接 try { zk = new ZooKeeper(zkConnString, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 創建根節點 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper節點的監視器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } /** * 獲取鎖 */ public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, waitTimeout);//等待獲取鎖 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * 嘗試獲取鎖 */ public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //創建臨時有序子節點 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.err.println(myZnode + " is created "); //取出所有子節點 List<String> subNodes = zk.getChildren(root, false); //取出所有lockName的鎖 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } //對所有節點進行默認排序,從小到大 Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的節點,則表示取得鎖 return true; } //如果不是最小的節點,找到比自己小1的節點 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); //獲取比當前節點小一級的節點(Collections.binarySearch(lockObjNodes, subMyZnode):獲取當前節點的角標) waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 等待獲取鎖 * @param lower :等待的鎖 * @param waitTime 最大等待時間 * @return * @throws InterruptedException * @throws KeeperException */ private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } /** * 取消鎖監控 */ public void unlock() { try { System.out.println(Thread.currentThread().getId()+",unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; //zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 關閉zk鏈接 */ public void closeZk(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } /** * 自定義異常信息 * @author lenovo * */ public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }

如有兩個線程, 兩個線程要同時到mysql中更新一條數據, 對數據庫中的數據進行累加更新。由於在分布式環境下, 這兩個線程可能存在於不同的機器上的不同jvm進程中, 所以這兩個線程的關系就是垮主機跨進程, 使用java中的synchronized鎖是搞不定的。
概念,見
Zookeeper概念學習系列之zookeeper實現分布式共享鎖
這里的節點也可以為lock。
先執行以下的test3,再執行test4




[zk: localhost:2181(CONNECTED) 9] ls / [monitor, hbase, zookeeper, admin, lock, consumers, config, storm, brokers, controller_epoch] [zk: localhost:2181(CONNECTED) 10] ls /lock [169.254.28.160] [zk: localhost:2181(CONNECTED) 11]
然后,再執行test4




然后,再執行下test4,試試,看看有什么變化

可以看到,在增加。
總的代碼是TestCurator.java
package zhouls.bigdata.zkDemo; import java.net.InetAddress; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.junit.Test; /** * * @author zhouls * */ public class TestCurator { @Test public void test1() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/" + ip);//指定節點名稱 while (true) { ; } } @Test public void test2() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/monitor/");// 指定節點名稱 while (true) { ; } } @Test public void test3() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/lock/" + ip);//指定節點名稱 while (true) { ; } } @Test public void test4() throws Exception { // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String connectString = "master:2181,slave1:2181,slave2:2181"; int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失 int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間 CuratorFramework client = CuratorFrameworkFactory.newClient( connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy); client.start();// 開啟客戶端 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress(); client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面 .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息 .forPath("/lock/");// 指定節點名稱 while (true) { ; } } }
DistributedLock.java
package zhouls.bigdata.zkDemo; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** DistributedLock lock = null; try { lock = new DistributedLock("127.0.0.1:2181","test"); lock.lock(); //do something... } catch (Exception e) { e.printStackTrace(); } finally { if(lock != null) lock.unlock(); } //lock.closeZk();//在cleanup方法中添加 * */ public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//競爭資源的標志 private String waitNode;//等待前一個鎖 private String myZnode;//當前鎖 private CountDownLatch latch;//計數器 private int sessionTimeout = 30000;//30秒 private int waitTimeout = 30000;//等待節點失效最大時間 30秒 private List<Exception> exception = new ArrayList<Exception>(); /** * 創建分布式鎖,使用前請確認zkConnString配置的zookeeper服務可用 * @param zkConnString 127.0.0.1:2181 * @param lockName 競爭資源標志,lockName中不能包含單詞lock */ public DistributedLock(String zkConnString, String lockName){ this.lockName = lockName; // 創建一個與服務器的連接 try { zk = new ZooKeeper(zkConnString, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 創建根節點 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper節點的監視器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } /** * 獲取鎖 */ public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, waitTimeout);//等待獲取鎖 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } /** * 嘗試獲取鎖 */ public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //創建臨時有序子節點 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.err.println(myZnode + " is created "); //取出所有子節點 List<String> subNodes = zk.getChildren(root, false); //取出所有lockName的鎖 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } //對所有節點進行默認排序,從小到大 Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //如果是最小的節點,則表示取得鎖 return true; } //如果不是最小的節點,找到比自己小1的節點 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); //獲取比當前節點小一級的節點(Collections.binarySearch(lockObjNodes, subMyZnode):獲取當前節點的角標) waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 等待獲取鎖 * @param lower :等待的鎖 * @param waitTime 最大等待時間 * @return * @throws InterruptedException * @throws KeeperException */ private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } /** * 取消鎖監控 */ public void unlock() { try { System.out.println(Thread.currentThread().getId()+",unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; //zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 關閉zk鏈接 */ public void closeZk(){ try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } /** * 自定義異常信息 * @author lenovo * */ public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
這個代碼里,大家可以改為自己的集群,如我的是master:2181,slave1:2181,slave2:2181
