zookeeper編程入門系列之zookeeper實現分布式進程監控和分布式共享鎖(圖文詳解)


 

 

 

本博文的主要內容有

    一、zookeeper編程入門系列之利用zookeeper的臨時節點的特性來監控程序是否還在運行

    二、zookeeper編程入門系列之zookeeper實現分布式進程監控

   三、zookeeper編程入門系列之zookeeper實現分布式共享鎖

 

 

 

 

 

 

 

  我這里采用的是maven項目,這個很簡單,不會的博友,見我下面寫的這篇博客

Zookeeper項目開發環境搭建(Eclipse\MyEclipse + Maven)

 

 

  這里,推薦用下面的eclipse版本(當然你若也有myeclipse,請忽視我這句話)

 

 

 

 

 

 

 

 

 

 

Group Id:zhouls.bigdata

Artifact Id:zkDemo

Package:zhouls.bigdata.zkDemo

 

 

 

   將默認的jdk,修改為jdk1.7

 

 

  修改默認的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>zhouls.bigdata</groupId>
  <artifactId>zkDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>zkDemo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

 

 

 

   修改后的pom.xml為

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>zhouls.bigdata</groupId>
  <artifactId>zkDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>zkDemo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- 此版本的curator操作的zk是3.4.6版本 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.10.0</version>
        </dependency>

    </dependencies>
</project>

 

 

 

 

   junit是單元測試。

 

 

 

 

 

  也許,大家的jdk1.7會報錯誤,那就改為jdk1.8。

 

 

 

 

 一、zookeeper編程入門系列之利用zookeeper的臨時節點的特性來監控程序是否還在運行

 

   寫一個TestCurator.java

  怎么通過Curator連接到zookeeper官網,其實是有固定的。

 

 

 

 

 

   這打開需要好幾分鍾的時間,里面會有示范代碼,教我們怎么連接zookeeper。

 

 

 

  我這里的zookeeper集群是master(192.168.80.145)、slave1(192.168.80.146)和slave2(192.168.80.147)。

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

 

 

 

   現在,我想通過Curator來連接到zookeeper集群,並在里面創建臨時節點。

   這里,永久節點為monitor、臨時節點為test123。(當然,大家可以自行去命名)(同時,大家也可以通過命令行方式來創建,我這里就是以代碼api形式來創建了)

 

 

 

 

 

 

 

 

 

    比如,這樣,monitor是父節點(作為永久節點),test123是臨時節點。

     而現在,是monitor都沒有,它不會給我們一次性創建完。

 

   除非,大家在命令行里先創建好monitor節點,之后,然后上述代碼可以操作成功。否則,就需如下修改代碼。

 

 

 

 

 

package zhouls.bigdata.zkDemo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void testName() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/test123");//指定節點名稱
            

    }

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

   可以看到成功monitor生成,其實啊,/monitor/test123節點也是有的。(只是中間又消失了)

  為什么會中間消失了呢?是因為,test123是臨時節點。創建完之后,它就會消失了。

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]

 

 

 

 

  那么,我想看,怎么用代碼來實現呢?

  增加以下代碼

 

 

   此時的代碼是TestCurator.java

package zhouls.bigdata.zkDemo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void testName() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/test123");//指定節點名稱
        while (true) { ; }
    

    }

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] 

 

 

 

  然后,我這邊,把代碼,來停掉,則它就會消失了。

 

 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 4] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 5] 

 

 

 

 

  好的,那么,現在,又有一個疑問出來了,在往monitor節點里,注冊節點如test123,那么,我怎么知道是哪一台的呢?則此時,需要做如下修改

 

 

   此刻的代碼如下TestCurator.java

package zhouls.bigdata.zkDemo;

import java.net.InetAddress;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void testName() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端
 InetAddress localhost = InetAddress.getLocalHost(); String ip = localhost.getHostAddress();
        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/" + ip);//指定節點名稱
        while (true) {
                    ;
        }
        

    }

}

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 4] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 5] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 6] 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 4] 

 

 

 

 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 4] 

 

 

  這個ip怎么不是我集群里的ip呢?是哪里的???

  原來是這里的

 

   因為,我是在test測試,所以,拿到的是windows本地的ip地址。

 

 

 

  如果,放在mian去測試,則就是拿到集群里的ip地址了。

 

 

 

 

 

 

 

 

 

 

 

  至此,我們是用臨時節點的這個特性,來監控程序有沒有運行的。並不是說臨時節點就是來只做這個事!!!

package zhouls.bigdata.zkDemo;

import java.net.InetAddress;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void testName() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        InetAddress localhost = InetAddress.getLocalHost();
        String ip = localhost.getHostAddress();
        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/" + ip);//指定節點名稱
        while (true) {
                    ;
        }
        

        //或者
        
//        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面
//                .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
//                .forPath("/monitor/ + ip");// 指定節點名稱
//        while (true) {
//            ;
//        }

    }

}

 

 

  可以將這個,寫入到入口類或構造函數里。每次開始前都調用執行。以此來監控程序是否還在運行,非常重要!

 

 

 

 

 

 

 

二、zookeeper編程入門系列之zookeeper實現分布式進程監控

 

 

 

   思路: 即在/下,先注冊一個監視器,即monitor節點(為永久節點)

          然后,監視monitor節點下面的所有子節點(為臨時節點)

 

   概念見

Zookeeper概念學習系列之zookeeper實現分布式進程監控

 

  先執行

 

 

 

   然后執行

 

 

 

 

  ZkNodeWacter.java

package zhouls.bigdata.zkDemo;

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/**
 * 這個監視器需要一直在后台運行,所以相當於是一個死循環的進程
 * @author zhouls
 *
 */
public class ZkNodeWacter implements Watcher {
    CuratorFramework client;
    List<String> childrenList = new ArrayList<String>();
    public ZkNodeWacter() {
        //在啟動監視器的時候,鏈接到zk
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;
        int connectionTimeoutMs = 3000;
        client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端
        
        
        //監視monitor節點下面的所有子節點(為臨時節點)
        try {
            //在monitor目錄上注冊一個監視器,這個監視器只能使用一次
            childrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        
    }
    
    
    /**
     * 當monitor節點下面的子節點發生變化的時候,這個方法會被調用到
     */
    public void process(WatchedEvent event) {
        System.out.println("我被調用了:"+event);
        
        try {
            //重復注冊監視器
            List<String> newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
            //先遍歷原始的子節點list
            for (String ip : childrenList) {
                if(!newChildrenList.contains(ip)){
                    System.out.println("節點消失:"+ip);
                }
            }
            
            for (String ip : newChildrenList) {
                if(!childrenList.contains(ip)){
                    System.out.println("新增節點:"+ip);
                }
            }
            childrenList = newChildrenList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    public static void main(String[] args) {
        ZkNodeWacter spiderWacter = new ZkNodeWacter();
        spiderWacter.start();//表示需要開啟一個監視器
    }


    private void start() {
        while(true){
            ;
        }
    }

}

 

 

 

 

 

 

  TestCurator.java

package zhouls.bigdata.zkDemo;

import java.net.InetAddress;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void testName() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        InetAddress localhost = InetAddress.getLocalHost();
        String ip = localhost.getHostAddress();
        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/" + ip);//指定節點名稱
        while (true) {
                    ;
        }
        

        
        
//        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面
//                .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
//                .forPath("/monitor/ + ip");// 指定節點名稱
//        while (true) {
//            ;
//        }

    }

}

 

 

 

 

 

 

 

三、zookeeper編程入門系列之zookeeper實現分布式共享鎖

   這里,一般,都是創建臨時有序子節點,怎么來創建,不難

   說到協調,我首先想到的是北京很多十字路口的交通協管,他們手握着小紅旗,指揮車輛和行人是不是可以通行。如果我們把車輛和行人比喻成運行在計算機中的單元(線程),那么這個協管是干什么的?很多人都會想到,這不就是鎖么?對,在一個並發的環境里,我們為了避免多個運行單元對共享數據同時進行修改,造成數據損壞的情況出現,我們就必須依賴像鎖這樣的協調機制,讓有的線程可以先操作這些資源,然后其他線程等待。對於進程內的鎖來講,我們使用的各種語言平台都已經給我們准備很多種選擇。

 

 

 

 

  

 

 

 

 

 

   TestCurator.java

package zhouls.bigdata.zkDemo;

import java.net.InetAddress;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void test1() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        InetAddress localhost = InetAddress.getLocalHost();
        String ip = localhost.getHostAddress();
        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/" + ip);//指定節點名稱
        while (true) {
            ;
        }
        
    }

    
    

        @Test
        public void test2() throws Exception {
            // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
            int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 開啟客戶端

            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/");// 指定節點名稱
            while (true) {
                ;
            }

    }

}

 

 

 

 

  DistributedLock.java

package zhouls.bigdata.zkDemo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
      DistributedLock lock = null;
    try {
        lock = new DistributedLock("127.0.0.1:2181","test");
        lock.lock();
        //do something...
    } catch (Exception e) {
        e.printStackTrace();
    } 
    finally {
        if(lock != null)
            lock.unlock();
    }
    
    //lock.closeZk();//在cleanup方法中添加
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//
    private String lockName;//競爭資源的標志
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private int sessionTimeout = 30000;//30秒
    private int waitTimeout = 30000;//等待節點失效最大時間 30秒
    private List<Exception> exception = new ArrayList<Exception>();
    
    /**
     * 創建分布式鎖,使用前請確認zkConnString配置的zookeeper服務可用
     * @param zkConnString 127.0.0.1:2181
     * @param lockName 競爭資源標志,lockName中不能包含單詞lock
     */
    public DistributedLock(String zkConnString, String lockName){
        this.lockName = lockName;
        // 創建一個與服務器的連接
         try {
            zk = new ZooKeeper(zkConnString, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 創建根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }
    /**
     * 獲取鎖
     */
    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, waitTimeout);//等待獲取鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }

    /**
     * 嘗試獲取鎖
     */
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //創建臨時有序子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.err.println(myZnode + " is created ");
            //取出所有子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            //對所有節點進行默認排序,從小到大
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的節點,則表示取得鎖
                return true;
            }
            //如果不是最小的節點,找到比自己小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            //獲取比當前節點小一級的節點(Collections.binarySearch(lockObjNodes, subMyZnode):獲取當前節點的角標)
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 等待獲取鎖
     * @param lower :等待的鎖
     * @param waitTime 最大等待時間
     * @return
     * @throws InterruptedException
     * @throws KeeperException
     */
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }


    /**
     * 取消鎖監控
     */
    public void unlock() {
        try {
            System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            //zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 關閉zk鏈接
     */
    public void closeZk(){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }
    /**
     * 自定義異常信息
     * @author lenovo
     *
     */
    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

 

 

 

 

   如有兩個線程, 兩個線程要同時到mysql中更新一條數據, 對數據庫中的數據進行累加更新。由於在分布式環境下, 這兩個線程可能存在於不同的機器上的不同jvm進程中, 所以這兩個線程的關系就是垮主機跨進程, 使用java中的synchronized鎖是搞不定的。

 

 

 

  概念,見

Zookeeper概念學習系列之zookeeper實現分布式共享鎖

 

 

 

  這里的節點也可以為lock。

  先執行以下的test3,再執行test4

 

 

 

 

 

 

 

 

 

 

 

 

 

[zk: localhost:2181(CONNECTED) 9] ls /       
[monitor, hbase, zookeeper, admin, lock, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 10] ls /lock
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 11] 

 

 

 

 

  然后,再執行test4

 

 

 

 

 

 

 

 

 

 

 

 

   然后,再執行下test4,試試,看看有什么變化

 

  可以看到,在增加。

 

 

  總的代碼是TestCurator.java

package zhouls.bigdata.zkDemo;

import java.net.InetAddress;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;

/**
 * 
 * @author zhouls
 *
 */
public class TestCurator {

    @Test
    public void test1() throws Exception {
        // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        String connectString = "master:2181,slave1:2181,slave2:2181";
        int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
        int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, sessionTimeoutMs, connectionTimeoutMs,
                retryPolicy);
        client.start();// 開啟客戶端

        InetAddress localhost = InetAddress.getLocalHost();
        String ip = localhost.getHostAddress();
        
        client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/" + ip);//指定節點名稱
        while (true) {
            ;
        }
        
    }

    
    

        @Test
        public void test2() throws Exception {
            // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
            int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 開啟客戶端

            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/monitor/");// 指定節點名稱
            while (true) {
                ;
            }

    }
        
        
        
        @Test
        public void test3() throws Exception {
            // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
            int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 開啟客戶端

            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
                .withMode(CreateMode.EPHEMERAL)//指定節點類型,臨時節點
                .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
                .forPath("/lock/" + ip);//指定節點名稱
            while (true) {
                ;
            }
            
        }
        
        
        
        
        @Test
        public void test4() throws Exception {
            // 1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            String connectString = "master:2181,slave1:2181,slave2:2181";
            int sessionTimeoutMs = 5000;// 這個值只能在4000-40000ms之間 表示鏈接斷掉之后多長時間臨時節點會消失
            int connectionTimeoutMs = 3000;// 獲取鏈接的超時時間
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    connectString, sessionTimeoutMs, connectionTimeoutMs,
                    retryPolicy);
            client.start();// 開啟客戶端

            InetAddress localhost = InetAddress.getLocalHost();
            String ip = localhost.getHostAddress();
            
            client.create().creatingParentsIfNeeded()// 如果父節點不存在則創建
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定節點類型,注意:臨時節點必須在某一個永久節點下面
            .withACL(Ids.OPEN_ACL_UNSAFE)// 設置節點權限信息
            .forPath("/lock/");// 指定節點名稱
            while (true) {
                ;
            }

    }
        
        
        

}

 

 

 

 

  DistributedLock.java

package zhouls.bigdata.zkDemo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
      DistributedLock lock = null;
    try {
        lock = new DistributedLock("127.0.0.1:2181","test");
        lock.lock();
        //do something...
    } catch (Exception e) {
        e.printStackTrace();
    } 
    finally {
        if(lock != null)
            lock.unlock();
    }
    
    //lock.closeZk();//在cleanup方法中添加
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";//
    private String lockName;//競爭資源的標志
    private String waitNode;//等待前一個鎖
    private String myZnode;//當前鎖
    private CountDownLatch latch;//計數器
    private int sessionTimeout = 30000;//30秒
    private int waitTimeout = 30000;//等待節點失效最大時間 30秒
    private List<Exception> exception = new ArrayList<Exception>();
    
    /**
     * 創建分布式鎖,使用前請確認zkConnString配置的zookeeper服務可用
     * @param zkConnString 127.0.0.1:2181
     * @param lockName 競爭資源標志,lockName中不能包含單詞lock
     */
    public DistributedLock(String zkConnString, String lockName){
        this.lockName = lockName;
        // 創建一個與服務器的連接
         try {
            zk = new ZooKeeper(zkConnString, sessionTimeout, this);
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 創建根節點
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            exception.add(e);
        } catch (KeeperException e) {
            exception.add(e);
        } catch (InterruptedException e) {
            exception.add(e);
        }
    }

    /**
     * zookeeper節點的監視器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }
    /**
     * 獲取鎖
     */
    public void lock() {
        if(exception.size() > 0){
            throw new LockException(exception.get(0));
        }
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, waitTimeout);//等待獲取鎖
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }

    /**
     * 嘗試獲取鎖
     */
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //創建臨時有序子節點
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.err.println(myZnode + " is created ");
            //取出所有子節點
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的鎖
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            //對所有節點進行默認排序,從小到大
            Collections.sort(lockObjNodes);
            System.out.println(myZnode + "==" + lockObjNodes.get(0));
            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的節點,則表示取得鎖
                return true;
            }
            //如果不是最小的節點,找到比自己小1的節點
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            //獲取比當前節點小一級的節點(Collections.binarySearch(lockObjNodes, subMyZnode):獲取當前節點的角標)
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 等待獲取鎖
     * @param lower :等待的鎖
     * @param waitTime 最大等待時間
     * @return
     * @throws InterruptedException
     * @throws KeeperException
     */
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }


    /**
     * 取消鎖監控
     */
    public void unlock() {
        try {
            System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            //zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 關閉zk鏈接
     */
    public void closeZk(){
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }
    /**
     * 自定義異常信息
     * @author lenovo
     *
     */
    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

  這個代碼里,大家可以改為自己的集群,如我的是master:2181,slave1:2181,slave2:2181

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM