ScheduleMaster是一個開源的分布式任務調度系統,它基於.NET Core 3.1平台構建,支持跨平台多節點部署運行。
項目主頁:https://github.com/hey-hoho/ScheduleMasterCore
本篇從源碼角度分析一下節點控制的核心流程。
生命周期事件
生命周期事件增強了整個應用進程的控制能力,由於節點狀態與之關系密切,所以必須要首先了解下生命周期事件具體干了什么活。
借助於ASP.NET Core框架的HostedService模型,我們把生命周期管理器封裝在一個后台托管服務AppLifetimeHostedService中,在它的StartAsync方法中注冊了我們需要的事件:
public Task StartAsync(CancellationToken cancellationToken)
{
_appLifetime.ApplicationStarted.Register(OnStarted);
_appLifetime.ApplicationStopping.Register(OnStopping);
_appLifetime.ApplicationStopped.Register(OnStopped);
return Task.CompletedTask;
}
這里主要涉及的事件就是應用啟動和停止時所需要處理的邏輯,分別對應節點狀態的變更,下面重點說一下啟動事件。
ScheduleMaster采用了典型的中心化結構搭建,基於1個master節點和N和worker節點提供服務,其中master扮演了整個系統資源調度的角色,worker則是實際執行任務的角色。這樣的話,master就必須要感知到它所能調度的資源清單,所以系統引入了節點注冊概念。
根據注冊發起者的不同,可以分為如下兩種模式:
-
自動注冊模式
-
手動注冊模式
自動注冊模式
接觸過微服務架構的朋友應該會對服務注冊發現這一過程比較熟悉,借鑒了相似的設計,節點自動注冊就類似服務注冊的樣子,在節點啟動時自動把自身的配置信息注冊到控制中心,默認的方式就是從配置文件讀取節點信息,同時也支持使用命令行參數覆蓋配置文件中的字段:
private void OnStarted()
{
// ....
//判斷是否要自動根據配置文件注冊節點信息
if (AppCommandResolver.IsAutoRegister())
{
_logger.LogInformation("enabled auto register...");
// 設置節點信息
ConfigurationCache.SetNode(_configuration);
// ....
}
}
public static void SetNode(IConfiguration configuration)
{
NodeSetting = configuration.GetSection("NodeSetting").Get<NodeSetting>();
string identity = AppCommandResolver.GetCommandLineArgsValue("identity");
if (!string.IsNullOrEmpty(identity))
{
NodeSetting.IdentityName = identity;
}
string protocol = AppCommandResolver.GetCommandLineArgsValue("protocol");
if (!string.IsNullOrEmpty(protocol))
{
NodeSetting.Protocol = protocol;
}
string ip = AppCommandResolver.GetCommandLineArgsValue("ip");
if (!string.IsNullOrEmpty(ip))
{
NodeSetting.IP = ip;
}
string port = AppCommandResolver.GetCommandLineArgsValue("port");
if (!string.IsNullOrEmpty(port))
{
NodeSetting.Port = Convert.ToInt32(port);
}
string priority = AppCommandResolver.GetCommandLineArgsValue("priority");
if (!string.IsNullOrEmpty(priority))
{
NodeSetting.Priority = Convert.ToInt32(priority);
}
NodeSetting.MachineName = Environment.MachineName;
}
再看一下如何判斷節點是否開啟了自動注冊模式:
public static bool IsAutoRegister()
{
//優先讀取環境參數
string option = Environment.GetEnvironmentVariable("SMCORE_AUTOR");
//再看命令行參數中是否也有設置
string cmdArg = GetCommandLineArgsValue("autor");
if (!string.IsNullOrEmpty(cmdArg))
{
option = cmdArg;
}
return option != "false";
}
很明顯,在節點啟動時如果指定了特定的環境變量SMCORE_AUTOR或命令行參數autor並且值為false即表示關閉自動注冊模式,否則默認開啟。

要注意的是,master節點只提供了自動注冊模式。
手動注冊模式
自動注冊模式雖然流程簡單,但是需要提前配置好節點信息,這對於節點彈性部署並不友好,因此為了增加系統靈活性,系統也提供了手動注冊節點的模式,這時候對worker注冊的主動權轉移到master手里,需要先在master控制台中創建好要注冊的節點,然后執行連接操作,最后啟動服務即可。

這個過程中比較核心的是連接驗證過程,設計這個流程的原因是為了保障創建連接的雙方是可信狀態,實現數據匹配,其核心過程為:
-
worker節點在啟動時通過環境變量
SMCORE_WORKEROF或者命令行參數workerof指定歸屬的master名稱 -
在控制台中對節點執行[連接]操作,master攜帶驗證信息對worker發起連接請求
-
如果驗證通過,則使用指定的節點名稱去數據庫查詢完整的節點配置信息,並為worker節點緩存配置數據,worker生成一個新的訪問秘鑰返回
-
標記節點狀態為空閑中,此時worker並不運行任何調度服務,處於空跑狀態
-
對節點執行[啟用]操作,開啟調度功能
驗證連接過程的核心代碼為:
public async Task<(bool success, string content)> Connect()
{
HttpClient client = CreateClient();
client.DefaultRequestHeaders.Add("sm_connection", SecurityHelper.MD5(ConfigurationCache.NodeSetting.IdentityName));
client.DefaultRequestHeaders.Add("sm_nameto", _server.NodeName);
var response = await client.PostAsync("/api/server/connect", null);
return (response.IsSuccessStatusCode, await response.Content.ReadAsStringAsync());
}
[HttpPost, AllowAnonymous]
public IActionResult Connect()
{
string workerof = AppCommandResolver.GetTargetMasterName();
string encodeKey = Request.Headers["sm_connection"].FirstOrDefault();
if (string.IsNullOrEmpty(workerof) || string.IsNullOrEmpty(encodeKey))
{
_logger.LogWarning("connect failed! workerof or encodekey is null...");
return BadRequest("Unauthorized Connection.");
}
if (!Core.Common.SecurityHelper.MD5(workerof).Equals(encodeKey))
{
_logger.LogWarning("connect failed! encodekey is unvalid, wokerof:{0}, encodekey:{1}", workerof, encodeKey);
return BadRequest("Unauthorized Connection.");
}
string workerName = Request.Headers["sm_nameto"].FirstOrDefault();
var node = _db.ServerNodes.FirstOrDefault(x => x.NodeName == workerName);
if (node == null)
{
_logger.LogWarning("connect failed! unkown worker name:{0}...", workerName);
return BadRequest("Unkown Worker Name.");
}
Core.ConfigurationCache.SetNode(node);
string secret = Guid.NewGuid().ToString("n");
QuartzManager.AccessSecret = secret;
_logger.LogInformation("successfully connected to {0}!", workerof);
LogHelper.Info($"與{workerof}連接成功~");
return Ok(secret);
}
健康檢查
健康檢查是為了保障不可用的worker節點及時被發現並剔除調度,其驗證方式使用了ASP.NET Core框架自帶的健康檢查機制中間件,通過訪問一個指定的路由地址獲取節點的健康情況,如果連續N次檢查失敗就把該節點強制剔除下線,多次檢查目的是為了避免因短暫的網絡抖動導致出現誤判情況,這個次數N可以根據實際情況進行配置,默認是3次。
首先master啟動的時候會注冊一個每分鍾執行一次的后台定時任務,這個任務會拉取所有狀態是非[下線]的worker節點,然后對其發起健康檢查請求:
public class SystemSchedulerRegistry : Registry
{
public SystemSchedulerRegistry()
{
NonReentrantAsDefault();
//對運行節點每分鍾一次心跳監測
Schedule<WorkerCheckJob>().ToRunEvery(1).Minutes();
}
}
internal class WorkerCheckJob : IJob
{
/// <summary>
/// 執行計划
/// </summary>
public void Execute()
{
using (var scope = ConfigurationCache.RootServiceProvider.CreateScope())
{
Core.Interface.INodeService service = scope.ServiceProvider.GetService<Core.Interface.INodeService>();
AutowiredServiceProvider provider = new AutowiredServiceProvider();
provider.PropertyActivate(service, scope.ServiceProvider);
service.WorkerHealthCheck();
}
}
}
具體判斷節點無效的流程為:
-
讀取系統配置的最大允許無響應次數
-
給節點維護一個失敗計數器,本質是一個字典,key是節點名稱,value是連續失敗的次數
-
對節點發起健康檢查請求,如果請求成功就更新節點的最后刷新時間,並把計數器歸0
-
如果請求失敗但沒有達到最大失敗次數,把計數器加1,等待下次檢查
-
如果已經達到最大失敗次數,則把節點標記下線,釋放該節點占據的鎖,同時把計數器歸0
worker的中間件注冊過程為:
public void ConfigureServices(IServiceCollection services)
{
// ....
services.AddHealthChecks();
// ....
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
// ....
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapHealthChecks("/health");
});
// ....
}
訪問控制
為了保證worker接口訪問的安全性,系統加入了動態秘鑰驗證機制,每次節點啟動或者被連接的時候都會生成一個新的秘鑰,持有合法秘鑰的請求才會被節點正常處理,否則直接返回401 Unauthorized。
public void OnActionExecuting(ActionExecutingContext context)
{
var anonymous = (context.ActionDescriptor as ControllerActionDescriptor).MethodInfo.GetCustomAttributes(typeof(AllowAnonymousAttribute), false);
if (anonymous.Any())
{
return;
}
var secret = context.HttpContext.Request.Headers["sm_secret"].FirstOrDefault();
if (string.Compare(Common.QuartzManager.AccessSecret, secret, StringComparison.CurrentCultureIgnoreCase) != 0)
{
context.Result = new UnauthorizedObjectResult($"w:{Common.QuartzManager.AccessSecret} m:{secret}");
}
}
節點訪問
在master控制台中對任務的操作最終都被分發到關聯的worker節點上,通過worker提供的webapi接口實現遠程調用。以啟動任務為例,我們看一下具體分發和遠程調用過程:
private async Task<bool> DispatcherHandler(Guid sid, RequestDelegate func)
{
var nodeList = _nodeService.GetAvaliableWorkerForSchedule(sid);
if (nodeList.Any())
{
foreach (var item in nodeList)
{
if (!await func(item))
{
return false;
}
}
return true;
}
throw new InvalidOperationException("running worker not found.");
}
public async Task<bool> ScheduleStart(Guid sid)
{
return await DispatcherHandler(sid, async (ServerNodeEntity node) =>
{
_scheduleClient.Server = node;
return await _scheduleClient.Start(sid);
});
}
可以看到,啟動操作會首先查詢任務的執行節點,然后依次遍歷執行遠程調用,只要其中一個節點執行命令失敗那么整個操作就會失敗。
最終的httpclient請求被封裝在Hos.ScheduleMaster.Core.Services.RemoteCaller.ServerClient類中,它的CreateClient方法從IHttpClientFactory獲取了一個客戶端實例,並把節點的訪問秘鑰放入請求頭中,以此完成安全性驗證:
protected HttpClient CreateClient()
{
if (_server == null)
{
throw new ArgumentException("no target worker that can send the request.");
}
HttpClient client = _httpClientFactory.CreateClient("workercaller");
client.DefaultRequestHeaders.Add("sm_secret", _server.AccessSecret);
client.BaseAddress = new Uri($"{_server.AccessProtocol}://{_server.Host}");
return client;
}
寫在最后
到這里基本把節點的核心操作都分析完畢了,希望能對關注這個項目的朋友帶來幫助~
