從源碼角度分析ScheduleMaster的節點管理流程


ScheduleMaster是一個開源的分布式任務調度系統,它基於.NET Core 3.1平台構建,支持跨平台多節點部署運行。

項目主頁:https://github.com/hey-hoho/ScheduleMasterCore

本篇從源碼角度分析一下節點控制的核心流程。


生命周期事件

生命周期事件增強了整個應用進程的控制能力,由於節點狀態與之關系密切,所以必須要首先了解下生命周期事件具體干了什么活。

借助於ASP.NET Core框架的HostedService模型,我們把生命周期管理器封裝在一個后台托管服務AppLifetimeHostedService中,在它的StartAsync方法中注冊了我們需要的事件:

public Task StartAsync(CancellationToken cancellationToken)
{
    _appLifetime.ApplicationStarted.Register(OnStarted);
    _appLifetime.ApplicationStopping.Register(OnStopping);
    _appLifetime.ApplicationStopped.Register(OnStopped);

    return Task.CompletedTask;
}

這里主要涉及的事件就是應用啟動和停止時所需要處理的邏輯,分別對應節點狀態的變更,下面重點說一下啟動事件。

ScheduleMaster采用了典型的中心化結構搭建,基於1個master節點和N和worker節點提供服務,其中master扮演了整個系統資源調度的角色,worker則是實際執行任務的角色。這樣的話,master就必須要感知到它所能調度的資源清單,所以系統引入了節點注冊概念。

根據注冊發起者的不同,可以分為如下兩種模式:

  • 自動注冊模式

  • 手動注冊模式


自動注冊模式

接觸過微服務架構的朋友應該會對服務注冊發現這一過程比較熟悉,借鑒了相似的設計,節點自動注冊就類似服務注冊的樣子,在節點啟動時自動把自身的配置信息注冊到控制中心,默認的方式就是從配置文件讀取節點信息,同時也支持使用命令行參數覆蓋配置文件中的字段:

private void OnStarted()
{
	// ....

    //判斷是否要自動根據配置文件注冊節點信息
    if (AppCommandResolver.IsAutoRegister())
    {
        _logger.LogInformation("enabled auto register...");
        // 設置節點信息
        ConfigurationCache.SetNode(_configuration);

        // ....
    }
}

public static void SetNode(IConfiguration configuration)
{
    NodeSetting = configuration.GetSection("NodeSetting").Get<NodeSetting>();
    string identity = AppCommandResolver.GetCommandLineArgsValue("identity");
    if (!string.IsNullOrEmpty(identity))
    {
        NodeSetting.IdentityName = identity;
    }
    string protocol = AppCommandResolver.GetCommandLineArgsValue("protocol");
    if (!string.IsNullOrEmpty(protocol))
    {
        NodeSetting.Protocol = protocol;
    }
    string ip = AppCommandResolver.GetCommandLineArgsValue("ip");
    if (!string.IsNullOrEmpty(ip))
    {
        NodeSetting.IP = ip;
    }
    string port = AppCommandResolver.GetCommandLineArgsValue("port");
    if (!string.IsNullOrEmpty(port))
    {
        NodeSetting.Port = Convert.ToInt32(port);
    }
    string priority = AppCommandResolver.GetCommandLineArgsValue("priority");
    if (!string.IsNullOrEmpty(priority))
    {
        NodeSetting.Priority = Convert.ToInt32(priority);
    }
    NodeSetting.MachineName = Environment.MachineName;
}

再看一下如何判斷節點是否開啟了自動注冊模式:

public static bool IsAutoRegister()
{
    //優先讀取環境參數
    string option = Environment.GetEnvironmentVariable("SMCORE_AUTOR");
    //再看命令行參數中是否也有設置
    string cmdArg = GetCommandLineArgsValue("autor");
    if (!string.IsNullOrEmpty(cmdArg))
    {
        option = cmdArg;
    }
    return option != "false";
}

很明顯,在節點啟動時如果指定了特定的環境變量SMCORE_AUTOR或命令行參數autor並且值為false即表示關閉自動注冊模式,否則默認開啟。
自動注冊流程

要注意的是,master節點只提供了自動注冊模式。


手動注冊模式

自動注冊模式雖然流程簡單,但是需要提前配置好節點信息,這對於節點彈性部署並不友好,因此為了增加系統靈活性,系統也提供了手動注冊節點的模式,這時候對worker注冊的主動權轉移到master手里,需要先在master控制台中創建好要注冊的節點,然后執行連接操作,最后啟動服務即可。
手動注冊流程

這個過程中比較核心的是連接驗證過程,設計這個流程的原因是為了保障創建連接的雙方是可信狀態,實現數據匹配,其核心過程為:

  • worker節點在啟動時通過環境變量SMCORE_WORKEROF或者命令行參數workerof指定歸屬的master名稱

  • 在控制台中對節點執行[連接]操作,master攜帶驗證信息對worker發起連接請求

  • 如果驗證通過,則使用指定的節點名稱去數據庫查詢完整的節點配置信息,並為worker節點緩存配置數據,worker生成一個新的訪問秘鑰返回

  • 標記節點狀態為空閑中,此時worker並不運行任何調度服務,處於空跑狀態

  • 對節點執行[啟用]操作,開啟調度功能

驗證連接過程的核心代碼為:

 public async Task<(bool success, string content)> Connect()
 {
    HttpClient client = CreateClient();
    client.DefaultRequestHeaders.Add("sm_connection", SecurityHelper.MD5(ConfigurationCache.NodeSetting.IdentityName));
    client.DefaultRequestHeaders.Add("sm_nameto", _server.NodeName);

    var response = await client.PostAsync("/api/server/connect", null);
    return (response.IsSuccessStatusCode, await response.Content.ReadAsStringAsync());
 }

 [HttpPost, AllowAnonymous]
 public IActionResult Connect()
 {
     string workerof = AppCommandResolver.GetTargetMasterName();
     string encodeKey = Request.Headers["sm_connection"].FirstOrDefault();
     if (string.IsNullOrEmpty(workerof) || string.IsNullOrEmpty(encodeKey))
     {
         _logger.LogWarning("connect failed! workerof or encodekey is null...");
         return BadRequest("Unauthorized Connection.");
     }
     if (!Core.Common.SecurityHelper.MD5(workerof).Equals(encodeKey))
     {
         _logger.LogWarning("connect failed! encodekey is unvalid, wokerof:{0}, encodekey:{1}", workerof, encodeKey);
         return BadRequest("Unauthorized Connection.");
     }
     string workerName = Request.Headers["sm_nameto"].FirstOrDefault();
     var node = _db.ServerNodes.FirstOrDefault(x => x.NodeName == workerName);
     if (node == null)
     {
         _logger.LogWarning("connect failed! unkown worker name:{0}...", workerName);
         return BadRequest("Unkown Worker Name.");
     }
     Core.ConfigurationCache.SetNode(node);
     string secret = Guid.NewGuid().ToString("n");
     QuartzManager.AccessSecret = secret;
     _logger.LogInformation("successfully connected to {0}!", workerof);
     LogHelper.Info($"與{workerof}連接成功~");
     return Ok(secret);
 }

健康檢查

健康檢查是為了保障不可用的worker節點及時被發現並剔除調度,其驗證方式使用了ASP.NET Core框架自帶的健康檢查機制中間件,通過訪問一個指定的路由地址獲取節點的健康情況,如果連續N次檢查失敗就把該節點強制剔除下線,多次檢查目的是為了避免因短暫的網絡抖動導致出現誤判情況,這個次數N可以根據實際情況進行配置,默認是3次。

首先master啟動的時候會注冊一個每分鍾執行一次的后台定時任務,這個任務會拉取所有狀態是非[下線]的worker節點,然后對其發起健康檢查請求:

    public class SystemSchedulerRegistry : Registry
    {
        public SystemSchedulerRegistry()
        {
            NonReentrantAsDefault();

            //對運行節點每分鍾一次心跳監測
            Schedule<WorkerCheckJob>().ToRunEvery(1).Minutes();
        }
    }

    internal class WorkerCheckJob : IJob
    {
        /// <summary>
        /// 執行計划
        /// </summary>
        public void Execute()
        {
            using (var scope = ConfigurationCache.RootServiceProvider.CreateScope())
            {
                Core.Interface.INodeService service = scope.ServiceProvider.GetService<Core.Interface.INodeService>();
                AutowiredServiceProvider provider = new AutowiredServiceProvider();
                provider.PropertyActivate(service, scope.ServiceProvider);
                service.WorkerHealthCheck();
            }
        }
    }

具體判斷節點無效的流程為:

  • 讀取系統配置的最大允許無響應次數

  • 給節點維護一個失敗計數器,本質是一個字典,key是節點名稱,value是連續失敗的次數

  • 對節點發起健康檢查請求,如果請求成功就更新節點的最后刷新時間,並把計數器歸0

  • 如果請求失敗但沒有達到最大失敗次數,把計數器加1,等待下次檢查

  • 如果已經達到最大失敗次數,則把節點標記下線,釋放該節點占據的鎖,同時把計數器歸0

worker的中間件注冊過程為:

public void ConfigureServices(IServiceCollection services)
{
	// ....

	services.AddHealthChecks();

	// ....
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
	// ....

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
        endpoints.MapHealthChecks("/health");
    });

	// ....
}

訪問控制

為了保證worker接口訪問的安全性,系統加入了動態秘鑰驗證機制,每次節點啟動或者被連接的時候都會生成一個新的秘鑰,持有合法秘鑰的請求才會被節點正常處理,否則直接返回401 Unauthorized。

public void OnActionExecuting(ActionExecutingContext context)
{
    var anonymous = (context.ActionDescriptor as ControllerActionDescriptor).MethodInfo.GetCustomAttributes(typeof(AllowAnonymousAttribute), false);
    if (anonymous.Any())
    {
        return;
    }
    var secret = context.HttpContext.Request.Headers["sm_secret"].FirstOrDefault();
    if (string.Compare(Common.QuartzManager.AccessSecret, secret, StringComparison.CurrentCultureIgnoreCase) != 0)
    {
        context.Result = new UnauthorizedObjectResult($"w:{Common.QuartzManager.AccessSecret} m:{secret}");
    }
}

節點訪問

在master控制台中對任務的操作最終都被分發到關聯的worker節點上,通過worker提供的webapi接口實現遠程調用。以啟動任務為例,我們看一下具體分發和遠程調用過程:

private async Task<bool> DispatcherHandler(Guid sid, RequestDelegate func)
{
    var nodeList = _nodeService.GetAvaliableWorkerForSchedule(sid);
    if (nodeList.Any())
    {
        foreach (var item in nodeList)
        {
            if (!await func(item))
            {
                return false;
            }
        }
        return true;
    }
    throw new InvalidOperationException("running worker not found.");
}

public async Task<bool> ScheduleStart(Guid sid)
{
    return await DispatcherHandler(sid, async (ServerNodeEntity node) =>
     {
         _scheduleClient.Server = node;
         return await _scheduleClient.Start(sid);
     });
}

可以看到,啟動操作會首先查詢任務的執行節點,然后依次遍歷執行遠程調用,只要其中一個節點執行命令失敗那么整個操作就會失敗。

最終的httpclient請求被封裝在Hos.ScheduleMaster.Core.Services.RemoteCaller.ServerClient類中,它的CreateClient方法從IHttpClientFactory獲取了一個客戶端實例,並把節點的訪問秘鑰放入請求頭中,以此完成安全性驗證:

protected HttpClient CreateClient()
{
    if (_server == null)
    {
        throw new ArgumentException("no target worker that can send the request.");
    }
    HttpClient client = _httpClientFactory.CreateClient("workercaller");
    client.DefaultRequestHeaders.Add("sm_secret", _server.AccessSecret);
    client.BaseAddress = new Uri($"{_server.AccessProtocol}://{_server.Host}");
    return client;
}

寫在最后

到這里基本把節點的核心操作都分析完畢了,希望能對關注這個項目的朋友帶來幫助~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM