zookeeper3.5.9的安裝與使用 Curator Watcher監聽 分布式鎖


zookeeper是Apache Hadoop的子項目,是一個樹形的目錄服務。

在官網下載想要的版本: http://zookeeper.apache.org/releases.html   如這里使用的是 apache-zookeeper-3.5.9-bin.tar.gz(該版本是windows和Linux通用的版本)

 

*安裝 (溫馨提示,zookeeper是java語言編寫的,它依賴於JDK7及以上版本).

 

1, 上傳到Linux服務器並解壓到指定路徑

> mkdir zookeeper

> tar -zxvf apache-zookeeper-3.5.9-bin.tar.gz

 

2, zookeeper的啟動必須有zoo.cfg配置文件,默認是沒有的

> cp zoo_sample.cfg zoo.cfg

> vi zoo.cfg  編輯該文件,主要是指定zookeeper的數據保存路徑

     dataDir=/app/zookeeper/zkdata

 

3, 啟動 (默認是 2181端口 )

> cd bin

> ./zkServer.sh start 啟動,,,當然還有 status stop restart 參數可用.

 

zookeeper中每個節點被稱為 ZNode,每個節點上都會保存自己的數據和節點信息, 節點下可以有子節點;

通常節點分為 持久化節點、臨時節點 -e 、 持久化順序節點 -s  臨時順序節點 -es

 

*客戶端命令

> ./zkCli.sh  [-server localhost:2181]     如果客戶端和服務端在一台,客戶端就是要連這一台可以省略參數 連接成功后,退出用 quit

 *JavaApi調用zkServer

    Curator,它是Apache提供的Java客戶端庫連接zookeeper,最初是Netflix研發的捐獻給Apache基金會,目前是Apache的頂級項目。

    官網: http://curator.apache.org

 

   0. 准備工程  pom.xml

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!--curator start-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>
        <!--curator end-->

        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

   1. 連接到zkServer

   2. 創建節點

    3. 查詢節點

     4. 修改節點

 

     5. 刪除節點

 

      6. Watcher監聽

     •ZooKeeper 允許用戶在指定節點上注冊一些Watcher,並且在一些特定事件觸發的時候,ZooKeeper 服務端會將事件通知到感興趣的客戶端上去,該機制是 ZooKeeper 實現分布式協調服務的重要特性

     •ZooKeeper 中引入了Watcher機制來實現了發布/訂閱功能,能夠讓多個訂閱者同時監聽某一個對象,當一個對象自身狀態變化時,會通知所有訂閱者。

     •ZooKeeper 原生支持通過注冊Watcher來進行事件監聽,但是其使用並不是特別方便 需要開發人員自己反復注冊Watcher,比較繁瑣。

     •Curator引入了 Cache 來實現對 ZooKeeper 服務端事件的監聽

    •ZooKeeper提供了三種Watcher:

            •NodeCache : 只是監聽某一個特定的節點

            •PathChildrenCache : 監控一個ZNode的子節點.

            •TreeCache : 可以監控整個樹上的所有節點,類似於PathChildrenCache和NodeCache的組合

     6.1  NodeCache 示例

      6.2  PathChildrenCache 示例

     6.3  TreeCache 示例

 7. zookeeper分布式鎖原理

     •核心思想:當客戶端要獲取鎖,則創建節點,使用完鎖,則刪除該節點。

          1.客戶端獲取鎖時,在lock節點下創建臨時順序節點。(lock節點是舉例的節點名)

            臨時:保證任何時候哪怕是宕機節點也能被刪除, 順序: 因為要找序號最小的節點獲取到鎖,其它節點依次監聽比它小的那一個節點的刪除事件

          2.然后獲取lock下面的所有子節點,客戶端獲取到所有的子節點之后,如果發現自己創建的子節點序號最小,那么就認為該客戶端獲取到了鎖。使用完鎖后,將該節點刪除。

         3.如果發現自己創建的節點並非lock所有子節點中最小的,說明自己還沒有獲取到鎖,此時客戶端需要找到比自己小的那個節點,同時對其注冊事件監聽器,監聽刪除事件

          4.如果發現比自己小的那個節點被刪除,則客戶端的Watcher會收到相應通知,此時再次判斷自己創建的節點是否是lock子節點中序號最小的,如果是則獲取到了鎖,如果不是則重復以上步驟繼續獲取到比自己小的一個節點並注冊監聽。

 

Curator實現分布式鎖API

  • 在Curator中有五種鎖方案:

    • InterProcessSemaphoreMutex:分布式排它鎖(非可重入鎖)

    • InterProcessMutex:分布式可重入排它鎖

    • InterProcessReadWriteLock:分布式讀寫鎖

    • InterProcessMultiLock:將多個鎖作為單個實體管理的容器

    • InterProcessSemaphoreV2:共享信號量

資源在誰手上,分布式鎖就加到誰手上。

package com.laoyang.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

//這里用多線程的方式模擬多個12306的服務
public class Ticket12306 implements Runnable {

    private int tickets = 10; //假定有10張票
    //這里使用curator提供的5種鎖當中的InterProcessMutex
    InterProcessMutex lock ;
    CuratorFramework client = null;
    public Ticket12306(){

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,5);
        //調用CuratorFrameworkFactory工廠的builder()方法創建客戶端對象
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.239.135:2181") //設置連接串 ip:port,ip:port...
                .sessionTimeoutMs(60000)       //設置會話超時毫秒數,默認值是60*1000
                .connectionTimeoutMs(15000)   //設置連接超時毫秒數,默認值是 15*1000
                .retryPolicy(retryPolicy)      //重試策略
                .build();
        client.start();         //開啟連接

        lock = new InterProcessMutex(client,"/lock");
    }
    @Override
    public void run() {

        while (true){
            //獲取鎖
            try {
                lock.acquire(1, TimeUnit.SECONDS);
                if (tickets >0){
                    System.out.println(Thread.currentThread()+":"+tickets--);
                    Thread.sleep(200);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //釋放鎖
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.laoyang.curator;

import org.junit.Test;

public class LockTest {


    public static void main(String[] args){

        Ticket12306 ticket12306 = new Ticket12306();
        Thread t1 = new Thread(ticket12306,"飛豬");
        Thread t2 = new Thread(ticket12306,"攜程");
        t1.start();
        t2.start();
    }
}

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM