背景
有一套特定規格的應用(程序+數據庫),當有業務需求時,就需要多部署應用,並且所有的應用都使用一個共同的后台來管理。應用新增后,如何通知后台更新連接串成了一個關鍵的問題。於是就產生了使用ZooKeeper管理數據庫連接串的奇思異想。具體方案如下:
1. 運維負責搭建數據庫,並執行初始化腳本,然后把對應的數據庫配置刷入ZooKeeper;
2. 運維完成App(1...N)的部署,App(1...N)從ZooKeeper讀取對應的數據庫配置;
3. 后台監聽ZooKeeper,更新數據庫配置到后台應用內存。
環境准備
1. 安裝Zookeeper
docker pull zookeeper:3.4.13
docker run --name zookeeper -d -p 2181:2181 zookeeper:3.4.13
2. 安裝Mysql
docker pull mysql:5.7
docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -p 3306:3306 -d mysql:5.7
docker run --name mysql2 -e MYSQL_ROOT_PASSWORD=root -p 3307:3306 -d mysql:5.7
docker run --name mysql3 -e MYSQL_ROOT_PASSWORD=root -p 3308:3306 -d mysql:5.7
3. 初始化數據庫
CREATE DATABASE test; USE test; CREATE TABLE `table` ( `id` int(11) NOT NULL, `name` varchar(50) NOT NULL, PRIMARY KEY (`id`) );
分別在各個數據庫插入測試數據
mysql:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A1'); INSERT INTO `table` (id, name) VALUES (2, 'B1'); INSERT INTO `table` (id, name) VALUES (3, 'C1');
mysql2:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A2'); INSERT INTO `table` (id, name) VALUES (2, 'B2'); INSERT INTO `table` (id, name) VALUES (3, 'C2');
mysql3:
USE test; INSERT INTO `table` (id, name) VALUES (1, 'A3'); INSERT INTO `table` (id, name) VALUES (2, 'B3'); INSERT INTO `table` (id, name) VALUES (3, 'C3');
4. 基於數據庫生成POCO
Install-Package MySql.Data.EntityFrameworkCore -Version 8.0.13
Scaffold-DbContext "server=127.0.0.1;port=3306;user=root;password=123456;database=test" MySql.Data.EntityFrameworkCore -OutputDir DataAccess -f
5. 引用ZooKeeper相關組件
Install-Package ZooKeeperNetEx -Version 3.4.12.1
核心代碼
1. ZookeeperOption:從appsettings中讀取ZooKeeper相關配置
public class ZookeeperOption { public ZookeeperOption(IConfiguration config) { if (config == null) { throw new ArgumentNullException(nameof(config)); } var section = config.GetSection("zookeeper"); section.Bind(this); } public string ConnectionString { get; set; } public int Timeout { get; set; } }
2. ZookeeperServiceCollectionExtensions:注冊ZooKeeper服務
public static class ZookeeperServiceCollectionExtensions { public static IServiceCollection AddZookeeper(this IServiceCollection services, IConfiguration config) { if (services == null) { throw new ArgumentNullException(nameof(services)); } if (config == null) { throw new ArgumentNullException(nameof(config)); } services.AddOptions(); var option = new ZookeeperOption(config); var zookeeper = new org.apache.zookeeper.ZooKeeper(option.ConnectionString, option.Timeout * 1000, new DefaultWatcher()); services.Add(ServiceDescriptor.Singleton(zookeeper)); return services; } } public class DefaultWatcher : Watcher { public override Task process(WatchedEvent @event) { return Task.CompletedTask; } }
3. ZookeeperHandler:ZooKeeper初始化及目錄變化處理類,並把數據庫連接信息寫入程序內存
public interface IZookeeperHandler { Task InitAsync(); Task RefreshAsync(); } public class ZookeeperHandler: IZookeeperHandler { private readonly org.apache.zookeeper.ZooKeeper _zooKeeper; private readonly IMemoryCache _cache; public ZookeeperHandler(org.apache.zookeeper.ZooKeeper zooKeeper, IMemoryCache cache) { _zooKeeper = zooKeeper; _cache = cache; } public async Task InitAsync() { await RefreshAsync(); } public async Task RefreshAsync() { var connDic = new Dictionary<string, string>(); var isExisted = await _zooKeeper.existsAsync("/connections"); if (isExisted == null) { await _zooKeeper.createAsync("/connections", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } var connResult = await _zooKeeper.getChildrenAsync("/connections", new ConnectionWatcher(this)); foreach (var conn in connResult.Children) { var connData = await _zooKeeper.getDataAsync($"/connections/{conn}/value"); var connStr = Encoding.UTF8.GetString(connData.Data); connDic[conn] = connStr; } _cache.Set("connections", connDic); } }
4. ConnectionWatcher:監聽者,內容變化時調用ZookeeperHandler的RefreshAsync()方法,其中,變化只通知一次,因此需要再次建立監聽
public class ConnectionWatcher : Watcher { private readonly IZookeeperHandler _zookeeperService; public ConnectionWatcher(IZookeeperHandler zookeeperService) { _zookeeperService = zookeeperService; } public override async Task process(WatchedEvent @event) { var type = @event.get_Type(); if (type != Event.EventType.None) { await _zookeeperService.RefreshAsync(); } } }
5. ZookeeperApplicationBuilderExtensions:初始化
public static class ZookeeperApplicationBuilderExtensions { public static IApplicationBuilder UseZookeeper(this IApplicationBuilder app) { var service = app.ApplicationServices.GetRequiredService<IZookeeperHandler>(); service.InitAsync().Wait(); return app; } }
6. ContextProvider:根據Id從內存中讀取對應的數據庫連接串,並提供DbContext實例
public interface IContextProvider { TestContext GetContext(string id); } public class ContextProvider : IContextProvider { private readonly IMemoryCache _cache; public ContextProvider(IMemoryCache cache) { _cache = cache; } public TestContext GetContext(string id) { var dic = _cache.Get<Dictionary<string, string>>("connections"); var connectionStr = dic[id]; var optionsBuilder = new DbContextOptionsBuilder<TestContext>(); optionsBuilder.UseMySQL(connectionStr); return new TestContext(optionsBuilder.Options); } }
效果演示
1. 剛開始沒有任何連接信息
2. 添加一個連接信息
3. 查詢連接對應的數據
4. 再添加兩個連接信息
5. 查看ZooKeeper的信息
docker run -it --rm --link zookeeper:zookeeper zookeeper:3.4.13 zkCli.sh -server zookeeper
ls /connections
get /connections/1/value
get /connections/2/value
get /connections/3/value
補充說明
因為把數據庫連接信息寫到了程序內存中,因此,如果當ZooKeeper出現了故障:
1. 老的(正在運行)應用正在使用的數據庫不會受到影響,但無法監聽到數據庫信息的變化;
2. 新的應用無法啟動。
ZooKeeper恢復后:
1. 老的(正在運行)應用會重連,重新監聽到數據庫信息的變化
2. 新的應用可以成功啟動。