ASP.Net Core 中使用Zookeeper搭建分布式環境中的配置中心系列一:使用Zookeeper.Net組件演示基本的操作


前言:馬上要過年了,祝大家新年快樂!在過年回家前分享一篇關於Zookeeper的文章,我們都知道現在微服務盛行,大數據、分布式系統中經常會使用到Zookeeper,它是微服務、分布式系統中必不可少的分布式協調框架。它的作用體現在分布式系統中解決了配置中心的問題,以及解決了在分布式環境中不同進程之間爭奪資源的問題,也就是分布式鎖的功能以及分布式消息隊列功能等等。所以在微服務的環境中Zookeeper是現在很多公司首選的分布式協調框架,包括我之前的公司也在使用Zookeeper。說了這么多,沒別的就是想說一下Zookeeper的重要性,廢話不多說,進入正題。本篇博客只是演示在.Net Core 環境中如何使用Zookeeper組件進行基本的增刪改查和一些注意的要點,如果對Zookeeper還不是太了解的話,建議認認真真、仔仔細細地閱讀該文章:http://www.cnblogs.com/sunddenly/p/4033574.html   否則可能下面演示的你會看不懂。

 

一、Zookeeper基本概念快速介紹

概念:

Zookeeper是一個開源的分布式協調框架,它具有高性能 、高可用的特點,同時具有嚴格的順序訪問控制能力(主要是寫操作的嚴格順序性),基於對ZAB(Zookeeper原子消息廣播協議)的實現,它能夠很好的保證分布式環境下的數據一致性。也正是基於這樣的特征,使得Zookeeper稱為解決分布式數據一致性問題的利器,Zookeeper由兩部分組成:Zookeeper服務端和客戶端。

特點:

  • 全局一致性:每個server保存一份相同的數據副本,client無論鏈接哪個server,展示的數據都是一致的,這是最重要的特征。
  • 可靠性:如果消息其中一台服務器接受,那么將被所有的服務器接受。
  • 順序性:包括全局有序性和偏序兩種:全局有序是指如果在一台服務器上消息a在消息b前發布,則在所有server上消息a都將在消息b前被發布;偏序是指如果一個消息b在消息a后被同一個發送者發布,a必將排在b前面。
  • 數據更新原子性:一次數據更新要么成功,要么失敗,不存在中間狀態。
  • 實時性:Zookeeper保證客戶端將在一個時間間隔范圍內獲得服務器的更新信息,或者服務器失敗的信息。

數據結構:

圖片來源:(https://www.cnblogs.com/xums/p/7074008.html)

  • Zookeeper的數據結構模型采用類似於文件系統的樹結構。樹上的每個節點稱為ZNode,而每個節點都可能有一個或者多個子節點。ZNode的節點路徑標識方式是由一系列斜杠"/"進行分割的路徑表示,必須是絕對路徑。既可以向ZNode節點寫入、修改和讀取數據,也可以創建、刪除ZNode節點或ZNode節點下的子節點。
  • 值的注意的是,Zookeeper的設計目標不是傳統的數據庫存儲或大數據對象存儲,而是協同數據的存儲,因此在實現的時候,ZNode存儲的數據大小不應該超過1MB。另外,每一個節點都有一個ACL(訪問控制列表),據此控制該節點的訪問權限。
  • ZNode數據節點是有生命周期的,其生命周期的長短取決於數據節點的節點類型。節點類型共有四種:持久節點、持久順序節點、臨時節點、臨時順序節點

 

好了,基本的概念就聊到這里,先有一個印象,如果需要詳細的學習,建議認認真真閱讀這篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就開始演示基本的api操作。

 

二、ASP.Net Core 中使用ZooKeeper

 首先,添加下面的依賴包:

 

新建一個.Net Core的控制台應用:

Zookeeper的服務端使用的是張輝清老師新書《中小研發團隊架構實踐》里面的服務,我這里不再安裝Zookeeper服務端,只是介紹一下Zookeeper的目錄結構

  • Zookeeper目錄介紹

(1)bin:主要的一些運行命令

(2)conf:存放配置文件,其中我們需要修改zk.cfg

(3)contrib:附加的一些功能

(4)dist-maven:mvn編譯后的目錄

(5)docs:文檔

(6)lib:需要依賴的jar包

配置文件zk.cfg文件內容介紹(單機版)

(1)trickTime:用於計算的時間單元,比如session超時:N*trickTime

(2)initLimit:用於集群,允許從節點鏈接並同步到master節點的初始化鏈接時間,以trickTime的倍數來表示

(3)syncLimit:用於集群,master主節點與從節點之間發送消息,請求和應答時間長度(心跳機制)

(4)dataDir:必須配置

(5)dataLogDir:日志目錄,如果不配置會和dataDir公用

(6)clientPort:鏈接服務器的端口,默認是2181

好了就介紹到這里,下面讓我會演示關於Zookeeper  API的各種操作。

  • 如何連接Zookeeper的服務端

(1)代碼如下:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

            Console.WriteLine("客戶端開始連接zookeeper服務器...");
            Console.WriteLine($"連接狀態:{ZK.getState()}");
            Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題
            Console.WriteLine($"連接狀態:{ZK.getState()}");
        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

    }
}

解釋:

 首先,我們來看看創建Zookeeper對象時,應該注意的問題:

Zookeeper的構造函數參數解釋如下:

客戶端和zk服務端鏈接是一個異步的過程,當連接成功后后,客戶端會收的一個watch通知,就是調用回調函數:ConfigServiceWatcher.process(WatchedEvent @event)注意這個類ConfigServiceWatcher必須要繼承Watcher,重寫 process(WatchedEvent @event),所以就打印出了。關於Zookeeper的watcher后面會詳細介紹,不明白的不要緊,后面會通過代碼給大家演示。

(1)connectString:連接服務器的ip字符串,比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表集群,也可以在ip后加路徑。

(2)sessionTimeout:超時時間,心跳收不到了,那就超時

(3)watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設置為null,在上面的演示中,我們設置了一個watcher。

(4)canBeReadOnly:可讀,當這個物理機節點斷開后,還是可以讀到數據的,只是不能寫,此時數據被讀取到的可能是舊數據,此處建議設置為false,不推薦使用。

(5)sessionId:會話的id

(6)sessionPasswd:會話密碼 當會話丟失后,可以依據 sessionId 和 sessionPasswd 重新獲取會話。

好了,基本的參數已經介紹完畢,那么,來解釋一下為什么在創建Zookeeper對象時添加下面這句代碼:

其實上面我已經解釋了,由於客戶端和zk服務端鏈接是一個異步的過程,需要一定的時間間隔,所以,如果不添加效果這樣:

 

(2)zookeeper 恢復之前的會話連接演示

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }
    }
}

 

  •  ZNode創建刪除修改查詢

代碼:

using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.Watcher.Event;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    public class ZookeeperClient
    {
        public ZooKeeper ZK { get; set; }

        // 配置項
        public string QueryPath { get; set; }= "/Configuration";
        //節點狀態信息
        public Stat Stat { get; set; }

        // 配置數據
        public byte[] ConfigData { get; set; } = null;


        public ZookeeperClient(string serviceAddress, int timeout)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this));

        }

        public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd)
        {
            ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd);

        }

        // 讀取節點的配置數據
        public async Task<string> ReadConfigDataAsync()
        {
            if (this.ZK == null)
            {
                return string.Empty;
            }

            var stat = await ZK.existsAsync(QueryPath, true);

            if (stat == null)
            {
                return string.Empty;
            }

            this.Stat = stat;

            var dataResult = await ZK.getDataAsync(QueryPath, true);
            
            return Encoding.UTF8.GetString(dataResult.Data);
        }

        

        public class ConfigServiceWatcher : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override  async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        public class ConfigServiceWatcher2 : Watcher
        {
            private ZookeeperClient _cs = null;

            public ConfigServiceWatcher2(ZookeeperClient cs)
            {
                _cs = cs;
            }

            public override async Task process(WatchedEvent @event)
            {
                Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}");

                if (@event.get_Type() == EventType.NodeDataChanged)
                {
                    var data = await _cs.ReadConfigDataAsync();

                    Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data);
                }
            }
        }

        // 關閉ZooKeeper連接
        // 釋放資源
        public async Task Close()
        {
            if (this.ZK != null)
            {
               await ZK.closeAsync();
            }

            this.ZK = null;
        }


        
    }
}
using org.apache.zookeeper;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static org.apache.zookeeper.ZooDefs;

namespace ZookeeperNetCore
{
    class Program
    {
        public const int timeout = 5000;
        static async Task Main(string[] args)
        {
            var conf = new ZookeeperClient("", timeout);

            try
            {
                conf.QueryPath = "/UserName";

                Console.WriteLine("客戶端開始連接zookeeper服務器...");
                Console.WriteLine($"連接狀態:{conf.ZK.getState()}");
                Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題
                Console.WriteLine($"連接狀態:{conf.ZK.getState()}");

                if (await conf.ZK.existsAsync(conf.QueryPath, false) == null)
                {
                    conf.ConfigData = Encoding.Default.GetBytes("guozheng");
                    await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }

                string configData = await conf.ReadConfigDataAsync();
                Console.WriteLine("節點【{0}】目前的值為【{1}】。", conf.QueryPath, configData);
                Console.ReadLine();


                Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF);
                conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100)));

                await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1);

                Console.WriteLine("節點【{0}】的值已被修改為【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData));

                Console.ReadLine();

                if (await conf.ZK.existsAsync(conf.QueryPath, false) != null)
                {
                    await conf.ZK.deleteAsync(conf.QueryPath, -1);

                    Console.WriteLine("已刪除此【{0}】節點。{1}", conf.QueryPath, Environment.NewLine);
                }

            }
            catch (Exception ex)
            {
                if (conf.ZK == null)
                {
                    Console.WriteLine("已關閉ZooKeeper的連接。");
                    Console.ReadLine();
                    return;
                }

                Console.WriteLine("拋出異常:{0}【{1}】。", Environment.NewLine, ex.ToString());
            }
            finally
            {
                await conf.Close();
                Console.WriteLine("已關閉ZooKeeper的連接。");
                Console.ReadLine();
            }



            ////開始會話重連
            //Console.WriteLine("開始會話重連...");

            //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword);

            //Console.WriteLine(conf2.ZK.getSessionId());
            //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd()));

            //Console.WriteLine($"重新連接狀態zkSession:{conf2.ZK.getState()}");
            //Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題
            //Console.WriteLine($"重新連接狀態zkSession:{conf2.ZK.getState()}");


            Console.ReadKey();
        }
    }
}

 

 解釋:

關於異步創建節點的方法,是不支持子節點的遞歸創建,參數介紹:

(1)path:創建的路徑

(2)data:存儲的數據的byte[]

(3)acl:控制權限策略   Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa      CREATOR_ALL_ACL --> auth:user:password:cdrwa

(4)createMode: 節點類型, 是一個枚舉    PERSISTENT:持久節點   PERSISTENT_SEQUENTIAL:持久順序節點   EPHEMERAL:臨時節點   EPHEMERAL_SEQUENTIAL:臨時順序節點

 關於上面參數引出來的知識點,需要幾章來講解,本篇文章先不介紹,后面會介紹。好了,關於.Net Core中使用Zookeeper的介紹就到這里,關於上面演示的結果,我先拋出一個問題,大家可以思考一下:為什么“Zookeeper鏈接成功:True”會輸出多次?也就是我們下節要討論的Zookeeper的watcher機制。時間到了,收拾行李,准備一下回家啦,先寫到這里,祝大家新年快樂!希望對你有幫助,過完年來見!

 

三、總結

 可能有些地方解釋的不是太清楚,大家多多見諒,有些的不對的地方,希望能指正出來。

說明:演示代碼里面使用的Zookeeper服務過一段時間能用,不能用的話,在評論區留言,后面用阿里雲自己搭建一個。

 代碼地址:

 https://github.com/guozheng007/ZookeeperNetCoreDemo

 

 

參考資料:

(1)張輝清:《中小研發團隊架構實踐》

(2) 風間影月:《ZooKeeper分布式專題與Dubbo微服務入門》

 (3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html

作者:郭崢

出處:http://www.cnblogs.com/runningsmallguo/

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM