Zookeeper基礎教程(六):.net core使用Zookeeper


  Demo代碼已提交到gitee,感興趣的更有可以直接克隆使用,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Zookeeper

  .net core要使用Zookeeper,我們還是推薦使用ZooKeeperNetEx這個插件,先在nuget中搜索安裝ZooKeeperNetEx,然后可以在Startup類中直接使用ZooKeeperNetEx連接Zookeeper獲取數據,也可以使用前面章節中介紹的ZookeeperHelper輔助類來操作,本文也是以這個Zookeeper輔助類來作為工具,將Zookeeper作為一個配置源,從而將它集成到.net core的Configuration-option配置中。

  博主使用的是.net core2.2的版本,不過對2.*和3.*區別不大。

  首先貼出簡單的項目文件結構:

  

   其中,Configurations目錄下的一系列Zookeeper開頭的文件是集成Zookeeper所需的類,下面單獨介紹,首先從我們前面的文章中得到ZookeeperHelper這個輔助類:Zookeeper基礎教程(四):C#連接使用Zookeeper

  ZookeeperConfigurationExtensions是一個拓展類,最終使用的是這個類的拓展方法去集成Zookeeper到Configuration配置源中

  
    public static class ZookeeperConfigurationExtensions
    {
        /// <summary>
        /// 添加Zookeeper配置
        /// </summary>
        /// <param name="builder"></param>
        /// <param name="configure"></param>
        public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, Action<ZookeeperOptions> configure)
        {
            var options = new ZookeeperOptions();
            configure?.Invoke(options);
            return builder.AddZookeeper(options);
        }
        /// <summary>
        /// 添加Zookeeper配置
        /// </summary>
        /// <param name="builder"></param>
        /// <param name="zookeeperOptions"></param>
        public static IConfigurationBuilder AddZookeeper(this IConfigurationBuilder builder, ZookeeperOptions zookeeperOptions)
        {
            ZookeeperConfigurationSource zookeeperConfigurationSource = new ZookeeperConfigurationSource(zookeeperOptions);
            builder.Add(zookeeperConfigurationSource);
            return builder;
        }
        /// <summary>
        /// 添加Zookeeper配置
        /// </summary>
        /// <param name="builder"></param>
        /// <param name="configure"></param>
        /// <returns></returns>
        public static IHostBuilder UseZookeeper(this IHostBuilder builder, Action<ZookeeperOptions> configure)
        {
            var options = new ZookeeperOptions();
            configure?.Invoke(options);
            return builder.UseZookeeper(options);
        }
        /// <summary>
        /// 添加Zookeeper配置
        /// </summary>
        /// <param name="builder"></param>
        /// <param name="zookeeperOptions"></param>
        /// <returns></returns>
        public static IHostBuilder UseZookeeper(this IHostBuilder builder, ZookeeperOptions zookeeperOptions)
        {
            return builder.ConfigureAppConfiguration((_, cbuilder) => cbuilder.AddZookeeper(zookeeperOptions));
        }
    }
ZookeeperConfigurationExtensions

  ZookeeperConfigurationProvider是Zookeeper集成的核心類,主要是保存Zookeeper讀取下來的數據  

  
    public class ZookeeperConfigurationProvider : ConfigurationProvider, IDisposable
    {
        IDisposable _changeTokenRegistration;
        ConfigurationReloadToken _reloadToken = new ConfigurationReloadToken();

        /// <summary>
        /// 監控對象
        /// </summary>
        ZookeeperConfigurationWatcher zookeeperConfigurationWatcher;
        public ZookeeperConfigurationSource ZookeeperConfigurationSource { get; private set; }

        public ZookeeperConfigurationProvider(ZookeeperConfigurationSource zookeeperConfigurationSource)
        {
            this.ZookeeperConfigurationSource = zookeeperConfigurationSource;
            this.zookeeperConfigurationWatcher = new ZookeeperConfigurationWatcher(this);

            if (zookeeperConfigurationSource.ZookeeperOptions.ReloadOnChange)
            {
                _changeTokenRegistration = ChangeToken.OnChange(
                    () => Watch(),
                    () =>
                    {
                        Thread.Sleep(zookeeperConfigurationSource.ZookeeperOptions.ReloadDelay);
                        Load();
                    });
            }
        }
        /// <summary>
        /// 獲取ReloadToken
        /// </summary>
        /// <returns></returns>
        private IChangeToken Watch()
        {
            return _reloadToken;
        }
        /// <summary>
        /// 加載配置
        /// </summary>
        public override void Load()
        {
            Data = zookeeperConfigurationWatcher.Process();

            OnReload();
        }
        /// <summary>
        /// 重置
        /// </summary>
        public void Reload()
        {
            var previousToken = Interlocked.Exchange(ref _reloadToken, new ConfigurationReloadToken());
            previousToken.OnReload();
        }
        /// <summary>
        /// 釋放
        /// </summary>
        public void Dispose()
        {
            _changeTokenRegistration?.Dispose();
        }
    }
ZookeeperConfigurationProvider

  ZookeeperConfigurationSource類表示一個配置源  

  
    public class ZookeeperConfigurationSource : IConfigurationSource
    {
        /// <summary>
        /// 源狀態信息
        /// </summary>
        public ZookeeperOptions ZookeeperOptions { get; private set; }

        public ZookeeperConfigurationSource(ZookeeperOptions zookeeperOptions)
        {
            this.ZookeeperOptions = zookeeperOptions;
        }

        /// <summary>
        /// 獲取配置提供者
        /// </summary>
        /// <param name="builder"></param>
        /// <returns></returns>
        public IConfigurationProvider Build(IConfigurationBuilder builder)
        {
            return new ZookeeperConfigurationProvider(this);
        }
    }
ZookeeperConfigurationSource

  ZookeeperConfigurationWatcher是Zookeeper數據讀取的核心類,ZookeeperHelper這個輔助類就是在這個類中連接Zookeeper去讀取數據     

  
    public class ZookeeperConfigurationWatcher : IDisposable
    {
        ZookeeperConfigurationProvider zookeeperConfigurationProvider;

        /// <summary>
        /// Zookeeper輔助操作類
        /// </summary>
        ZookeeperHelper zookeeperHelper;

        public ZookeeperConfigurationWatcher(ZookeeperConfigurationProvider zookeeperConfigurationProvider)
        {
            this.zookeeperConfigurationProvider = zookeeperConfigurationProvider;
            var options = zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions;
            zookeeperHelper = new ZookeeperHelper(options.Address, options.RootPath);
            //初次連接后,立即
            zookeeperHelper.OnConnected += () =>
            {
                if (options.Scheme != AuthScheme.World)
                {
                    zookeeperHelper.Authorize(options.Scheme, options.Auth);
                }
                zookeeperConfigurationProvider.Reload();
            };
            zookeeperHelper.Connect();
            //int timeOut = 10;
            //while (!zookeeperHelper.Connected && timeOut-- > 0)
            //{
            //    Thread.Sleep(1000); //停一秒,等待連接完成
            //}
            if (!zookeeperHelper.Connected)
            {
                throw new TimeoutException($"connect to zookeeper [{options.Address}] timeout");
            }

            if (options.ReloadOnChange)
            {
                //監聽根節點下所有的znode節點,當任意節點發生改變后,即立刻重新加載
                zookeeperHelper.WatchAllAsync(ze =>
                {
                    if (ze.Type != ZookeeperEvent.EventType.None)
                    {
                        //使用一個異步去完成,同步會造成死鎖
                        Task.Run(() =>
                        {
                            zookeeperConfigurationProvider.Reload();
                        });
                    }
                }).Wait();
            }
        }

        /// <summary>
        /// 獲取Zookeeper中的數據
        /// </summary>
        /// <returns></returns>
        public IDictionary<string, string> Process()
        {
            var data = zookeeperHelper.GetDictionaryAsync(ConfigurationPath.KeyDelimiter).GetAwaiter().GetResult();
            if (!zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.ExcludeRoot)
            {
                return data;
            }

            var root = this.zookeeperConfigurationProvider.ZookeeperConfigurationSource.ZookeeperOptions.RootPath ?? "";
            var rootSplits = root.Split(new string[] { "/" }, StringSplitOptions.RemoveEmptyEntries);
            IDictionary<string, string> dict = new Dictionary<string, string>();
            foreach (var key in data.Keys)
            {
                var split = key.Split(new string[] { ConfigurationPath.KeyDelimiter }, StringSplitOptions.RemoveEmptyEntries);
                var array = split.Skip(rootSplits.Length).ToArray();
                dict[ConfigurationPath.Combine(array)] = data[key];
            }
            return dict;
        }
        /// <summary>
        /// 釋放資源
        /// </summary>
        public virtual void Dispose()
        {
            zookeeperHelper.Dispose();
            zookeeperConfigurationProvider.Dispose();
        }
    }
ZookeeperConfigurationWatcher

  ZookeeperOptions是相關配置信息  

  
    public class ZookeeperOptions
    {
        /// <summary>
        /// 是否監控源數據變化
        /// </summary>
        public bool ReloadOnChange { get; set; } = true;
        /// <summary>
        /// 加載延遲
        /// </summary>
        public int ReloadDelay { get; set; } = 250;
        /// <summary>
        /// Zookeeper集群地址
        /// </summary>
        public string[] Address { get; set; }
        /// <summary>
        /// Zookeeper初始路徑
        /// </summary>
        public string RootPath { get; set; }
        /// <summary>
        /// 認證主題
        /// </summary>
        public AuthScheme Scheme { get; set; }
        /// <summary>
        /// 認證信息
        /// </summary>
        public string Auth { get; set; }
        /// <summary>
        /// 生成的配置中是否去掉初始路徑
        /// </summary>
        public bool ExcludeRoot { get; set; }
    }
ZookeeperOptions

   然后是Program類:  

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using AspNetCore.ZookeeperApi.Configurations;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace AspNetCore.ZookeeperApi
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateWebHostBuilder(args).Build().Run();
        }

        public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
                .UseStartup<Startup>()
                .UseZookeeper(new ZookeeperOptions()
                {
                    //Auth = "user:123",
                    //Scheme = AuthScheme.Digest,
                    //Address = new string[] { "192.168.209.133:2181", "192.168.209.134:2181", "192.168.209.135:2181" },
                    Address = new string[] { "192.168.209.133:2181" },
                    RootPath = "/Test"
                });
    }
}

  其中UseZookeeper方法就是ZookeeperConfigurationExtensions的一個拓展方法,調用這個拓展方法之后,Zookeeper讀取的配置可以通過IConfiguration這個服務對象讀取了。

  UseZookeeper拓展方法傳入了一個參數,Address是Zookeeper連接地址,RootPath是項目配置在Zookeeper中的根節點路徑。

  假如我們在Zookeeper中創建一個znode節點:/Test,然后在/Test節點下創建5個子節點:/Test/Value1、/Test/Value2、/Test/Value3、/Test/Value4、/Test/Value5

  

    然后分別將數據保存到這5個節點中:  

   # Value1表示字符串類型數據 
  set /Test/Value1 'this is string values'
  # Value2表示整形類型數據
  set /Test/Value2 1
  # Value3表示浮點型數據
  set /Test/Value3 3.14
  # Value4表示布爾類型數據
  set /Test/Value4 false
  # Value5表示日期類型數據
  set /Test/Value5 '2020-06-02'

    上面說了,Zookeeper讀取的配置可以通過IConfiguration這個服務對象讀取,IConfiguration是.net core提供的配置服務,我們可以直接使用讀取,但是推薦使用Option方式讀取,比如為讀取/Test節點下的5個節點數據,我們可以創建一個TestOptions類:  

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace AspNetCore.ZookeeperApi
{
    public class TestOptions
    {
        /// <summary>
        /// 字符數據
        /// </summary>
        public string Value1 { get; set; }
        /// <summary>
        /// 整形數據
        /// </summary>
        public int Value2 { get; set; }
        /// <summary>
        /// 浮點型數據
        /// </summary>
        public decimal Value3 { get; set; }
        /// <summary>
        /// 布爾型數據
        /// </summary>
        public bool Value4 { get; set; }
        /// <summary>
        /// 日期數據
        /// </summary>
        public DateTime Value5 { get; set; }
    }
}

  然后在Startup的ConfigureServices方法中注冊  

   public void ConfigureServices(IServiceCollection services)
   {
       services.Configure<TestOptions>(Configuration.GetSection("Test"));

       services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
   }

  之后我們就可以直接使用Option注入的形式去訪問了,比如我們創建一個TestController的控制器:  

    [HttpGet]
    public object Get()
    {
        //使用IOptionsSnapshot<>,它是Scoped作用域
        var optionsSnapshot = HttpContext.RequestServices.GetService<IOptionsSnapshot<TestOptions>>();

        //因為IOptionsMonitor<>有緩存IOptionsMonitorCache<>,所以需要注意使用
        //如果使用service.Configure<TOptions>(IConfiguration)方法添加的Options,否則需要自行設置重新加載機制
        //重新加載可參考:https://www.cnblogs.com/shanfeng1000/p/15095236.html
        var optionsMonitor = HttpContext.RequestServices.GetService<IOptionsMonitor<TestOptions>>();

        return new
        {
            OptionsSnapshot = optionsSnapshot.Value,
            OptionsMonitor = optionsMonitor.CurrentValue
        };
    }

  然后運行項目,訪問這個接口地址就能得到:

  

    可以看到Zookeeper中的數據成功被讀取到,現在我們可以修改Zookeeper中/Test節點下的任意節點數據,如:  

    set /Test/Value2 2

  由於Zookeeper的監聽機制,重新調用接口,你會看到Value2的值也成功刷新了

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM