前言:馬上要過年了,祝大家新年快樂!在過年回家前分享一篇關於Zookeeper的文章,我們都知道現在微服務盛行,大數據、分布式系統中經常會使用到Zookeeper,它是微服務、分布式系統中必不可少的分布式協調框架。它的作用體現在分布式系統中解決了配置中心的問題,以及解決了在分布式環境中不同進程之間爭奪資源的問題,也就是分布式鎖的功能以及分布式消息隊列功能等等。所以在微服務的環境中Zookeeper是現在很多公司首選的分布式協調框架,包括我之前的公司也在使用Zookeeper。說了這么多,沒別的就是想說一下Zookeeper的重要性,廢話不多說,進入正題。本篇博客只是演示在.Net Core 環境中如何使用Zookeeper組件進行基本的增刪改查和一些注意的要點,如果對Zookeeper還不是太了解的話,建議認認真真、仔仔細細地閱讀該文章:http://www.cnblogs.com/sunddenly/p/4033574.html 否則可能下面演示的你會看不懂。
一、Zookeeper基本概念快速介紹
概念:
Zookeeper是一個開源的分布式協調框架,它具有高性能 、高可用的特點,同時具有嚴格的順序訪問控制能力(主要是寫操作的嚴格順序性),基於對ZAB(Zookeeper原子消息廣播協議)的實現,它能夠很好的保證分布式環境下的數據一致性。也正是基於這樣的特征,使得Zookeeper稱為解決分布式數據一致性問題的利器,Zookeeper由兩部分組成:Zookeeper服務端和客戶端。
特點:
- 全局一致性:每個server保存一份相同的數據副本,client無論鏈接哪個server,展示的數據都是一致的,這是最重要的特征。
- 可靠性:如果消息其中一台服務器接受,那么將被所有的服務器接受。
- 順序性:包括全局有序性和偏序兩種:全局有序是指如果在一台服務器上消息a在消息b前發布,則在所有server上消息a都將在消息b前被發布;偏序是指如果一個消息b在消息a后被同一個發送者發布,a必將排在b前面。
- 數據更新原子性:一次數據更新要么成功,要么失敗,不存在中間狀態。
- 實時性:Zookeeper保證客戶端將在一個時間間隔范圍內獲得服務器的更新信息,或者服務器失敗的信息。
數據結構:
圖片來源:(https://www.cnblogs.com/xums/p/7074008.html)
- Zookeeper的數據結構模型采用類似於文件系統的樹結構。樹上的每個節點稱為ZNode,而每個節點都可能有一個或者多個子節點。ZNode的節點路徑標識方式是由一系列斜杠"/"進行分割的路徑表示,必須是絕對路徑。既可以向ZNode節點寫入、修改和讀取數據,也可以創建、刪除ZNode節點或ZNode節點下的子節點。
- 值的注意的是,Zookeeper的設計目標不是傳統的數據庫存儲或大數據對象存儲,而是協同數據的存儲,因此在實現的時候,ZNode存儲的數據大小不應該超過1MB。另外,每一個節點都有一個ACL(訪問控制列表),據此控制該節點的訪問權限。
- ZNode數據節點是有生命周期的,其生命周期的長短取決於數據節點的節點類型。節點類型共有四種:持久節點、持久順序節點、臨時節點、臨時順序節點
好了,基本的概念就聊到這里,先有一個印象,如果需要詳細的學習,建議認認真真閱讀這篇博客:http://www.cnblogs.com/sunddenly/p/4033574.html,下面就開始演示基本的api操作。
二、ASP.Net Core 中使用ZooKeeper
首先,添加下面的依賴包:
新建一個.Net Core的控制台應用:
Zookeeper的服務端使用的是張輝清老師新書《中小研發團隊架構實踐》里面的服務,我這里不再安裝Zookeeper服務端,只是介紹一下Zookeeper的目錄結構
- Zookeeper目錄介紹
(1)bin:主要的一些運行命令
(2)conf:存放配置文件,其中我們需要修改zk.cfg
(3)contrib:附加的一些功能
(4)dist-maven:mvn編譯后的目錄
(5)docs:文檔
(6)lib:需要依賴的jar包
配置文件zk.cfg文件內容介紹(單機版)
(1)trickTime:用於計算的時間單元,比如session超時:N*trickTime
(2)initLimit:用於集群,允許從節點鏈接並同步到master節點的初始化鏈接時間,以trickTime的倍數來表示
(3)syncLimit:用於集群,master主節點與從節點之間發送消息,請求和應答時間長度(心跳機制)
(4)dataDir:必須配置
(5)dataLogDir:日志目錄,如果不配置會和dataDir公用
(6)clientPort:鏈接服務器的端口,默認是2181
好了就介紹到這里,下面讓我會演示關於Zookeeper API的各種操作。
- 如何連接Zookeeper的服務端
(1)代碼如下:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); Console.WriteLine("客戶端開始連接zookeeper服務器..."); Console.WriteLine($"連接狀態:{ZK.getState()}"); Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題 Console.WriteLine($"連接狀態:{ZK.getState()}"); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
解釋:
首先,我們來看看創建Zookeeper對象時,應該注意的問題:
Zookeeper的構造函數參數解釋如下:
客戶端和zk服務端鏈接是一個異步的過程,當連接成功后后,客戶端會收的一個watch通知,就是調用回調函數:ConfigServiceWatcher.process(WatchedEvent @event)注意這個類ConfigServiceWatcher必須要繼承Watcher,重寫 process(WatchedEvent @event),所以就打印出了。關於Zookeeper的watcher后面會詳細介紹,不明白的不要緊,后面會通過代碼給大家演示。
(1)connectString:連接服務器的ip字符串,比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表集群,也可以在ip后加路徑。
(2)sessionTimeout:超時時間,心跳收不到了,那就超時
(3)watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設置為null,在上面的演示中,我們設置了一個watcher。
(4)canBeReadOnly:可讀,當這個物理機節點斷開后,還是可以讀到數據的,只是不能寫,此時數據被讀取到的可能是舊數據,此處建議設置為false,不推薦使用。
(5)sessionId:會話的id
(6)sessionPasswd:會話密碼 當會話丟失后,可以依據 sessionId 和 sessionPasswd 重新獲取會話。
好了,基本的參數已經介紹完畢,那么,來解釋一下為什么在創建Zookeeper對象時添加下面這句代碼:
其實上面我已經解釋了,由於客戶端和zk服務端鏈接是一個異步的過程,需要一定的時間間隔,所以,如果不添加效果這樣:
(2)zookeeper 恢復之前的會話連接演示
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } } }
- ZNode創建刪除修改查詢
代碼:
using org.apache.zookeeper; using org.apache.zookeeper.data; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.Watcher.Event; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { public class ZookeeperClient { public ZooKeeper ZK { get; set; } // 配置項 public string QueryPath { get; set; }= "/Configuration"; //節點狀態信息 public Stat Stat { get; set; } // 配置數據 public byte[] ConfigData { get; set; } = null; public ZookeeperClient(string serviceAddress, int timeout) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher(this)); } public ZookeeperClient(string serviceAddress, int timeout, long sessionId, byte[] sessionPasswd) { ZK = new ZooKeeper(serviceAddress, timeout, new ConfigServiceWatcher2(this), sessionId, sessionPasswd); } // 讀取節點的配置數據 public async Task<string> ReadConfigDataAsync() { if (this.ZK == null) { return string.Empty; } var stat = await ZK.existsAsync(QueryPath, true); if (stat == null) { return string.Empty; } this.Stat = stat; var dataResult = await ZK.getDataAsync(QueryPath, true); return Encoding.UTF8.GetString(dataResult.Data); } public class ConfigServiceWatcher : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } public class ConfigServiceWatcher2 : Watcher { private ZookeeperClient _cs = null; public ConfigServiceWatcher2(ZookeeperClient cs) { _cs = cs; } public override async Task process(WatchedEvent @event) { Console.WriteLine($"Zookeeper鏈接成功:{@event.getState() == KeeperState.SyncConnected}"); if (@event.get_Type() == EventType.NodeDataChanged) { var data = await _cs.ReadConfigDataAsync(); Console.WriteLine("{0}收到修改此節點【{1}】值的通知,其值已被改為【{2}】。", Environment.NewLine, _cs.QueryPath, data); } } } // 關閉ZooKeeper連接 // 釋放資源 public async Task Close() { if (this.ZK != null) { await ZK.closeAsync(); } this.ZK = null; } } }
using org.apache.zookeeper; using System; using System.Text; using System.Threading; using System.Threading.Tasks; using static org.apache.zookeeper.ZooDefs; namespace ZookeeperNetCore { class Program { public const int timeout = 5000; static async Task Main(string[] args) { var conf = new ZookeeperClient("", timeout); try { conf.QueryPath = "/UserName"; Console.WriteLine("客戶端開始連接zookeeper服務器..."); Console.WriteLine($"連接狀態:{conf.ZK.getState()}"); Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題 Console.WriteLine($"連接狀態:{conf.ZK.getState()}"); if (await conf.ZK.existsAsync(conf.QueryPath, false) == null) { conf.ConfigData = Encoding.Default.GetBytes("guozheng"); await conf.ZK.createAsync(conf.QueryPath, conf.ConfigData, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } string configData = await conf.ReadConfigDataAsync(); Console.WriteLine("節點【{0}】目前的值為【{1}】。", conf.QueryPath, configData); Console.ReadLine(); Random random = new Random((int)DateTime.Now.Ticks & 0x0000FFFF); conf.ConfigData = Encoding.UTF8.GetBytes(string.Format("Mike_{0}", random.Next(100))); await conf.ZK.setDataAsync(conf.QueryPath, conf.ConfigData, -1); Console.WriteLine("節點【{0}】的值已被修改為【{1}】。", conf.QueryPath, Encoding.UTF8.GetString(conf.ConfigData)); Console.ReadLine(); if (await conf.ZK.existsAsync(conf.QueryPath, false) != null) { await conf.ZK.deleteAsync(conf.QueryPath, -1); Console.WriteLine("已刪除此【{0}】節點。{1}", conf.QueryPath, Environment.NewLine); } } catch (Exception ex) { if (conf.ZK == null) { Console.WriteLine("已關閉ZooKeeper的連接。"); Console.ReadLine(); return; } Console.WriteLine("拋出異常:{0}【{1}】。", Environment.NewLine, ex.ToString()); } finally { await conf.Close(); Console.WriteLine("已關閉ZooKeeper的連接。"); Console.ReadLine(); } ////開始會話重連 //Console.WriteLine("開始會話重連..."); //var conf2 = new ZookeeperClient("", timeout, sessionId, sessionPassword); //Console.WriteLine(conf2.ZK.getSessionId()); //Console.WriteLine( Encoding.UTF8.GetString(conf2.ZK.getSessionPasswd())); //Console.WriteLine($"重新連接狀態zkSession:{conf2.ZK.getState()}"); //Thread.Sleep(1000);//注意:為什么要加上這行代碼,如果不加會出現什么問題 //Console.WriteLine($"重新連接狀態zkSession:{conf2.ZK.getState()}"); Console.ReadKey(); } } }
解釋:
關於異步創建節點的方法,是不支持子節點的遞歸創建,參數介紹:
(1)path:創建的路徑
(2)data:存儲的數據的byte[]
(3)acl:控制權限策略 Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa CREATOR_ALL_ACL --> auth:user:password:cdrwa
(4)createMode: 節點類型, 是一個枚舉 PERSISTENT:持久節點 PERSISTENT_SEQUENTIAL:持久順序節點 EPHEMERAL:臨時節點 EPHEMERAL_SEQUENTIAL:臨時順序節點
關於上面參數引出來的知識點,需要幾章來講解,本篇文章先不介紹,后面會介紹。好了,關於.Net Core中使用Zookeeper的介紹就到這里,關於上面演示的結果,我先拋出一個問題,大家可以思考一下:為什么“Zookeeper鏈接成功:True”會輸出多次?也就是我們下節要討論的Zookeeper的watcher機制。時間到了,收拾行李,准備一下回家啦,先寫到這里,祝大家新年快樂!希望對你有幫助,過完年來見!
三、總結
可能有些地方解釋的不是太清楚,大家多多見諒,有些的不對的地方,希望能指正出來。
說明:演示代碼里面使用的Zookeeper服務過一段時間能用,不能用的話,在評論區留言,后面用阿里雲自己搭建一個。
代碼地址:
https://github.com/guozheng007/ZookeeperNetCoreDemo
參考資料:
(1)張輝清:《中小研發團隊架構實踐》
(2) 風間影月:《ZooKeeper分布式專題與Dubbo微服務入門》
(3)sunddenly:http://www.cnblogs.com/sunddenly/p/4033574.html
作者:郭崢
出處:http://www.cnblogs.com/runningsmallguo/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。