Quarzt 是一個開源的作業調度框架;允許程序開發人員根據時間間隔來調度作業;實現了任務和觸發器的多對多關系.
1.Quarzt 核心概念:
- Job : 表示一個工作,要執行的具體內容; 接口中定義了一個方法 void excute(JobExecutionContext context);
- JobDetail: 表示一個具體的可執行的調度程序;包含了這個任務調度的方案和策略.
- Trigger: 一個調度參數的配置,配置調用的時間和周期.框架提供了5種觸發器類型(SimpleTrigger、CronTrigger、DateIntervalTrigger、 NthlncludedDayTrigger、Calendar類).
- Scheduler: 一個調度容器可以注冊多個JobDetail和Trigger.調度需要組合JobDetail和Trigger;
附: 常用的trigger為SimpleTrigger和CronTrigger.
SimpleTrigger 執行N次,重復N次;
CronTrigger: 幾秒 幾分 幾時 哪日 哪月 哪周 哪年執行
quartz表達式在線網址: http://cron.qqe2.com/
1.1 Quarzt 運行環境:
可以嵌入在應用程序,也可以作為一個獨立的程序運行.
1.2.Quarzt 存儲方式:
- RAMJobStore(內存作業存儲類型)和JDBCJobStore(數據庫作業存儲類型),兩種方式對比如下:
優點 | 缺點 |
---|---|
RAMJobStore 不要外部數據庫,配置容易,運行速度快 | 因為調度程序信息是存儲在被分配給JVM的內存里面, 所以,當應用程序停止運行時,所有調度信息將被丟失。另外因為存儲到JVM內存里面,所以可以存儲多少個Job和Trigger將會受到限制 |
BCJobStor 支持集群,因為所有的任務信息都會保存 | 運行速度的快慢取決與連接數據庫的快慢到數據庫中,可以控制事物,還有就是如果應用服務器關閉或者重啟,任務信息都不會丟失,並且可以恢復因服務器關閉或者重啟而導致執行失敗的任務 |
2. Quarzt 在.net的實現
2.1 Quarzt ISchedulerCenter接口定義
- ISchedulerCenter接口主要定義了以下幾個接口
開啟任務調度 StartScheduleAsync()
停止任務調度 StopScheduleAsync()
添加一個任務 AddScheduleJobAsync(TasksQz sysSchedule)
停止一個任務 StopScheduleJobAsync(TasksQz sysSchedule)
恢復一個任務 ResumeJob(TasksQz sysSchedule)
/// <summary>
/// 服務調度接口
/// </summary>
public interface ISchedulerCenter
{
/// <summary>
/// 開啟任務調度
/// </summary>
/// <returns></returns>
Task<MessageModel<string>> StartScheduleAsync();
/// <summary>
/// 停止任務調度
/// </summary>
/// <returns></returns>
Task<MessageModel<string>> StopScheduleAsync();
/// <summary>
/// 添加一個任務
/// </summary>
/// <returns></returns>
Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule);
/// <summary>
/// 停止一個任務
/// </summary>
/// <returns></returns>
Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule);
/// <summary>
/// 恢復一個任務
/// </summary>
/// <returns></returns>
Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule);
}
2.2 Quarzt 任務調度服務中心 SchedulerCenterServer 封裝
- 任務調度中心 SchedulerCenterServer 類的實現, 繼承實現 ISchedulerCenter 接口;包含任務的開啟、任務的關閉方法;以及對指定任務的開啟、暫停和恢復方法的封裝;
/// <summary>
/// 任務調度管理中心
/// </summary>
public class SchedulerCenterServer : ISchedulerCenter
{
private Task<IScheduler> _scheduler;
private readonly IJobFactory _iocjobFactory;
public SchedulerCenterServer(IJobFactory jobFactory)
{
_iocjobFactory = jobFactory;
_scheduler = GetSchedulerAsync();
}
private Task<IScheduler> GetSchedulerAsync()
{
if (_scheduler != null)
return this._scheduler;
else
{
//從Factory中獲取Scheduler實例
NameValueCollection collection = new NameValueCollection
{
{"quartz.serializer.type","binary" }
};
StdSchedulerFactory factory = new StdSchedulerFactory(collection);
return _scheduler = factory.GetScheduler();
}
}
/// <summary>
/// 開啟任務調度
/// </summary>
/// <returns></returns>
public async Task<MessageModel<string>> StartScheduleAsync()
{
var result = new MessageModel<string>();
try
{
this._scheduler.Result.JobFactory = this._iocjobFactory;
if (!this._scheduler.Result.IsStarted)
{
//等待任務運行完成
await this._scheduler.Result.Start();
await Console.Out.WriteLineAsync("任務調度開啟!");
result.success = true;
result.msg = $"任務調度開啟成功";
return result;
}
else
{
result.success = false;
result.msg = $"任務調度已經開啟";
return result;
}
}
catch (Exception ex)
{
throw ex;
}
throw new NotImplementedException();
}
/// <summary>
/// 停止任務調度
/// </summary>
/// <returns></returns>
public async Task<MessageModel<string>> StopScheduleAsync()
{
var result = new MessageModel<string>();
try
{
if (!this._scheduler.Result.IsShutdown)
{
//等待任務運行完成
await this._scheduler.Result.Shutdown();
await Console.Out.WriteLineAsync("任務調度停止! ");
result.success = true;
result.msg = $"任務調度停止成功";
return result;
}
else
{
result.success = false;
result.msg = $"任務調度已經停止";
return result;
}
}
catch (Exception ex)
{
throw ex;
}
throw new NotImplementedException();
}
/// <summary>
/// 添加一個計划任務(映射程序集指定IJob實現類)
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
public async Task<MessageModel<string>> AddScheduleJobAsync(TasksQz sysSchedule)
{
var result = new MessageModel<string>();
if (sysSchedule != null)
{
try
{
JobKey jobKey = new JobKey(sysSchedule.Id.ToString());
if (await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"該任務計划已經在執行:【{sysSchedule.Name}】";
return result;
}
#region 設置開始時間和結束時間
if (sysSchedule.BeginTime == null)
{
sysSchedule.BeginTime = DateTime.Now;
}
DateTimeOffset startRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.BeginTime, 1);//設置開始時間
if (sysSchedule.EndTime == null)
{
sysSchedule.EndTime = DateTime.MaxValue.AddDays(-1);
}
DateTimeOffset endRunTime = DateBuilder.NextGivenSecondDate(sysSchedule.EndTime, 1);//設置暫停時間
#endregion
#region 通過反射獲取程序集類型和類
Assembly assembly = Assembly.Load(sysSchedule.AssemblyName);
Type jobType = assembly.GetType(sysSchedule.AssemblyName + "." + sysSchedule.ClassName);
#endregion
//判斷任務調度是否開啟
if (!_scheduler.Result.IsStarted)
{
await StartScheduleAsync();
}
//傳入反射出來的執行程序集
IJobDetail job = new JobDetailImpl(sysSchedule.Id.ToString(), sysSchedule.JobGroup, jobType);
job.JobDataMap.Add("JobParam", sysSchedule.JobParams);
ITrigger trigger;
#region 泛型傳遞
//IJobDetail job = JobBuilder.Create<T>()
// .WithIdentity(sysSchedule.Name, sysSchedule.JobGroup)
// .Build();
#endregion
if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
{
trigger = CreateCronTrigger(sysSchedule);
}
else
{
trigger = CreateSimpleTrigger(sysSchedule);
}
//告訴Quartz使用我們的觸發器來安排作業
await _scheduler.Result.ScheduleJob(job, trigger);
//await Task.Delay(TimeSpan.FromSeconds(120));
//await Console.Out.WriteLineAsync("關閉了調度器!");
//await _scheduler.Result.Shutdown();
result.success = true;
result.msg = $"啟動任務: 【{sysSchedule.Name}】成功";
return result;
}
catch (Exception ex)
{
result.success = false;
result.msg = $"任務計划異常:【{ex.Message}】";
return result;
}
}
else
{
result.success = false;
result.msg = $"任務計划不存在:【{sysSchedule?.Name}】";
return result;
}
}
/// <summary>
/// 暫停一個指定的計划任務
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
public async Task<MessageModel<string>> StopScheduleJobAsync(TasksQz sysSchedule)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
if (!await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"未找到要暫停的任務:【{sysSchedule.Name}】";
return result;
}
else
{
await this._scheduler.Result.PauseJob(jobKey);
result.success = true;
result.msg = $"暫停任務:【{sysSchedule.Name}】成功";
return result;
}
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 恢復指定的計划任務
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
public async Task<MessageModel<string>> ResumeJob(TasksQz sysSchedule)
{
var result = new MessageModel<string>();
try
{
JobKey jobKey = new JobKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup);
if (!await _scheduler.Result.CheckExists(jobKey))
{
result.success = false;
result.msg = $"未找到要重新運行的任務,【{sysSchedule.Name}】";
return result;
}
//await this._scheduler.Result.ResumeJob(jobKey);
ITrigger trigger;
if (sysSchedule.Cron != null && CronExpression.IsValidExpression(sysSchedule.Cron) && sysSchedule.TriggerType > 0)
{
trigger = CreateCronTrigger(sysSchedule);
}
else
{
trigger = CreateSimpleTrigger(sysSchedule);
}
TriggerKey triggerKey = new TriggerKey(sysSchedule.Id.ToString(), sysSchedule.JobGroup); ;
await _scheduler.Result.RescheduleJob(triggerKey, trigger);
result.success = true;
result.msg = $"恢復計划任務: 【{sysSchedule.Name}】 成功";
return result;
}
catch (Exception ex)
{
throw ex;
}
}
#region 創建觸發器幫助方法
private ITrigger CreateSimpleTrigger(TasksQz sysSchedule)
{
if (sysSchedule.RunTimes > 0)
{
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)
.EndAt(sysSchedule.EndTime.Value)
.WithSimpleSchedule(x =>
x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
.WithRepeatCount(sysSchedule.RunTimes)).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
return trigger;
}
else
{
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)
.EndAt(sysSchedule.EndTime.Value)
.WithSimpleSchedule(x =>
x.WithIntervalInSeconds(sysSchedule.IntervalSecond)
.RepeatForever()).ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup).Build();
return trigger;
}
//觸發作業立即運行,然后每10秒重復一次,無限循環
}
/// <summary>
/// 創建類型Cron的觸發器
/// </summary>
/// <param name="sysSchedule"></param>
/// <returns></returns>
private ITrigger CreateCronTrigger(TasksQz sysSchedule)
{
//作業觸發器
return TriggerBuilder.Create()
.WithIdentity(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
.StartAt(sysSchedule.BeginTime.Value)
.EndAt(sysSchedule.EndTime.Value)
.WithCronSchedule(sysSchedule.Cron)//指定cron表達式
.ForJob(sysSchedule.Id.ToString(), sysSchedule.JobGroup)
.Build();
}
#endregion
}
2.3 Job工廠類 JobFactory 實現:
public class JobFactory : IJobFactory
{
private readonly IServiceProvider _serviceProvider;
public JobFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
/// <summary>
/// 實現接口Job
/// </summary>
/// <param name="bundle"></param>
/// <param name="scheduler"></param>
/// <returns></returns>
public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
{
try
{
var serviceScope = _serviceProvider.CreateScope();
var job = serviceScope.ServiceProvider.GetService(bundle.JobDetail.JobType) as IJob;
return job;
}
catch(Exception ex)
{
throw ex;
}
}
public void ReturnJob(IJob job)
{
var disposable = job as IDisposable;
if(disposable != null)
{
disposable.Dispose();
}
}
}
2.4 單個任務調度的示例 :
- Job_Users_Quartz 這里是要執行的服務和方法 比如每隔一段時間對平台用戶的總數量進行統計
public class Job_Users_Quartz : JobBase, IJob
{
private readonly ITasksQzServices _tasksQzServices;
private readonly IUserServices _userServices;
private readonly ILogger<Job_Users_Quartz> _logger;
public Job_Users_Quartz(ITasksQzServices tasksQzServices, IUserServices userServices, ILogger<Job_Users_Quartz> logger)
{
_tasksQzServices = tasksQzServices;
_userServices = userServices;
_logger = logger;
}
public async Task Execute(IJobExecutionContext context)
{
//var param = context.MergedJobDataMap;
// 可以直接獲取 JobDetail 的值
var jobKey = context.JobDetail.Key;
var jobId = jobKey.Name;
var executeLog = await ExecuteJob(context, async () => await Run(context, jobId.ObjToInt()));
//通過數據庫配置,獲取傳遞過來的參數
JobDataMap data = context.JobDetail.JobDataMap;
}
public async Task Run(IJobExecutionContext context, int jobid)
{
var count = await _userServices.QueryCount(x=>x.ID > 0);
if (jobid > 0)
{
var model = await _tasksQzServices.QueryById(jobid);
if (model != null)
{
model.RunTimes += 1;
var separator = "<br>";
string remark = $"【{DateTime.Now}】執行任務【Id:{context.JobDetail.Key.Name},組別:context.JobDetail.Key.Group}】【執行成功】{separator}";
model.Remark = remark + string.Join(separator, StringHelper.GetTopDataBySeparator(model.Remark, separator, 3));
//_logger.LogInformation(remark);
await _tasksQzServices.Update(model);
}
}
//_logger.LogInformation("用戶總數量" + count.ToString());
await Console.Out.WriteLineAsync("用戶總數量" + count.ToString());
}
}
2.5 Quartz 啟動服務中間件
/// <summary>
/// Quartz 啟動服務
/// </summary>
public static class QuartzJobMildd
{
public static void UseQuartzJobMildd(this IApplicationBuilder app, ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
{
if (app == null) throw new
ArgumentNullException(nameof(QuartzJobMildd));
try
{
if (Appsettings.app("Middleware", "QuartzNetJob", "Enabled").ObjToBool())
{
var allQzServices = tasksQzServices.Query().Result;
foreach (var item in allQzServices)
{
if (item.IsStart)
{
var resuleModel = schedulerCenter.AddScheduleJobAsync(item).Result;
if (resuleModel.success)
{
Console.WriteLine($"QuartzNetJob{item.Name}啟動成功! ");
}
else
{
Console.WriteLine($"QuartzNetJob{item.Name}啟動失敗! 錯誤信息{resuleModel.msg}");
}
}
}
}
}
catch (Exception ex)
{
Console.WriteLine($"An error was reported when starting the job service.\n{ex.Message}");
throw;
}
}
}
2.6 任務調度啟動服務 JobSetup
/// <summary>
/// 任務調度 啟動服務
/// </summary>
public static class JobSetup
{
public static void AddJobSetup(this IServiceCollection services)
{
if (services == null) throw new ArgumentNullException(nameof(services));
//services.AddHostedService<Job1TimedService>();
//services.AddHostedService<Job2TimedService>();
services.AddSingleton<IJobFactory, JobFactory>();
services.AddTransient<Job_Users_Quartz>();//Job使用瞬時依賴注入
services.AddSingleton<ISchedulerCenter, SchedulerCenterServer>();
}
}
2.7 Quartz 中間件和服務在Startup的的啟用
public void ConfigureServices(IServiceCollection services)
{
services.AddJobSetup();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env,
ITasksQzServices tasksQzServices, ISchedulerCenter schedulerCenter)
{
//// 開啟QuartzNetJob調度服務
app.UseQuartzJobMildd(tasksQzServices, schedulerCenter);
}
2.8 Quartz 的 TasksQz 類定義
/// <summary>
/// 任務計划表
/// </summary>
public class TasksQz
{
/// <summary>
/// ID
/// </summary>
[SugarColumn(IsNullable = false,IsPrimaryKey = true,IsIdentity = true)]
public int Id { get; set; }
/// <summary>
/// 任務名稱
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar",Length = 200,IsNullable = true)]
public string Name { get; set;}
/// <summary>
/// 任務分組
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
public string JobGroup { get; set; }
/// <summary>
/// 任務運行時間表達式
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
public string Cron { get; set; }
/// <summary>
/// 任務所在DLL對應的程序集名稱
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
public string AssemblyName { get; set; }
/// <summary>
/// 任務所在類
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
public string ClassName { get; set; }
/// <summary>
/// 任務描述
/// </summary>
[SugarColumn(ColumnDataType = "nvarchar", Length = 200, IsNullable = true)]
public string Remark { get; set; }
/// <summary>
/// 執行次數
/// </summary>
public int RunTimes { get; set; }
/// <summary>
/// 開始時間
/// </summary>
public DateTime? BeginTime { get; set; }
/// <summary>
/// 結束時間
/// </summary>
public DateTime? EndTime { get; set; }
/// <summary>
/// 觸發器類型(0 simple 1 cron)
/// </summary>
public int TriggerType { get; set; }
/// <summary>
/// 執行間隔時間,秒為單位
/// </summary>
public int IntervalSecond { get; set; }
/// <summary>
/// 是否啟動
/// </summary>
public bool IsStart { get; set; }
/// <summary>
/// 執行傳參
/// </summary>
public string JobParams { get; set; }
/// <summary>
/// 是否刪除 1 刪除
/// </summary>
[SugarColumn(IsNullable = true)]
public bool? IsDeleted { get; set; }
/// <summary>
/// 創建時間
/// </summary>
[SugarColumn(IsNullable = true)]
public DateTime CreateTime { get; set; } = DateTime.Now;
}
參考源碼:
https://github.com/anjoy8/Blog.Core
Quartz管理工具:
https://github.com/guryanovev/crystalquartz