.net core 下 Quartz 任務調度的使用和配置 


Quarzt 是一個開源的作業調度框架;允許程序開發人員根據時間間隔來調度作業;實現了任務和觸發器的多對多關系.

1.Quarzt 核心概念:
  • Job : 表示一個工作,要執行的具體內容; 接口中定義了一個方法 void excute(JobExecutionContext context);
  • JobDetail:  表示一個具體的可執行的調度程序;包含了這個任務調度的方案和策略.
  • Trigger:  一個調度參數的配置,配置調用的時間和周期.框架提供了5種觸發器類型(SimpleTrigger、CronTrigger、DateIntervalTrigger、 NthlncludedDayTrigger、Calendar類).
  • Scheduler: 一個調度容器可以注冊多個JobDetail和Trigger.調度需要組合JobDetail和Trigger;

附: 常用的trigger為SimpleTrigger和CronTrigger. 
SimpleTrigger 執行N次,重復N次;
CronTrigger: 幾秒 幾分 幾時 哪日 哪月 哪周 哪年執行

quartz表達式在線網址:  http://cron.qqe2.com/

1.1 Quarzt 運行環境:

可以嵌入在應用程序,也可以作為一個獨立的程序運行.

1.2.Quarzt 存儲方式:
  • RAMJobStore(內存作業存儲類型)和JDBCJobStore(數據庫作業存儲類型),兩種方式對比如下:
優點 缺點
RAMJobStore 不要外部數據庫,配置容易,運行速度快 因為調度程序信息是存儲在被分配給JVM的內存里面, 所以,當應用程序停止運行時,所有調度信息將被丟失。另外因為存儲到JVM內存里面,所以可以存儲多少個Job和Trigger將會受到限制
BCJobStor 支持集群,因為所有的任務信息都會保存 運行速度的快慢取決與連接數據庫的快慢到數據庫中,可以控制事物,還有就是如果應用服務器關閉或者重啟,任務信息都不會丟失,並且可以恢復因服務器關閉或者重啟而導致執行失敗的任務
2. Quarzt 在.net的實現
2.1 Quarzt ISchedulerCenter接口定義
  • ISchedulerCenter接口主要定義了以下幾個接口

開啟任務調度 StartScheduleAsync()
停止任務調度 StopScheduleAsync()
添加一個任務 AddScheduleJobAsync(TasksQz sysSchedule)
停止一個任務 StopScheduleJobAsync(TasksQz sysSchedule)
恢復一個任務 ResumeJob(TasksQz sysSchedule)

    /// <summary>
    /// 服務調度接口
    /// </summary>
    public interface ISchedulerCenter
    {

        /// <summary>
        /// 開啟任務調度
        /// </summary>
        /// <returns></returns>
        Task<MessageModel<string>> StartScheduleAsync();

        /// <summary>
        /// 停止任務調度
        /// </summary>
        /// <returns></returns>
        Task<MessageModel<string>> StopScheduleAsync();

        /// <summary>
        /// 添加一個任務
        /// </summary>
        /// <returns></returns>
        Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule);

       
        /// <summary>
        /// 停止一個任務
        /// </summary>
        /// <returns></returns>
        Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule);

        /// <summary>
        /// 恢復一個任務
        /// </summary>
        /// <returns></returns>
        Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule);

    }



2.2 Quarzt 任務調度服務中心 SchedulerCenterServer 封裝
  • 任務調度中心 SchedulerCenterServer 類的實現, 繼承實現 ISchedulerCenter 接口;包含任務的開啟、任務的關閉方法;以及對指定任務的開啟、暫停和恢復方法的封裝;

    /// <summary>
    /// 任務調度管理中心
    /// </summary>
    public class SchedulerCenterServer : ISchedulerCenter
    {
        private Task<IScheduler> _scheduler;
        private readonly IJobFactory _iocjobFactory;

        public SchedulerCenterServer(IJobFactory jobFactory)
        {
            _iocjobFactory = jobFactory;
            _scheduler = GetSchedulerAsync();
        }

        private Task<IScheduler> GetSchedulerAsync()
        {
            if (_scheduler != null)
                return this._scheduler;
            else
            {
                //從Factory中獲取Scheduler實例
                NameValueCollection collection = new NameValueCollection
                {
                    {"quartz.serializer.type","binary" }
                };
                StdSchedulerFactory factory = new StdSchedulerFactory(collection);
                return _scheduler = factory.GetScheduler();
            }
        }


        /// <summary>
        /// 開啟任務調度
        /// </summary>
        /// <returns></returns>
        public async Task<MessageModel<string>> StartScheduleAsync()
        {
            var result = new MessageModel<string>();
            try
            {
                this._scheduler.Result.JobFactory = this._iocjobFactory;
                if (!this._scheduler.Result.IsStarted)
                {
                    //等待任務運行完成
                    await this._scheduler.Result.Start();
                    await Console.Out.WriteLineAsync("任務調度開啟!");
                    result.success = true;
                    result.msg = $"任務調度開啟成功";
                    return result;
                }
                else
                {
                    result.success = false;
                    result.msg = $"任務調度已經開啟";
                    return result;
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
            throw new NotImplementedException();
        }

        /// <summary>
        /// 停止任務調度
        /// </summary>
        /// <returns></returns>
        public async Task<MessageModel<string>> StopScheduleAsync()
        {
            var result = new MessageModel<string>();
            try
            {
                if (!this._scheduler.Result.IsShutdown)
                {
                    //等待任務運行完成
                    await this._scheduler.Result.Shutdown();
                    await Console.Out.WriteLineAsync("任務調度停止! ");

                    result.success = true;
                    result.msg = $"任務調度停止成功";
                    return result;
                }
                else
                {
                    result.success = false;
                    result.msg = $"任務調度已經停止";
                    return result;
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
            throw new NotImplementedException();
        }




        /// <summary>
        /// 添加一個計划任務(映射程序集指定IJob實現類)
        /// </summary>
        /// <param name="sysSchedule"></param>
        /// <returns></returns>
        public async Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule)
        {
            var result = new MessageModel<string>();
            if (sysSchedule != null)
            {
                try
                {
                    JobKey jobKey = new JobKey(sysSchedule.Id.ToString());
                    if (await _scheduler.Result.CheckExists(jobKey))
                    {
                        result.success = false;
                        result.msg = $"該任務計划已經在執行:【{sysSchedule.Name}】";
                        return result;
                    }
                    #region 設置開始時間和結束時間
                    if (sysSchedule.BeginTime == null)
                    {
                        sysSchedule.BeginTime = DateTime.Now;
                    }
                    DateTimeOffset startRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.BeginTime, 1);//設置開始時間
                    if (sysSchedule.EndTime == null)
                    {
                        sysSchedule.EndTime = DateTime.MaxValue.AddDays(-1);
                    }
                    DateTimeOffset endRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.EndTime, 1);//設置暫停時間

                    #endregion

                    #region 通過反射獲取程序集類型和類

                    Assembly assembly = Assembly.Load(sysSchedule.AssemblyName);
                    Type jobType = assembly.GetType(sysSchedule.AssemblyName + "." + sysSchedule.ClassName);

                    #endregion

                    //判斷任務調度是否開啟
                    if (!_scheduler.Result.IsStarted)
                    {
                        await StartScheduleAsync();
                    }

                    //傳入反射出來的執行程序集
                    IJobDetail job = new JobDetailImpl(sysSchedule.Id.ToString(), sysSchedule.JobGroup, jobType);
                    job.JobDataMap.Add("JobParam", sysSchedule.JobParams);
                    ITrigger trigger;
                    #region 泛型傳遞
                    //IJobDetail job = JobBuilder.Create<T>()
                    //    .WithIdentity(sysSchedule.Name, sysSchedule.JobGroup)
                    //    .Build();
                    #endregion
                    if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
                    {
                        trigger = CreateCronTrigger(sysSchedule);
                    }
                    else
                    {
                        trigger = CreateSimpleTrigger(sysSchedule);
                    }

                    //告訴Quartz使用我們的觸發器來安排作業
                    await _scheduler.Result.ScheduleJob(job, trigger);

                    //await Task.Delay(TimeSpan.FromSeconds(120));
                    //await Console.Out.WriteLineAsync("關閉了調度器!");
                    //await _scheduler.Result.Shutdown();

                    result.success = true;
                    result.msg = $"啟動任務: 【{sysSchedule.Name}】成功";
                    return result;

                }
                catch (Exception ex)
                {
                    result.success = false;
                    result.msg = $"任務計划異常:【{ex.Message}】";
                    return result;
                }
            }
            else
            {
                result.success = false;
                result.msg = $"任務計划不存在:【{sysSchedule?.Name}】";
                return result;
            }
        }

        /// <summary>
        /// 暫停一個指定的計划任務
        /// </summary>
        /// <param name="sysSchedule"></param>
        /// <returns></returns>
        public async Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule)
        {
            var result = new MessageModel<string>();
            try
            {
                JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
                if (!await _scheduler.Result.CheckExists(jobKey))
                {
                    result.success = false;
                    result.msg = $"未找到要暫停的任務:【{sysSchedule.Name}】";
                    return result;
                }
                else
                {
                    await this._scheduler.Result.PauseJob(jobKey);
                    result.success = true;
                    result.msg = $"暫停任務:【{sysSchedule.Name}】成功";
                    return result;
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }


        /// <summary>
        /// 恢復指定的計划任務
        /// </summary>
        /// <param name="sysSchedule"></param>
        /// <returns></returns>
        public async Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule)
        {
            var result = new MessageModel<string>();
            try
            {
                JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
                if (!await _scheduler.Result.CheckExists(jobKey))
                {
                    result.success = false;
                    result.msg = $"未找到要重新運行的任務,【{sysSchedule.Name}】";
                    return result;
                }
                //await this._scheduler.Result.ResumeJob(jobKey);

                ITrigger trigger;
                if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
                {
                    trigger = CreateCronTrigger(sysSchedule);
                }
                else
                {
                    trigger = CreateSimpleTrigger(sysSchedule);
                }
                TriggerKey triggerKey = new TriggerKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup); ;
                await _scheduler.Result.RescheduleJob(triggerKey, trigger);

                result.success = true;
                result.msg = $"恢復計划任務: 【{sysSchedule.Name}】 成功";
                return result;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }


        #region 創建觸發器幫助方法

        private ITrigger CreateSimpleTrigger(TasksQz sysSchedule)
        {
            if (sysSchedule.RunTimes > 0)
            {
                ITrigger trigger = TriggerBuilder.Create()
                    .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                    .StartAt(sysSchedule.BeginTime.Value)
                    .EndAt(sysSchedule.EndTime.Value)
                    .WithSimpleSchedule(x =>
                    x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
                    .WithRepeatCount(sysSchedule.RunTimes)).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
                return trigger;
            }
            else
            {
                ITrigger trigger = TriggerBuilder.Create()
                    .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                    .StartAt(sysSchedule.BeginTime.Value)
                    .EndAt(sysSchedule.EndTime.Value)
                    .WithSimpleSchedule(x =>
                    x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
                    .RepeatForever()).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
                return trigger;
            }
            //觸發作業立即運行,然后每10秒重復一次,無限循環
        }


        /// <summary>
        /// 創建類型Cron的觸發器
        /// </summary>
        /// <param name="sysSchedule"></param>
        /// <returns></returns>
        private ITrigger CreateCronTrigger(TasksQz sysSchedule)
        {
            //作業觸發器
            return TriggerBuilder.Create()
                .WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                .StartAt(sysSchedule.BeginTime.Value)
                .EndAt(sysSchedule.EndTime.Value)
                .WithCronSchedule(sysSchedule.Cron)//指定cron表達式
                .ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
                .Build();
        }
        #endregion
    }

2.3 Job工廠類 JobFactory 實現:
public class JobFactory : IJobFactory
{
        private readonly IServiceProvider _serviceProvider;  

        public JobFactory(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }



        /// <summary>
        /// 實現接口Job
        /// </summary>
        /// <param name="bundle"></param>
        /// <param name="scheduler"></param>
        /// <returns></returns>
        public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
        {
            try
            {

                var serviceScope = _serviceProvider.CreateScope();

                var job = serviceScope.ServiceProvider.GetService(bundle.JobDetail.JobType) as IJob;

                return job;

            }
            catch(Exception ex)
            {
                throw ex;
            }

        }


        public void ReturnJob(IJob job)
        {

            var disposable = job as IDisposable;

            if(disposable != null)
            {
                disposable.Dispose();
            }

        }
    }

2.4 單個任務調度的示例 :
  • Job_Users_Quartz 這里是要執行的服務和方法 比如每隔一段時間對平台用戶的總數量進行統計
    public class Job_Users_Quartz : JobBase, IJob
    {

        private readonly ITasksQzServices _tasksQzServices;

        private readonly IUserServices _userServices;

        private readonly ILogger<Job_Users_Quartz> _logger;

        public Job_Users_Quartz(ITasksQzServices tasksQzServices, IUserServices userServices, ILogger<Job_Users_Quartz> logger)
        {

            _tasksQzServices = tasksQzServices;
            _userServices = userServices;
            _logger = logger;

        }



        public async Task Execute(IJobExecutionContext context)
        {

            //var param = context.MergedJobDataMap;
            // 可以直接獲取 JobDetail 的值
            var jobKey = context.JobDetail.Key;
            var jobId = jobKey.Name;

            var executeLog = await ExecuteJob(context, async () => await Run(context, jobId.ObjToInt()));

            //通過數據庫配置,獲取傳遞過來的參數
            JobDataMap data = context.JobDetail.JobDataMap;
      
        }
        
        public async Task Run(IJobExecutionContext context, int jobid)
        {

            var count = await _userServices.QueryCount(x=>x.ID > 0);
            if (jobid > 0)
            {

                var model = await _tasksQzServices.QueryById(jobid);
                if (model != null)
                {

                    model.RunTimes += 1;

                    var separator = "<br>";

                    string remark = $"【{DateTime.Now}】執行任務【Id:{context.JobDetail.Key.Name},組別:context.JobDetail.Key.Group}】【執行成功】{separator}";

                    model.Remark = remark + string.Join(separator, StringHelper.GetTopDataBySeparator(model.Remark, separator, 3));

                    //_logger.LogInformation(remark);
                    await _tasksQzServices.Update(model);
                }
            }
            //_logger.LogInformation("用戶總數量" + count.ToString());
            await Console.Out.WriteLineAsync("用戶總數量" + count.ToString());
        }
    }
2.5 Quartz 啟動服務中間件
    /// <summary>
    /// Quartz 啟動服務
    /// </summary>
    public static class QuartzJobMildd
    {

        public static void UseQuartzJobMildd(this IApplicationBuilder app, ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
        {

            if (app == null) throw new 
ArgumentNullException(nameof(QuartzJobMildd));


            try
            {

                if (Appsettings.app("Middleware", "QuartzNetJob", "Enabled").ObjToBool())
                {

                    var allQzServices = tasksQzServices.Query().Result;

                    foreach (var item in allQzServices)
                    {

                        if (item.IsStart)
                        {

                            var resuleModel = schedulerCenter.AddScheduleJobAsync(item).Result;

                            if (resuleModel.success)
                            {
                                Console.WriteLine($"QuartzNetJob{item.Name}啟動成功! ");
                            }
                            else
                            {
                                Console.WriteLine($"QuartzNetJob{item.Name}啟動失敗! 錯誤信息{resuleModel.msg}");
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {

                Console.WriteLine($"An error was reported when starting the job service.\n{ex.Message}");
                throw;
            }
        }
    }

2.6 任務調度啟動服務 JobSetup
    /// <summary>
    /// 任務調度 啟動服務
    /// </summary>
    public static class JobSetup
    {

        public static void AddJobSetup(this IServiceCollection services)
        {

            if (services == null) throw new ArgumentNullException(nameof(services));

            //services.AddHostedService<Job1TimedService>();
            //services.AddHostedService<Job2TimedService>();

            services.AddSingleton<IJobFactory, JobFactory>();
            services.AddTransient<Job_Users_Quartz>();//Job使用瞬時依賴注入
            services.AddSingleton<ISchedulerCenter, SchedulerCenterServer>();

        }

    }


2.7 Quartz 中間件和服務在Startup的的啟用
 public void ConfigureServices(IServiceCollection services)
{
 services.AddJobSetup();
}


 
  public void Configure(IApplicationBuilder app, IWebHostEnvironment env, 
ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
 {
      
 //// 開啟QuartzNetJob調度服務
app.UseQuartzJobMildd(tasksQzServices, schedulerCenter);

}

2.8 Quartz 的 TasksQz 類定義
    /// <summary>
    /// 任務計划表
    /// </summary>

    public class TasksQz 
    {
         /// <summary>
        /// ID
        /// </summary>
        [SugarColumn(IsNullable = false,IsPrimaryKey = true,IsIdentity = true)]
        public int Id { get; set; }

        /// <summary>
        /// 任務名稱
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar",Length = 200,IsNullable = true)]
        public string Name { get; set;}

        /// <summary>
        /// 任務分組
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
        public string JobGroup { get; set; }

        /// <summary>
        /// 任務運行時間表達式
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
        public string Cron { get; set; }

        /// <summary>
        /// 任務所在DLL對應的程序集名稱
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
        public string AssemblyName { get; set; }

        /// <summary>
        /// 任務所在類
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
        public string ClassName { get; set; }

        /// <summary>
        /// 任務描述
        /// </summary>
        [SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
        public string Remark { get; set; }  


        /// <summary>
        /// 執行次數
        /// </summary>
        public int RunTimes { get; set; }

        /// <summary>
        /// 開始時間
        /// </summary>
        public DateTime? BeginTime { get; set; }

        /// <summary>
        /// 結束時間
        /// </summary>
        public DateTime? EndTime { get; set; }

        /// <summary>
        ///  觸發器類型(0 simple  1 cron)
        /// </summary>
        public int TriggerType { get; set; }



        /// <summary>
        /// 執行間隔時間,秒為單位
        /// </summary>
        public int IntervalSecond { get; set; }

        /// <summary>
        /// 是否啟動
        /// </summary>
        public bool IsStart { get; set; }


        /// <summary>
        /// 執行傳參
        /// </summary>
        public string JobParams { get; set; }


        /// <summary>
        /// 是否刪除 1 刪除 
        /// </summary>
        [SugarColumn(IsNullable = true)]
        public bool? IsDeleted { get; set; }

        /// <summary>
        /// 創建時間
        /// </summary>
        [SugarColumn(IsNullable = true)]
        public DateTime CreateTime { get; set; } = DateTime.Now;

    }

參考源碼:
https://github.com/anjoy8/Blog.Core

Quartz管理工具:
https://github.com/guryanovev/crystalquartz


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM