Demo代碼已提交到gitee,感興趣的更有可以直接克隆使用,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Zookeeper
.net core要使用Zookeeper,我們還是推薦使用ZooKeeperNetEx這個插件,先在nuget中搜索安裝ZooKeeperNetEx,然后可以在Startup類中直接使用ZooKeeperNetEx連接Zookeeper獲取數據,也可以使用前面章節中介紹的ZookeeperHelper輔助類來操作,本文也是以這個Zookeeper輔助類來作為工具,將Zookeeper作為一個配置源,從而將它集成到.net core的Configuration-option配置中。
博主使用的是.net core2.2的版本,不過對2.*和3.*區別不大。
首先貼出簡單的項目文件結構:
其中,Configurations目錄下的一系列Zookeeper開頭的文件是集成Zookeeper所需的類,下面單獨介紹,首先從我們前面的文章中得到ZookeeperHelper這個輔助類:Zookeeper基礎教程(四):C#連接使用Zookeeper
ZookeeperConfigurationExtensions是一個拓展類,最終使用的是這個類的拓展方法去集成Zookeeper到Configuration配置源中

public static class ZookeeperConfigurationExtensions { /// <summary> /// 添加Zookeeper配置 /// </summary> /// <param name="builder"></param> /// <param name="configure"></param> public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, Action<ZookeeperOptions> configure) { var options = new ZookeeperOptions(); configure?.Invoke(options); return builder.AddZookeeper(options); } /// <summary> /// 添加Zookeeper配置 /// </summary> /// <param name="builder"></param> /// <param name="zookeeperOptions"></param> public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, ZookeeperOptions zookeeperOptions) { ZookeeperConfigurationSource zookeeperConfigurationSource = new ZookeeperConfigurationSource(zookeeperOptions); builder.Add(zookeeperConfigurationSource); return builder; } /// <summary> /// 添加Zookeeper配置 /// </summary> /// <param name="builder"></param> /// <param name="configure"></param> /// <returns></returns> public static IHostBuilder UseZookeeper(this IHostBuilder builder, Action<ZookeeperOptions> configure) { var options = new ZookeeperOptions(); configure?.Invoke(options); return builder.UseZookeeper(options); } /// <summary> /// 添加Zookeeper配置 /// </summary> /// <param name="builder"></param> /// <param name="zookeeperOptions"></param> /// <returns></returns> public static IHostBuilder UseZookeeper(this IHostBuilder builder, ZookeeperOptions zookeeperOptions) { return builder.ConfigureAppConfiguration((_, cbuilder) => cbuilder.AddZookeeper(zookeeperOptions)); } }
ZookeeperConfigurationProvider是Zookeeper集成的核心類,主要是保存Zookeeper讀取下來的數據

public class ZookeeperConfigurationProvider : ConfigurationProvider, IDisposable { IDisposable _changeTokenRegistration; ConfigurationReloadToken _reloadToken = new ConfigurationReloadToken(); /// <summary> /// 監控對象 /// </summary> ZookeeperConfigurationWatcher zookeeperConfigurationWatcher; public ZookeeperConfigurationSource ZookeeperConfigurationSource { get; private set; } public ZookeeperConfigurationProvider(ZookeeperConfigurationSource zookeeperConfigurationSource) { this.ZookeeperConfigurationSource = zookeeperConfigurationSource; this.zookeeperConfigurationWatcher = new ZookeeperConfigurationWatcher(this); if (zookeeperConfigurationSource.ZookeeperOptions.ReloadOnChange) { _changeTokenRegistration = ChangeToken.OnChange( () => Watch(), () => { Thread.Sleep(zookeeperConfigurationSource.ZookeeperOptions.ReloadDelay); Load(); }); } } /// <summary> /// 獲取ReloadToken /// </summary> /// <returns></returns> private IChangeToken Watch() { return _reloadToken; } /// <summary> /// 加載配置 /// </summary> public override void Load() { Data = zookeeperConfigurationWatcher.Process(); OnReload(); } /// <summary> /// 重置 /// </summary> public void Reload() { var previousToken = Interlocked.Exchange(ref _reloadToken, new ConfigurationReloadToken()); previousToken.OnReload(); } /// <summary> /// 釋放 /// </summary> public void Dispose() { _changeTokenRegistration?.Dispose(); } }
ZookeeperConfigurationSource類表示一個配置源

public class ZookeeperConfigurationSource : IConfigurationSource { /// <summary> /// 源狀態信息 /// </summary> public ZookeeperOptions ZookeeperOptions { get; private set; } public ZookeeperConfigurationSource(ZookeeperOptions zookeeperOptions) { this.ZookeeperOptions = zookeeperOptions; } /// <summary> /// 獲取配置提供者 /// </summary> /// <param name="builder"></param> /// <returns></returns> public IConfigurationProvider Build(IConfigurationBuilder builder) { return new ZookeeperConfigurationProvider(this); } }
ZookeeperConfigurationWatcher是Zookeeper數據讀取的核心類,ZookeeperHelper這個輔助類就是在這個類中連接Zookeeper去讀取數據

public class ZookeeperConfigurationWatcher : IDisposable { ZookeeperConfigurationProvider zookeeperConfigurationProvider; /// <summary> /// Zookeeper輔助操作類 /// </summary> ZookeeperHelper zookeeperHelper; public ZookeeperConfigurationWatcher(ZookeeperConfigurationProvider zookeeperConfigurationProvider) { this.zookeeperConfigurationProvider = zookeeperConfigurationProvider; var options = zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions; zookeeperHelper = new ZookeeperHelper(options.Address, options.RootPath); //初次連接后,立即 zookeeperHelper.OnConnected += () => { if (options.Scheme != AuthScheme.World) { zookeeperHelper.Authorize(options.Scheme, options.Auth); } zookeeperConfigurationProvider.Reload(); }; zookeeperHelper.Connect(); //int timeOut = 10; //while (!zookeeperHelper.Connected && timeOut-- > 0) //{ // Thread.Sleep(1000); //停一秒,等待連接完成 //} if (!zookeeperHelper.Connected) { throw new TimeoutException($"connect to zookeeper [{options.Address}] timeout"); } if (options.ReloadOnChange) { //監聽根節點下所有的znode節點,當任意節點發生改變后,即立刻重新加載 zookeeperHelper.WatchAllAsync(ze => { if (ze.Type != ZookeeperEvent.EventType.None) { //使用一個異步去完成,同步會造成死鎖 Task.Run(() => { zookeeperConfigurationProvider.Reload(); }); } }).Wait(); } } /// <summary> /// 獲取Zookeeper中的數據 /// </summary> /// <returns></returns> public IDictionary<string, string> Process() { var data = zookeeperHelper.GetDictionaryAsync(ConfigurationPath.KeyDelimiter).GetAwaiter().GetResult(); if (!zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.ExcludeRoot) { return data; } var root = this.zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.RootPath ?? ""; var rootSplits = root.Split(new string[] { "/" }, StringSplitOptions.RemoveEmptyEntries); IDictionary<string, string> dict = new Dictionary<string, string>(); foreach (var key in data.Keys) { var split = key.Split(new string[] { ConfigurationPath.KeyDelimiter }, StringSplitOptions.RemoveEmptyEntries); var array = split.Skip(rootSplits.Length).ToArray(); dict[ConfigurationPath.Combine(array)] = data[key]; } return dict; } /// <summary> /// 釋放資源 /// </summary> public virtual void Dispose() { zookeeperHelper.Dispose(); zookeeperConfigurationProvider.Dispose(); } }
ZookeeperOptions是相關配置信息

public class ZookeeperOptions { /// <summary> /// 是否監控源數據變化 /// </summary> public bool ReloadOnChange { get; set; } = true; /// <summary> /// 加載延遲 /// </summary> public int ReloadDelay { get; set; } = 250; /// <summary> /// Zookeeper集群地址 /// </summary> public string[] Address { get; set; } /// <summary> /// Zookeeper初始路徑 /// </summary> public string RootPath { get; set; } /// <summary> /// 認證主題 /// </summary> public AuthScheme Scheme { get; set; } /// <summary> /// 認證信息 /// </summary> public string Auth { get; set; } /// <summary> /// 生成的配置中是否去掉初始路徑 /// </summary> public bool ExcludeRoot { get; set; } }
然后是Program類:
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; using AspNetCore.ZookeeperApi.Configurations; using Microsoft.AspNetCore; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; namespace AspNetCore.ZookeeperApi { public class Program { public static void Main(string[] args) { CreateWebHostBuilder(args).Build().Run(); } public static IWebHostBuilder CreateWebHostBuilder(string[] args) => WebHost.CreateDefaultBuilder(args) .UseStartup<Startup>() .UseZookeeper(new ZookeeperOptions() { //Auth = "user:123", //Scheme = AuthScheme.Digest, //Address = new string[] { "192.168.209.133:2181", "192.168.209.134:2181", "192.168.209.135:2181" }, Address = new string[] { "192.168.209.133:2181" }, RootPath = "/Test" }); } }
其中UseZookeeper方法就是ZookeeperConfigurationExtensions的一個拓展方法,調用這個拓展方法之后,Zookeeper讀取的配置可以通過IConfiguration這個服務對象讀取了。
UseZookeeper拓展方法傳入了一個參數,Address是Zookeeper連接地址,RootPath是項目配置在Zookeeper中的根節點路徑。
假如我們在Zookeeper中創建一個znode節點:/Test,然后在/Test節點下創建5個子節點:/Test/Value1、/Test/Value2、/Test/Value3、/Test/Value4、/Test/Value5
然后分別將數據保存到這5個節點中:
# Value1表示字符串類型數據
set /Test/Value1 'this is string values'
# Value2表示整形類型數據
set /Test/Value2 1
# Value3表示浮點型數據
set /Test/Value3 3.14
# Value4表示布爾類型數據
set /Test/Value4 false
# Value5表示日期類型數據
set /Test/Value5 '2020-06-02'
上面說了,Zookeeper讀取的配置可以通過IConfiguration這個服務對象讀取,IConfiguration是.net core提供的配置服務,我們可以直接使用讀取,但是推薦使用Option方式讀取,比如為讀取/Test節點下的5個節點數據,我們可以創建一個TestOptions類:
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace AspNetCore.ZookeeperApi { public class TestOptions { /// <summary> /// 字符數據 /// </summary> public string Value1 { get; set; } /// <summary> /// 整形數據 /// </summary> public int Value2 { get; set; } /// <summary> /// 浮點型數據 /// </summary> public decimal Value3 { get; set; } /// <summary> /// 布爾型數據 /// </summary> public bool Value4 { get; set; } /// <summary> /// 日期數據 /// </summary> public DateTime Value5 { get; set; } } }
然后在Startup的ConfigureServices方法中注冊
public void ConfigureServices(IServiceCollection services) { services.Configure<TestOptions>(Configuration.GetSection("Test")); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2); }
之后我們就可以直接使用Option注入的形式去訪問了,比如我們創建一個TestController的控制器:
[HttpGet] public object Get() { //使用IOptionsSnapshot<>,它是Scoped作用域 var optionsSnapshot = HttpContext.RequestServices.GetService<IOptionsSnapshot<TestOptions>>(); //因為IOptionsMonitor<>有緩存IOptionsMonitorCache<>,所以需要注意使用 //如果使用service.Configure<TOptions>(IConfiguration)方法添加的Options,否則需要自行設置重新加載機制 //重新加載可參考:https://www.cnblogs.com/shanfeng1000/p/15095236.html var optionsMonitor = HttpContext.RequestServices.GetService<IOptionsMonitor<TestOptions>>(); return new { OptionsSnapshot = optionsSnapshot.Value, OptionsMonitor = optionsMonitor.CurrentValue }; }
然后運行項目,訪問這個接口地址就能得到:
可以看到Zookeeper中的數據成功被讀取到,現在我們可以修改Zookeeper中/Test節點下的任意節點數據,如:
set /Test/Value2 2
由於Zookeeper的監聽機制,重新調用接口,你會看到Value2的值也成功刷新了