一:簡介
自動計算都是常駐內存的,沒有人機交互。我們經常用到的就是console job和sql job了。sql job有自己的宿主,與數據庫產品關聯很緊密,暫時不提。console job使用quartz.net框架,目前3.x已經支持netcore。
如果單台服務器運行計算,那就很簡單了,quartz很強大了,而且支持故障災難轉移集群,docker做宿主,很容易實現。但是分布式就不可同日而語了。如果你的一個數據處理太慢,需要多進程多主機處理,那么就需要多進程自動協調處理這一數據,比如如果你的訂單太多,而一個進程處理延遲是10秒,那用戶體驗就會非常不好,雖然異步已經提高了你的吞吐量,但是延遲太久,對后續業務還是造成很大的干擾,時不時的會進行停頓。如果兩到三台的機器進行處理,那延遲就會大大的減低,但是這兩到三台服務器如何分配處理任務?如何分割這個數據,多進程進行處理?這就需要用到這一篇講解的知識了。
在多個job應用之間進行協調的工具,就是zookeeper了,zookeeper官方介紹:一個分布式應用協調服務。其實他也是一個類似文件系統的寫一致的數據存儲軟件,我們可以使用它做分布式鎖,對應用進行協調控制。
目前流行的這一類產品也比較多,但是我是熟悉quartz,至於切片的功能,在quartz之上可以進行封裝,因為所要封裝的功能不多,所以我還是選擇了quartz。
二 zookeeper 服務
首先就是zookeeper服務,和前面log日志一樣,首先創建構建和配置文件類。
首先看看配置類:
ZookeeperConfiguration.cs

using Microsoft.Extensions.Configuration;

namespace Walt.Framework.Configuration
{
    /// <summary>
    /// Holds the <see cref="IConfiguration"/> instance supplied at construction time
    /// and exposes it through a read-only property.
    /// </summary>
    public class ZookeeperConfiguration
    {
        private readonly IConfiguration _configuration;

        /// <summary>Creates the holder around the given configuration.</summary>
        /// <param name="configuration">The configuration instance to expose.</param>
        public ZookeeperConfiguration(IConfiguration configuration)
        {
            _configuration = configuration;
        }

        /// <summary>The configuration instance passed to the constructor.</summary>
        public IConfiguration Configuration
        {
            get { return _configuration; }
        }
    }
}

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    /// <summary>
    /// Extension methods that bind a configuration section to <see cref="ZookeeperOptions"/>
    /// and register the related option services on an <see cref="IZookeeperBuilder"/>.
    /// </summary>
    public static class ZookeeperConfigurationExtensioncs
    {
        /// <summary>
        /// Registers the ZooKeeper option services for <paramref name="configuration"/>.
        /// </summary>
        /// <param name="builder">Builder whose service collection receives the registrations.</param>
        /// <param name="configuration">Configuration bound to <see cref="ZookeeperOptions"/>.</param>
        /// <returns>The same <paramref name="builder"/>, to allow chaining.</returns>
        /// <exception cref="System.ArgumentNullException">An argument is null.</exception>
        public static IZookeeperBuilder AddConfiguration(this IZookeeperBuilder builder, IConfiguration configuration)
        {
            InitService(builder, configuration);
            return builder;
        }

        /// <summary>
        /// Performs the actual service registrations used by <see cref="AddConfiguration"/>.
        /// </summary>
        /// <param name="builder">Builder whose service collection receives the registrations.</param>
        /// <param name="configuration">Configuration bound to <see cref="ZookeeperOptions"/>.</param>
        /// <exception cref="System.ArgumentNullException">An argument is null.</exception>
        public static void InitService(IZookeeperBuilder builder, IConfiguration configuration)
        {
            if (builder == null)
            {
                throw new System.ArgumentNullException(nameof(builder));
            }
            if (configuration == null)
            {
                throw new System.ArgumentNullException(nameof(configuration));
            }

            var services = builder.Services;

            services.TryAddSingleton<IConfigureOptions<ZookeeperOptions>>(
                new ZookeeperConfigurationOptions(configuration));

            // The service type must be spelled out here. Passing a ServiceDescriptor to
            // TryAddSingleton(...) makes the compiler infer TService = ServiceDescriptor, which
            // registers the descriptor object itself and never exposes the change-token source,
            // so option reloads would not reach IOptionsMonitor<ZookeeperOptions>.
            services.TryAddSingleton<IOptionsChangeTokenSource<ZookeeperOptions>>(
                new ConfigurationChangeTokenSource<ZookeeperOptions>(configuration));

            services.TryAddEnumerable(
                ServiceDescriptor.Singleton<IConfigureOptions<ZookeeperOptions>>(
                    new ConfigureFromConfigurationOptions<ZookeeperOptions>(configuration)));

            services.AddSingleton(new ZookeeperConfiguration(configuration));
        }
    }
}

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Walt.Framework.Service.Zookeeper;

namespace Walt.Framework.Configuration
{
    /// <summary>
    /// <see cref="IConfigureOptions{TOptions}"/> hook for <see cref="ZookeeperOptions"/>.
    /// It does not modify the options; it only writes their JSON form to the debug output.
    /// </summary>
    public class ZookeeperConfigurationOptions : IConfigureOptions<ZookeeperOptions>
    {
        // Stored by the constructor; not read anywhere in this class.
        private readonly IConfiguration _configuration;

        /// <summary>Creates the hook and remembers the supplied configuration.</summary>
        /// <param name="configuration">Configuration instance to keep.</param>
        public ZookeeperConfigurationOptions(IConfiguration configuration)
        {
            this._configuration = configuration;
        }

        /// <summary>
        /// Writes a debug trace line containing the serialized <paramref name="options"/>.
        /// </summary>
        /// <param name="options">The options instance being configured.</param>
        public void Configure(ZookeeperOptions options)
        {
            // The serialization stays inside the Debug.WriteLine argument so that it is
            // compiled out together with the call in non-DEBUG builds.
            System.Diagnostics.Debug.WriteLine(
                string.Concat(
                    "zookeeper配置類,適配方法。",
                    Newtonsoft.Json.JsonConvert.SerializeObject(options)));
        }
    }
}
以上這三個類就是配置類,下面是構建類和配置信息類:

using Microsoft.Extensions.DependencyInjection;

namespace Walt.Framework.Service.Zookeeper
{
    /// <summary>
    /// Default <see cref="IZookeeperBuilder"/>: carries the service collection that
    /// ZooKeeper-related registrations are added to.
    /// </summary>
    public class ZookeeperBuilder : IZookeeperBuilder
    {
        private readonly IServiceCollection _services;

        /// <summary>Creates a builder over the given service collection.</summary>
        /// <param name="services">The service collection to expose.</param>
        public ZookeeperBuilder(IServiceCollection services)
        {
            _services = services;
        }

        /// <summary>The service collection passed to the constructor.</summary>
        public IServiceCollection Services
        {
            get { return _services; }
        }
    }
}

using System;
using System.Collections.Generic;

namespace Walt.Framework.Service.Zookeeper
{
    /// <summary>
    /// Plain settings object describing how to connect to ZooKeeper.
    /// </summary>
    public class ZookeeperOptions
    {
        /// <summary>
        /// Comma separated host:port pairs, each corresponding to a zk server,
        /// e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". If the optional chroot suffix is
        /// used the example would look like "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a",
        /// where the client would be rooted at "/app/a" and all paths would be relative to this
        /// root - i.e. getting/setting/etc. "/foo/bar" would result in operations being run on
        /// "/app/a/foo/bar" (from the server perspective).
        /// </summary>
        public string Connectstring { get; set; }

        /// <summary>
        /// Session timeout value (presumably milliseconds - confirm against the client library).
        /// </summary>
        public int SessionTimeout { get; set; }

        /// <summary>Session id; the default value 0 means none was configured.</summary>
        public int SessionId { get; set; }

        /// <summary>Session password as text; null or empty means none was configured.</summary>
        public string SessionPwd { get; set; }

        /// <summary>Read-only flag for the connection.</summary>
        public bool IsReadOnly { get; set; }
    }
}

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;

namespace Walt.Framework.Service.Zookeeper
{
    /// <summary>
    /// Watcher used while waiting for a lock: it signals the supplied wait handle once the
    /// caller's own sequential node is the lowest child under the lock path.
    /// </summary>
    internal class WaitLockWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;
        private string _path;
        private ZookeeperService _zookeeperService;
        public string _tempNode;

        // Set once this watcher has nothing more to do (waiter released, or own node gone).
        private volatile bool _finished;

        /// <param name="autoResetEvent">Handle to signal when the lock is obtained.</param>
        /// <param name="zookeeperService">Service used to re-read the children of <paramref name="path"/>.</param>
        /// <param name="logger">Logger for diagnostics.</param>
        /// <param name="path">Parent (lock) node path.</param>
        /// <param name="tempNode">Full path of the caller's own sequential node.</param>
        public WaitLockWatch(AutoResetEvent autoResetEvent, ZookeeperService zookeeperService,
            ILogger logger, string path, string tempNode)
        {
            _autoResetEvent = autoResetEvent;
            _zookeeperService = zookeeperService;
            _logger = logger;
            _path = path;
            _tempNode = tempNode;
        }

        /// <summary>
        /// Re-reads the children of the lock path and releases the waiter if the caller's node is
        /// first in sort order. The read passes this watcher again, because a ZooKeeper watch is
        /// delivered only once; without re-arming, a change that does not yet put the caller at
        /// the head would leave the waiter blocked forever.
        /// </summary>
        public override Task process(WatchedEvent @event)
        {
            _logger.LogDebug("{0}節點下子節點發生改變,激發監視方法。", _path);
            if (_finished)
            {
                // A watch armed by an earlier read may still fire after we are done; ignore it.
                return Task.FromResult(0);
            }

            var childList = _zookeeperService.GetChildrenAsync(_path, this, true).Result;
            if (childList == null || childList.Children == null || childList.Children.Count < 1)
            {
                _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", _path);
                return Task.FromResult(0);
            }

            var top = childList.Children.OrderBy(or => or).First();
            if (_path + "/" + top == _tempNode)
            {
                _finished = true;
                _logger.LogDebug("釋放阻塞");
                _autoResetEvent.Set();
            }
            else if (!childList.Children.Any(child => _path + "/" + child == _tempNode))
            {
                // Our own node no longer exists, so it can never reach the head; stop re-arming.
                // NOTE(review): the waiting thread is not released in this case - same as before.
                _finished = true;
            }
            return Task.FromResult(0);
        }
    }

    /// <summary>
    /// Watcher passed to the ZooKeeper client constructor: it signals the supplied wait handle
    /// when the reported state is SyncConnected or ConnectedReadOnly.
    /// </summary>
    internal class WaitConnWatch : Watcher
    {
        private AutoResetEvent _autoResetEvent;
        private ILogger _logger;

        /// <param name="autoResetEvent">Handle to signal once connected.</param>
        /// <param name="logger">Logger for diagnostics.</param>
        public WaitConnWatch(AutoResetEvent autoResetEvent, ILogger logger)
        {
            _autoResetEvent = autoResetEvent;
            _logger = logger;
        }

        /// <summary>Logs the reported state and releases the waiter on a connected state.</summary>
        public override Task process(WatchedEvent @event)
        {
            var state = @event.getState();
            _logger.LogDebug("watch激發,回掉狀態:{0}", state.ToString());
            if (state == KeeperState.SyncConnected || state == KeeperState.ConnectedReadOnly)
            {
                _logger.LogDebug("釋放阻塞");
                _autoResetEvent.Set();
            }
            return Task.FromResult(0);
        }
    }

    /// <summary>
    /// Thin blocking wrapper around a single <see cref="ZooKeeper"/> client: node create/read/
    /// write/delete helpers plus a simple lock built on ephemeral sequential nodes.
    /// Failures that the methods detect themselves are logged and reported by returning null.
    /// </summary>
    public class ZookeeperService : IZookeeperService
    {
        /// <summary>Lock nodes created by this instance that have not been released yet.</summary>
        public List<string> requestLockSequence = new List<string>();

        // Guards every access to requestLockSequence.
        private object _lock = new object();
        private ZookeeperOptions _zookeeperOptions;
        private ZooKeeper _zookeeper;
        private static readonly byte[] NO_PASSWORD = new byte[0];

        public Watcher Wathcer { get; set; }

        public ILoggerFactory LoggerFac { get; set; }

        private ILogger _logger;

        // Signalled by WaitConnWatch when the client reports a connected state.
        private readonly AutoResetEvent _connectedSignal = new AutoResetEvent(false);

        /// <summary>
        /// Reads the current options, subscribes to option changes and connects immediately.
        /// The constructor blocks until the client reports a connected state.
        /// </summary>
        public ZookeeperService(IOptionsMonitor<ZookeeperOptions> zookeeperOptions, ILoggerFactory loggerFac)
        {
            LoggerFac = loggerFac;
            _logger = LoggerFac.CreateLogger<ZookeeperService>();
            _zookeeperOptions = zookeeperOptions.CurrentValue;
            _logger.LogDebug("配置參數:{0}", JsonConvert.SerializeObject(_zookeeperOptions));
            zookeeperOptions.OnChange((zookopt, s) =>
            {
                // New options take effect on the next (re)connect.
                _zookeeperOptions = zookopt;
            });
            _logger.LogDebug("開始連接");
            Conn(_zookeeperOptions);
        }

        /// <summary>
        /// Creates a new client from <paramref name="zookeeperOptions"/> and, if it is still
        /// connecting, blocks until the connection watcher signals.
        /// </summary>
        private void Conn(ZookeeperOptions zookeeperOptions)
        {
            // Drop any signal left over from a previous connection (set after the state check
            // was skipped); otherwise the wait below could return before this client connects.
            _connectedSignal.Reset();
            Wathcer = new WaitConnWatch(_connectedSignal, _logger);

            bool isReadOnly = zookeeperOptions.IsReadOnly;
            bool hasPassword = !string.IsNullOrEmpty(zookeeperOptions.SessionPwd);

            if (!hasPassword && zookeeperOptions.SessionId == default(int))
            {
                // Neither a password nor a session id: start a brand-new session.
                _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
                    zookeeperOptions.SessionTimeout, Wathcer, isReadOnly);
            }
            else
            {
                // Always forward the configured session id: previously it was replaced by 0
                // whenever a password was present, and read from the field instead of the
                // options object passed in.
                byte[] pwd = hasPassword
                    ? System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd)
                    : NO_PASSWORD;
                _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
                    zookeeperOptions.SessionTimeout, Wathcer, zookeeperOptions.SessionId, pwd, isReadOnly);
            }

            if (_zookeeper.getState() == States.CONNECTING)
            {
                _logger.LogDebug("當前狀態:CONNECTING。阻塞等待");
                // NOTE(review): unbounded wait - an unreachable server blocks the caller.
                _connectedSignal.WaitOne();
            }
        }

        /// <summary>Creates a node at <paramref name="path"/> with the given text payload.</summary>
        /// <param name="path">Absolute node path; must start with '/'.</param>
        /// <param name="data">Payload text; null or empty creates the node with no data.</param>
        /// <param name="createMode">Create mode; null falls back to PERSISTENT.</param>
        /// <param name="aclList">ACLs for the new node.</param>
        /// <returns>The client's create task, or null when <paramref name="path"/> is invalid.</returns>
        public Task<string> CreateZNode(string path, string data, CreateMode createMode, List<ACL> aclList)
        {
            ReConn();
            if (string.IsNullOrEmpty(path) || !path.StartsWith('/'))
            {
                _logger.LogDebug("path路徑非法,參數:path:{0}", path);
                return null;
            }

            // The original condition was inverted: it encoded only null/empty text (throwing on
            // null) and silently dropped every real payload.
            byte[] dat = new byte[0];
            if (!string.IsNullOrEmpty(data))
            {
                dat = System.Text.Encoding.Default.GetBytes(data);
            }

            if (createMode == null)
            {
                _logger.LogDebug("createMode為null,默認使用CreateMode.PERSISTENT");
                createMode = CreateMode.PERSISTENT;
            }
            return _zookeeper.createAsync(path, dat, aclList, createMode);
        }

        /// <summary>Reads the data of <paramref name="path"/>.</summary>
        /// <param name="path">Node path.</param>
        /// <param name="watcher">Optional watcher to register with the read.</param>
        /// <param name="isSync">When true, a sync on the path is awaited before reading.</param>
        /// <returns>The client's read task, or null when the node does not exist.</returns>
        public Task<DataResult> GetDataAsync(string path, Watcher watcher, bool isSync)
        {
            ReConn();
            if (_zookeeper.existsAsync(path).Result == null)
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            if (isSync)
            {
                _logger.LogDebug("即將進行同步。");
                Task.Run(async () => { await _zookeeper.sync(path); }).Wait();
            }
            return _zookeeper.getDataAsync(path, watcher);
        }

        /// <summary>Overwrites the data of <paramref name="path"/> with the given text.</summary>
        /// <param name="path">Node path.</param>
        /// <param name="data">New payload text; null or empty writes an empty payload.</param>
        /// <param name="isSync">Not used by this method.</param>
        /// <returns>The client's write task, or null when the node does not exist.</returns>
        public Task<Stat> SetDataAsync(string path, string data, bool isSync)
        {
            ReConn();
            if (_zookeeper.existsAsync(path).Result == null)
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            byte[] dat = new byte[0];
            if (!string.IsNullOrEmpty(data))
            {
                dat = System.Text.Encoding.Default.GetBytes(data);
            }
            return _zookeeper.setDataAsync(path, dat);
        }

        /// <summary>Lists the children of <paramref name="path"/>.</summary>
        /// <param name="path">Parent node path.</param>
        /// <param name="watcher">Optional watcher to register with the read.</param>
        /// <param name="isSync">When true, a sync on the path is awaited before reading.</param>
        /// <returns>The children result, or null when the node does not exist.</returns>
        public async Task<ChildrenResult> GetChildrenAsync(string path, Watcher watcher, bool isSync)
        {
            ReConn();
            if (_zookeeper.existsAsync(path).Result == null)
            {
                _logger.LogDebug("path不存在");
                return null;
            }
            if (isSync)
            {
                _logger.LogDebug("即將進行同步。");
                Task.Run(async () =>
                {
                    _logger.LogDebug("開始同步");
                    await _zookeeper.sync(path);
                }).Wait();
            }
            return await _zookeeper.getChildrenAsync(path, watcher);
        }

        /// <summary>
        /// Deletes <paramref name="path"/> (blocking) and forgets the matching lock entries.
        /// </summary>
        /// <param name="path">Node to delete.</param>
        /// <param name="tempNode">Lock node to drop from the tracking list; may be null/empty.</param>
        public void DeleteNode(string path, String tempNode)
        {
            if (!string.IsNullOrEmpty(tempNode))
            {
                // The list is shared with GetDataByLockNode, which accesses it under the same lock.
                lock (_lock)
                {
                    requestLockSequence.Remove(tempNode);
                }
            }
            ReConn();
            if (_zookeeper.existsAsync(path).Result == null)
            {
                _logger.LogDebug("path不存在");
                return;
            }
            Task.Run(async () =>
            {
                _logger.LogDebug("刪除node:{0}", path);
                await _zookeeper.deleteAsync(path);
            }).Wait();

            lock (_lock)
            {
                requestLockSequence.Remove(path);
            }
        }

        /// <summary>
        /// Acquires the lock under <paramref name="path"/> and returns that node's data as text.
        /// An ephemeral sequential child is created; the call blocks until that child is first
        /// in sort order among all children.
        /// </summary>
        /// <param name="path">Existing parent node that serves as the lock.</param>
        /// <param name="sequenceName">Name prefix for the sequential child.</param>
        /// <param name="aclList">ACLs for the created child.</param>
        /// <param name="tempNodeOut">
        /// On success, the full path of the created child (delete it to release the lock);
        /// otherwise <see cref="string.Empty"/>.
        /// </param>
        /// <returns>
        /// The data of <paramref name="path"/> (empty string when it has none), or null when the
        /// lock could not be acquired.
        /// </returns>
        public string GetDataByLockNode(string path, string sequenceName, List<ACL> aclList, out string tempNodeOut)
        {
            _logger.LogInformation("獲取分布式鎖開始。");
            ReConn();
            string tempNode = string.Empty;
            tempNodeOut = string.Empty;
            if (_zookeeper.existsAsync(path).Result == null)
            {
                _logger.LogDebug("path不存在");
                return null;
            }

            string nodePrefix = path + "/" + sequenceName;
            try
            {
                _logger.LogDebug("開始鎖定語句塊");
                lock (_lock)
                {
                    _logger.LogDebug("鎖定,訪問requestLockSequence的代碼應該同步。");
                    tempNode = requestLockSequence
                        .FirstOrDefault(w => w.StartsWith(nodePrefix, StringComparison.Ordinal));
                    if (tempNode != null)
                    {
                        // This process already holds or waits for this lock.
                        _logger.LogDebug("已經存在的鎖節點,返回null");
                        return null;
                    }

                    tempNode = CreateZNode(nodePrefix, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
                    _logger.LogDebug("創建節點:{0}", tempNode);
                    if (tempNode == null)
                    {
                        _logger.LogDebug("創建臨時序列節點失敗。詳細參數:path:{0},data:{1},CreateMode:{2}",
                            nodePrefix, "", CreateMode.EPHEMERAL_SEQUENTIAL);
                        return null;
                    }
                    _logger.LogInformation("創建成功,加入requestLockSequence列表。");
                    requestLockSequence.Add(tempNode);
                }

                var childList = GetChildrenAsync(path, null, true).Result;
                if (!HasChildren(childList))
                {
                    _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", path);
                    // Do not leave our node (and its tracking entry) behind on failure.
                    DeleteNode(tempNode, tempNode);
                    return null;
                }
                _logger.LogDebug("獲取path:{0}的子節點:{1}", path,
                    Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));

                if (!IsHeadOfQueue(childList, path, tempNode))
                {
                    // One handle per acquisition, so a late signal from this watcher cannot wake
                    // a different acquisition. It is intentionally not disposed: the watcher may
                    // still signal it after this method has returned.
                    var gate = new AutoResetEvent(false);
                    childList = GetChildrenAsync(path,
                        new WaitLockWatch(gate, this, _logger, path, tempNode), true).Result;
                    if (!HasChildren(childList))
                    {
                        _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", path);
                        DeleteNode(tempNode, tempNode);
                        return null;
                    }

                    // The predecessor may have vanished between the two reads; in that case no
                    // further change would arrive to wake us, so only wait when still not first.
                    if (!IsHeadOfQueue(childList, path, tempNode))
                    {
                        // NOTE(review): unbounded wait.
                        gate.WaitOne();
                    }
                    _logger.LogDebug("繼續執行。");
                }

                tempNodeOut = tempNode;
                byte[] da = GetDataAsync(path, null, true).Result.Data;
                if (da == null || da.Length < 1)
                {
                    return string.Empty;
                }
                return System.Text.Encoding.Default.GetString(da);
            }
            catch (Exception ep)
            {
                _logger.LogError(ep, "獲取同步鎖出現錯誤。");
                // The node is removed below, so the caller must not see it as a held lock.
                tempNodeOut = string.Empty;
                if (!string.IsNullOrEmpty(tempNode))
                {
                    DeleteNode(tempNode, tempNode);
                }
            }
            return null;
        }

        /// <summary>True when the result holds at least one child name.</summary>
        private static bool HasChildren(ChildrenResult childList)
        {
            return childList != null && childList.Children != null && childList.Children.Count >= 1;
        }

        /// <summary>True when <paramref name="node"/> is the first child of <paramref name="path"/> in sort order.</summary>
        private static bool IsHeadOfQueue(ChildrenResult childList, string path, string node)
        {
            return path + "/" + childList.Children.OrderBy(or => or).First() == node;
        }

        /// <summary>Reconnects when the client reports CLOSED or NOT_CONNECTED.</summary>
        private void ReConn()
        {
            _logger.LogInformation("檢查連接狀態");
            var state = _zookeeper.getState();
            if (state == States.CLOSED || state == States.NOT_CONNECTED)
            {
                _logger.LogInformation("連接為關閉,開始重新連接。");
                Conn(_zookeeperOptions);
            }
        }

        /// <summary>Forgets <paramref name="tempNode"/> and closes the client (blocking).</summary>
        /// <param name="tempNode">Lock node to drop from the tracking list.</param>
        public void Close(string tempNode)
        {
            lock (_lock)
            {
                requestLockSequence.Remove(tempNode);
            }
            Task.Run(async () => { await _zookeeper.closeAsync(); }).Wait();
        }
    }
}
前面的類如果了解了netcore的擴展服務和配置機制,就很簡單理解了,我們主要是講解這個服務類的功能。
首先看服務類其中的一段代碼:
/// <summary>
/// Creates a new ZooKeeper client from <paramref name="zookeeperOptions"/> and, if it is still
/// connecting, blocks the calling thread until the connection watcher signals.
/// </summary>
/// <param name="zookeeperOptions">Connection settings to use for the new client.</param>
private void Conn(ZookeeperOptions zookeeperOptions)
{
    // Drop any signal left over from a previous connection (set after the state check was
    // skipped); otherwise the wait below could return before this client is connected.
    autoResetEvent[0].Reset();

    // Watches the connection and releases the wait below once it succeeds.
    Wathcer = new WaitConnWatch(autoResetEvent[0], _logger);

    bool isReadOnly = zookeeperOptions.IsReadOnly;
    bool hasPassword = !string.IsNullOrEmpty(zookeeperOptions.SessionPwd);

    if (!hasPassword && zookeeperOptions.SessionId == default(int))
    {
        // Neither a password nor a session id: start a brand-new session.
        _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
            zookeeperOptions.SessionTimeout, Wathcer, isReadOnly);
    }
    else
    {
        // Always forward the configured session id: previously it was replaced by 0 whenever a
        // password was present, and read from the field instead of the options passed in.
        byte[] pwd = hasPassword
            ? System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd)
            : new byte[0];
        _zookeeper = new ZooKeeper(zookeeperOptions.Connectstring,
            zookeeperOptions.SessionTimeout, Wathcer, zookeeperOptions.SessionId, pwd, isReadOnly);
    }

    if (_zookeeper.getState() == States.CONNECTING)
    {
        _logger.LogDebug("當前狀態:CONNECTING。阻塞等待");
        // NOTE(review): unbounded wait - an unreachable server blocks the caller.
        autoResetEvent[0].WaitOne();
    }
}
這個方法是連接zookeeper,我們在構造函數中調用它,注意,zookeeper是異步,所以需要watcher類監控和AutoResetEvent阻塞當前線程,因為如果不阻塞,在連接還沒建立的時候,后面調用,
會出現錯誤。在監控類被觸發的時候,執行取消阻塞。
/// <summary>
/// Connection watcher: signals the supplied wait handle when the reported state is
/// SyncConnected or ConnectedReadOnly.
/// </summary>
internal class WaitConnWatch : Watcher
{
    private AutoResetEvent _autoResetEvent;
    private ILogger _logger;

    /// <param name="autoResetEvent">Handle to signal once connected.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    public WaitConnWatch(AutoResetEvent autoResetEvent, ILogger logger)
    {
        this._autoResetEvent = autoResetEvent;
        this._logger = logger;
    }

    /// <summary>Logs the reported state and releases the waiter on a connected state.</summary>
    public override Task process(WatchedEvent @event)
    {
        var reported = @event.getState();
        _logger.LogDebug("watch激發,回掉狀態:{0}", reported.ToString());

        bool connected = reported == KeeperState.SyncConnected
            || reported == KeeperState.ConnectedReadOnly;
        if (!connected)
        {
            return Task.FromResult(0);
        }

        _logger.LogDebug("釋放阻塞");
        // Unblocks the thread waiting for the connection.
        _autoResetEvent.Set();
        return Task.FromResult(0);
    }
}
大家看zookeeper服務類,里面很多方法在執行前需要執行ReConn方法,這只是一種失敗重連的機制,因為一般沒有線程池的,我都會給這個服務單例。那有人會問,如果失敗重連,會不會阻塞當前應用,
這個不會,因為netcore是多線程的,但是會降低這個應用的生產力。我前面翻譯過一篇net的關於線程的知識,后面也會單獨一篇講解netcore的線程模型。還有代碼中我很少用異常去處理容錯性,盡量拋出原生的異常,使用日志去記錄,然后為這個類返回個null,異常對性能也有一定的消耗,當然看自己習慣了,目的都是為了應用的健壯性。
其他方法是操作zookeeper的類,大家可以看我貼出來的代碼。因為zookeeper最出名的估計就是分布式鎖了,所以就把這個功能加進來。
/// <summary>
/// Acquires the lock under <paramref name="path"/> and returns that node's data as text.
/// An ephemeral sequential child is created; the call blocks until that child is first in sort
/// order among all children.
/// </summary>
/// <param name="path">Existing parent node that serves as the lock.</param>
/// <param name="sequenceName">Name prefix for the sequential child.</param>
/// <param name="aclList">ACLs for the created child.</param>
/// <param name="tempNodeOut">
/// On success, the full path of the created child (delete it to release the lock);
/// otherwise <see cref="string.Empty"/>.
/// </param>
/// <returns>
/// The data of <paramref name="path"/> (empty string when it has none), or null when the lock
/// could not be acquired.
/// </returns>
public string GetDataByLockNode(string path, string sequenceName, List<ACL> aclList, out string tempNodeOut)
{
    _logger.LogInformation("獲取分布式鎖開始。");
    ReConn();
    string tempNode = string.Empty;
    tempNodeOut = string.Empty;
    if (_zookeeper.existsAsync(path).Result == null)
    {
        _logger.LogDebug("path不存在");
        return null;
    }

    string nodePrefix = path + "/" + sequenceName;
    Func<ChildrenResult, bool> hasChildren = list =>
        list != null && list.Children != null && list.Children.Count >= 1;
    Func<ChildrenResult, bool> isHead = list =>
        path + "/" + list.Children.OrderBy(or => or).First() == tempNode;

    try
    {
        _logger.LogDebug("開始鎖定語句塊");
        // Guards against duplicate submissions from within this process; the real purpose of
        // the lock is coordination between applications, so this may look redundant.
        lock (_lock)
        {
            _logger.LogDebug("鎖定,訪問requestLockSequence的代碼應該同步。");
            tempNode = requestLockSequence.Where(w => w.StartsWith(nodePrefix)).FirstOrDefault();
            if (tempNode != null)
            {
                _logger.LogDebug("已經存在的鎖節點,返回null");
                return null;
            }

            tempNode = CreateZNode(nodePrefix, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
            _logger.LogDebug("創建節點:{0}", tempNode);
            if (tempNode == null)
            {
                // Log the prefix that was actually used (it was a hard-coded "/squence" before).
                _logger.LogDebug("創建臨時序列節點失敗。詳細參數:path:{0},data:{1},CreateMode:{2}",
                    nodePrefix, "", CreateMode.EPHEMERAL_SEQUENTIAL);
                return null;
            }
            _logger.LogInformation("創建成功,加入requestLockSequence列表。");
            requestLockSequence.Add(tempNode);
        }

        // First read the children of the lock node.
        var childList = GetChildrenAsync(path, null, true).Result;
        if (!hasChildren(childList))
        {
            _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", path);
            // Do not leave our node (and its tracking entry) behind on failure.
            DeleteNode(tempNode, tempNode);
            return null;
        }
        _logger.LogDebug("獲取path:{0}的子節點:{1}", path,
            Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));

        // If our own node is not at the head of the queue, watch the children and wait.
        if (!isHead(childList))
        {
            // One handle per acquisition, so a late signal from this watcher cannot wake a
            // different acquisition. It is intentionally not disposed: the watcher may still
            // signal it after this method has returned.
            var gate = new AutoResetEvent(false);
            childList = GetChildrenAsync(path,
                new WaitLockWatch(gate, this, _logger, path, tempNode), true).Result;
            if (!hasChildren(childList))
            {
                _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", path);
                DeleteNode(tempNode, tempNode);
                return null;
            }

            // The predecessor may have vanished between the two reads; in that case no further
            // change would arrive to wake us, so only wait when we are still not first.
            if (!isHead(childList))
            {
                // NOTE(review): unbounded wait; it relies on the watcher signalling the handle.
                gate.WaitOne();
            }
            _logger.LogDebug("繼續執行。");
        }

        tempNodeOut = tempNode;
        byte[] da = GetDataAsync(path, null, true).Result.Data;
        if (da == null || da.Length < 1)
        {
            return string.Empty;
        }
        return System.Text.Encoding.Default.GetString(da);
    }
    catch (Exception ep)
    {
        _logger.LogError(ep, "獲取同步鎖出現錯誤。");
        // The node is removed below, so the caller must not see it as a held lock.
        tempNodeOut = string.Empty;
        if (!string.IsNullOrEmpty(tempNode))
        {
            DeleteNode(tempNode, tempNode);
        }
    }
    return null;
}
接下來看監聽類:
/// <summary>
/// Watcher used while waiting for a lock: it signals the supplied wait handle once the caller's
/// own sequential node is the lowest child under the lock path.
/// </summary>
internal class WaitLockWatch : Watcher
{
    private AutoResetEvent _autoResetEvent;
    private ILogger _logger;
    private string _path;
    private ZookeeperService _zookeeperService;
    public string _tempNode;

    // Set once this watcher has nothing more to do (waiter released, or own node gone).
    private volatile bool _finished;

    /// <param name="autoResetEvent">Handle to signal when the lock is obtained.</param>
    /// <param name="zookeeperService">Service used to re-read the children of <paramref name="path"/>.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <param name="path">Parent (lock) node path.</param>
    /// <param name="tempNode">Full path of the caller's own sequential node.</param>
    public WaitLockWatch(AutoResetEvent autoResetEvent, ZookeeperService zookeeperService,
        ILogger logger, string path, string tempNode)
    {
        _autoResetEvent = autoResetEvent;
        _zookeeperService = zookeeperService;
        _logger = logger;
        _path = path;
        _tempNode = tempNode;
    }

    /// <summary>
    /// Re-reads the children of the lock path and releases the waiter if the caller's node is
    /// first in sort order. The read passes this watcher again, because a ZooKeeper watch is
    /// delivered only once; without re-arming, a change that does not yet put the caller at the
    /// head would leave the waiter blocked forever.
    /// </summary>
    public override Task process(WatchedEvent @event)
    {
        _logger.LogDebug("{0}節點下子節點發生改變,激發監視方法。", _path);
        if (_finished)
        {
            // A watch armed by an earlier read may still fire after we are done; ignore it.
            return Task.FromResult(0);
        }

        var childList = _zookeeperService.GetChildrenAsync(_path, this, true).Result;
        if (childList == null || childList.Children == null || childList.Children.Count < 1)
        {
            _logger.LogDebug("獲取子序列失敗,計數為零.path:{0}", _path);
            return Task.FromResult(0);
        }

        var top = childList.Children.OrderBy(or => or).First();
        if (_path + "/" + top == _tempNode)
        {
            // Our node is at the head of the queue: release the waiting thread.
            _finished = true;
            _logger.LogDebug("釋放阻塞");
            _autoResetEvent.Set();
        }
        else if (!childList.Children.Any(child => _path + "/" + child == _tempNode))
        {
            // Our own node no longer exists, so it can never reach the head; stop re-arming.
            // NOTE(review): the waiting thread is not released in this case - same as before.
            _finished = true;
        }
        return Task.FromResult(0);
    }
}
這個類,在path節點被刪除或者其子節點列表更改的時候會被激發(注意:zookeeper的watch是一次性的,激發一次之后需要重新注冊才能再次收到通知)。它僅僅是判斷當前節點是不是列表的頂端,如果是,再執行 _autoResetEvent.Set();釋放阻塞讓等待的線程繼續執行。
編譯提交nuget,然后集成到測試程序看效果。
首先看zookeeper:
目前節點下沒有一個子節點,再看看這個節點的值:
首先新建一個看看:
create -s -e /testzookeeper/sequence ""
然后啟動程序,當程序提交一個子節點,他是排序的:
我們刪除我們剛剛用命令行創建的節點:
delete /testzookeeper/sequence0000000072
然后再看值:
我們的客戶端就是獲取到鎖后更改這個值,增加了一段字符串,客戶端調用如下:
/// <summary>
/// Test endpoint: takes the lock under "/testzookeeper", appends a marker with the current time
/// to that node's data, then releases the lock.
/// </summary>
/// <param name="id">Route value; not used by this action.</param>
/// <returns>A status message describing the outcome.</returns>
[HttpGet("{id}")]
public ActionResult<string> Get(int id)
{
    string sequenceNode = string.Empty;
    string nodename = _zookeeperService.GetDataByLockNode(
        "/testzookeeper", "sequence", ZooDefs.Ids.OPEN_ACL_UNSAFE, out sequenceNode);

    // Failure is reported by a null return value; the out value starts as an empty string and is
    // never null, so the former "sequenceNode==null" check could not detect a failed acquisition
    // and the data was then written without holding the lock.
    if (nodename == null || string.IsNullOrEmpty(sequenceNode))
    {
        return "獲取分布式鎖失敗,請查看日志。";
    }

    try
    {
        // "HH" (24-hour clock): the former "hh" produced ambiguous 12-hour stamps.
        var write = _zookeeperService.SetDataAsync(
            "/testzookeeper",
            nodename + "執行了" + DateTime.Now.ToString("yyyyMMddHHmmss"),
            false);
        if (write == null)
        {
            // The service returns null when the target node does not exist.
            return "出現錯誤,請查日志。";
        }

        // Finish the write before releasing the lock; otherwise the next holder could read or
        // overwrite the node while this update is still in flight.
        write.GetAwaiter().GetResult();
    }
    finally
    {
        // Always release the lock, including when the write fails.
        _zookeeperService.DeleteNode(sequenceNode, sequenceNode);
    }

    return "取得鎖並且成功處理數據,釋放鎖成功。";
}
當然這個是我做測試用的,實際開發,web就是用來做他的數據庫存取的,盡量不要去做一些額外的功能,因為守護進程或者后台服務和job有它自己的任務。
下一節我們就實際 使用這個分布式鎖做quartz的切片功能。