在項目的開發過程中,難免會遇見后需要后台處理的任務,例如定時發送郵件通知、后台處理耗時的數據處理等,這個時候你就需要Quartz.Net
了。
Quartz.Net
是純凈的,它是一個.Net程序集,是非常流行的Java作業調度系統Quartz的C#實現。
Quartz.Net
一款功能齊全的任務調度系統,從小型應用到大型企業級系統都能適用。功能齊全體現在觸發器的多樣性上面,即支持簡單的定時器,也支持Cron表達式;即能執行重復的作業任務,也支持指定例外的日歷;任務也可以是多樣性的,只要繼承IJob接口即可。
對於小型應用,Quartz.Net
可以集成到你的系統中,對於企業級系統,它提供了Routing支持,提供了Group來組織和管理任務,此外還有持久化、插件功能、負載均衡和故障遷移等滿足不同應用場景的需要。
Hello Quartz.Net
開始使用一個框架,和學習一門開發語言一樣,最好是從Hello World程序開始。
首先創建一個示例程序,然后添加Quartz.Net的引用。
Install-Package Quartz -Version 3.0.7
我們使用的是當前最新版本3.0.7進行演示。添加引用以后,來創建一個Job類HelloQuartzJob
。
public class HelloQuartzJob : IJob
{
public Task Execute(IJobExecutionContext context)
{
return Task.Factory.StartNew(() =>
{
Console.WriteLine("Hello Quartz.Net");
});
}
}
這是個非常簡單的Job類,它在執行時輸出文本Hello Quartz.Net
。
接下來,我們在程序啟動時創建調度器(Scheduler),並添加HelloQuartzJob的調度:
static async Task MainAsync()
{
var schedulerFactory = new StdSchedulerFactory();
var scheduler = await schedulerFactory.GetScheduler();
await scheduler.Start();
Console.WriteLine($"任務調度器已啟動");
//創建作業和觸發器
var jobDetail = JobBuilder.Create<HelloQuartzJob>().Build();
var trigger = TriggerBuilder.Create()
.WithSimpleSchedule(m => {
m.WithRepeatCount(3).WithIntervalInSeconds(1);
})
.Build();
//添加調度
await scheduler.ScheduleJob(jobDetail, trigger);
}
然后運行程序,你會看到如下圖:
通過演示可以看出,要執行一個定時任務,一般需要四步:
- 創建任務調度器。調度器通常在應用程序啟動時創建,一個應用程序實例通常只需要一個調度器即可。
- 創建Job和JobDetail。Job是作業的類型,描述了作業是如何執行的,這個類是由我們定義的;JobDetail是Quartz對作業的封裝,它包含Job類型,以及Job在執行時用到的數據,還包括是否要持久化、是否覆蓋已存在的作業等選項。
- 創建觸發器。觸發器描述了在何時執行作業。
- 添加調度。當完成以上三步以后,就可以對作業進行調度了。
作業:Job和JobDetail
Job是作業的類型,描述了作業是如何執行的,這個類型是由我們定義的,例如上文的HelloQuartzJob
。Job實現IJob接口,而IJob接口只有一個Execute
方法,參數context
中包含了與當前上下文中關聯的Scheduler、JobDetail、Trigger等。
一個典型的Job定義如下:
public class HelloQuartzJob : IJob
{
public Task Execute(IJobExecutionContext context)
{
return Task.Factory.StartNew(() =>
{
Console.WriteLine("Hello Quartz.Net");
});
}
}
JobData
Job不是孤立存在的,它需要執行的參數,這些參數如何傳遞進來呢?我們來定義一個Job類進行演示。
public class SayHelloJob : IJob
{
public string UserName { get; set; }
public Task Execute(IJobExecutionContext context)
{
return Task.Factory.StartNew(() =>
{
Console.WriteLine($"Hello {UserName}!");
});
}
}
SayHelloJob
在執行時需要參數UserName
,這個參數被稱為JobData,Quartz.Net
通過JobDataMap的方式傳遞參數。代碼如下:
//創建作業
var jobDetail = JobBuilder.Create<SayHelloJob>()
.SetJobData(new JobDataMap() {
new KeyValuePair<string, object>("UserName", "Tom")
})
.Build();
通過JobBuilder的SetJobData方法,傳入JobDataMap對象,JobDataMap對象中可以包含多個參數,這些參數可以映射到Job類的屬性上。我們完善代碼運行示例,可以看到如下圖:
JobDetail
JobDetail是Quartz對作業的封裝,它包含Job類型,以及Job在執行時用到的數據,還包括是否孤立存儲、請求恢復作業等選項。
JobDetail是通過JobBuilder進行創建的。例如:
var jobDetail = JobBuilder.Create<SayHelloJob>()
.SetJobData(new JobDataMap() {
new KeyValuePair<string, object>("UserName", "Tom")
})
.StoreDurably(true)
.RequestRecovery(true)
.WithIdentity("SayHelloJob-Tom", "DemoGroup")
.WithDescription("Say hello to Tom job")
.Build();
參數說明:
- SetJobData:設置JobData
- StoreDurably:孤立存儲,指即使該JobDetail沒有關聯的Trigger,也會進行存儲
- RequestRecovery:請求恢復,指應用崩潰后再次啟動,會重新執行該作業
- WithIdentity:作業的唯一標識
- WithDescription:作業的描述信息
除此之外,Quartz.Net
還支持兩個非常有用的特性:
- DisallowConcurrentExecution:禁止並行執行,該特性是針對JobDetail生效的
- PersistJobDataAfterExecution:在執行完成后持久化JobData,該特性是針對Job類型生效的,意味着所有使用該Job的JobDetail都會在執行完成后持久化JobData。
持久化JobData
我們來演示一下該PersistJobDataAfterExecution
特性,在SayHelloJob
中,我們新加一個字段RunSuccess
,記錄任務是否執行成功。
首先在SayHelloJob
添加特性:
[PersistJobDataAfterExecution]
public class SayHelloJob : IJob { }
然后在創建JobDetail
時添加JobData:
var jobDetail = JobBuilder.Create<SayHelloJob>()
.SetJobData(new JobDataMap() {
new KeyValuePair<string, object>("UserName", "Tom"),
new KeyValuePair<string, object>("RunSuccess", false)
})
在執行時Job時,更新RunSuccess的值:
public Task Execute(IJobExecutionContext context)
{
return Task.Factory.StartNew(() =>
{
Console.WriteLine($"Prev Run Success:{RunSuccess}");
Console.WriteLine($"Hello {UserName}!");
context.JobDetail.JobDataMap.Put("RunSuccess", true);
});
}
接下來看一下執行效果:
觸發器:Trigger
Trigger是觸發器,用來定制執行作業。Trigger有兩種類型:SampleTrigger和CronTrigger,我們分別進行說明。
SampleTrigger
顧名思義,這是個簡單的觸發器,有以下特性:
- 重復執行:WithRepeatCount()/RepeatForever()
- 設置間隔時間:WithInterval()
- 定時執行:StartAt()/StartNow()
- 設定優先級:WithPriority(),默認為5
需要注意:當Trigger到達StartAt指定的時間時會執行一次,這一次執行是不包含在WithRepeatCount中的。在我們上面的例子中可以看出,添加調度后會立即執行一次,然后重復三次,最終執行了四次。
CronTrigger
CronTrigger是通過Cron表達式來完成調度的。Cron表達式非常靈活,可以實現幾乎各種定時場景的需要。
關於Cron表達式,大家可以移步 Quartz Cron表達式
使用CronTrigger的示例如下:
var trigger = TriggerBuilder.Create()
.WithCronSchedule("*/1 * * * * ?")
.Build();
日歷:Calendar
Calendar可以與Trigger進行關聯,從Trigger中排出執行計划。例如你只希望在工作日執行作業,那么我們可以定義一個休息日的日歷,將它與Trigger關聯,從而排出休息日的執行計划。
Calendar示例代碼如下:
var calandar = new HolidayCalendar();
calandar.AddExcludedDate(DateTime.Today);
await scheduler.AddCalendar("holidayCalendar", calandar, false, false);
var trigger = TriggerBuilder.Create()
.WithCronSchedule("*/1 * * * * ?")
.ModifiedByCalendar("holidayCalendar")
.Build();
在這個示例中,我們創建了HolidayCalendar日歷,然后添加排除執行的日期。我們把今天添加到排除日期后,該Trigger今天將不會觸發。
監聽器:JobListeners/TriggerListeners/SchedulerListeners
監聽器是Quartz.Net
的另外一個出色的功能,它允許我們編寫監聽器達到在運行時獲取作業狀態、處理作業數據等功能。
JobListener
JobListener可以監聽Job執行前、執行后、否決執行的事件。我們通過代碼進行演示:
public class MyJobListener : IJobListener
{
public string Name { get; } = nameof(MyJobListener);
public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
//Job即將執行
return Task.Factory.StartNew(() =>
{
Console.WriteLine($"Job: {context.JobDetail.Key} 即將執行");
});
}
public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
return Task.Factory.StartNew(()=> {
Console.WriteLine($"Job: {context.JobDetail.Key} 被否決執行");
});
}
public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken cancellationToken = default)
{
//Job執行完成
return Task.Factory.StartNew(() =>
{
Console.WriteLine($"Job: {context.JobDetail.Key} 執行完成");
});
}
}
定義完成后,將MyJobListener
添加到Scheduler中:
scheduler.ListenerManager.AddJobListener(new MyJobListener(), GroupMatcher<JobKey>.AnyGroup());
然后我們再運行程序,就可以看到Listener被調用了:
通過圖片可以看到,JobToBeExecuted
和JobWasExecuted
都被執行了,JobExecutionVetoed
沒有執行,那么如何觸發JobExecutionVetoed
呢?請繼續閱讀TriggerListener
的演示。
TriggerListener
TriggerListener可以監聽Trigger的執行情況,我們通過代碼進行演示:
public class MyTriggerListener : ITriggerListener
{
public string Name { get; } = nameof(MyTriggerListener);
public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default)
{
return Task.FromResult(true); //返回true表示否決Job繼續執行
}
}
將MyTriggerListener
添加到Scheduler中:
scheduler.ListenerManager.AddTriggerListener(new MyTriggerListener(), GroupMatcher<TriggerKey>.AnyGroup());
運行代碼可以看到如下效果:
從圖片中可以看到,JobListener中的JobExecutionVetoed
被執行了。
SchedulerListener
ISchedulerListener提供了Job、Trigger管理的監聽,與調度程序相關的事件包括:添加作業/觸發器,刪除作業/觸發器,調度程序中的嚴重錯誤,調度程序關閉的通知等。完整的接口定義如下:
public interface ISchedulerListener
{
Task JobAdded(IJobDetail jobDetail, CancellationToken cancellationToken = default);
Task JobDeleted(JobKey jobKey, CancellationToken cancellationToken = default);
Task JobInterrupted(JobKey jobKey, CancellationToken cancellationToken = default);
Task JobPaused(JobKey jobKey, CancellationToken cancellationToken = default);
Task JobResumed(JobKey jobKey, CancellationToken cancellationToken = default);
Task JobScheduled(ITrigger trigger, CancellationToken cancellationToken = default);
Task JobsPaused(string jobGroup, CancellationToken cancellationToken = default);
Task JobsResumed(string jobGroup, CancellationToken cancellationToken = default);
Task JobUnscheduled(TriggerKey triggerKey, CancellationToken cancellationToken = default);
Task SchedulerError(string msg, SchedulerException cause, CancellationToken cancellationToken = default);
Task SchedulerInStandbyMode(CancellationToken cancellationToken = default);
Task SchedulerShutdown(CancellationToken cancellationToken = default);
Task SchedulerShuttingdown(CancellationToken cancellationToken = default);
Task SchedulerStarted(CancellationToken cancellationToken = default);
Task SchedulerStarting(CancellationToken cancellationToken = default);
Task SchedulingDataCleared(CancellationToken cancellationToken = default);
Task TriggerFinalized(ITrigger trigger, CancellationToken cancellationToken = default);
Task TriggerPaused(TriggerKey triggerKey, CancellationToken cancellationToken = default);
Task TriggerResumed(TriggerKey triggerKey, CancellationToken cancellationToken = default);
Task TriggersPaused(string triggerGroup, CancellationToken cancellationToken = default);
Task TriggersResumed(string triggerGroup, CancellationToken cancellationToken = default);
}
添加SchedulerListener的代碼如下:
scheduler.ListenerManager.AddSchedulerListener(mySchedListener);
持久化:JobStore
Quartz.Net
支持Job的持久化操作,被稱為JobStore。默認情況下,Quartz將數據持久化到內存中,好處是內存的速度很快,壞處是無法提供負載均衡的支持,並且在程序崩潰后,我們將丟失所有Job數據,對於企業級系統來說,壞處明顯大於好處,因此有必要將數據存儲在數據庫中。
ADO.NET存儲
Quartz使用ADO.NET
訪問數據庫,支持的數據庫廠商非常廣泛:
- SqlServer - .NET Framework 2.0的SQL Server驅動程序
- OracleODP - Oracle的Oracle驅動程序
- OracleODPManaged - Oracle的Oracle 11托管驅動程序
- MySql - MySQL Connector / .NET
- SQLite - SQLite ADO.NET Provider
- SQLite-Microsoft - Microsoft SQLite ADO.NET Provider
- Firebird - Firebird ADO.NET提供程序
- Npgsql - PostgreSQL Npgsql
數據庫的創建語句可以在Quartz.Net
的源碼中找到:https://github.com/quartznet/quartznet/tree/master/database/tables
我們可以通過配置文件來配置Quartz使用數據庫存儲:
# job store
quartz.jobStore.type = Quartz.Impl.AdoJobStore.JobStoreTX, Quartz
quartz.jobStore.dataSource = quartz_store
quartz.jobStore.driverDelegateType = Quartz.Impl.AdoJobStore.PostgreSQLDelegate, Quartz
#quartz.jobStore.useProperties = true
quartz.dataSource.quartz_store.connectionString = Server=localhost;Database=quartz_store;userid=quartz_net;password=xxxxxx;Pooling=true;MinPoolSize=1;MaxPoolSize=10;Timeout=15;SslMode=Disable;
quartz.dataSource.quartz_store.provider = Npgsql
負載均衡
負載均衡是實現高可用的一種方式,當任務量變大以后,單台服務器很難滿足需要,使用負載均衡則使得系統具備了橫向擴展的能力,通過部署多個節點來增加處理Job的能力。
Quartz.Net
在使用負載均衡時,需要依賴ADO JobStore,意味着你需要使用數據庫持久化數據。然后我們可以使用以下配置完成負載均衡功能:
quartz.jobStore.clustered = true
quartz.scheduler.instanceId = AUTO
- clustered:集群的標識
- instanceId:當前Scheduler實例的ID,每個示例的ID不能重復,使用AUTO時系統會自動生成ID
當我們在多台服務器上運行Scheduler實例時,需要設置服務器的時鍾時間,確保服務器時間是相同的。針對windows服務器,可以設置從網絡自動同步時間。
通過Routing訪問Quartz實例
通過Routing訪問Quartz實例的功能,為我們做系統分離提供了很好的途徑。
我們可以通過以下配置實現Quartz的服務器端遠程訪問:
# export this server to remoting context
quartz.scheduler.exporter.type = Quartz.Simpl.RemotingSchedulerExporter, Quartz
quartz.scheduler.exporter.port = 555
quartz.scheduler.exporter.bindName = QuartzScheduler
quartz.scheduler.exporter.channelType = tcp
quartz.scheduler.exporter.channelName = httpQuartz
然后我們在客戶端系統中配置訪問:
quartz.scheduler.proxy = true
quartz.scheduler.proxy.address = tcp://localhost:555/QuartzScheduler
開發實踐
理想中的任務調度系統應該是一個后台服務,默無聲息的運行在系統后台,業務系統通過接口完成對任務的添加、刪除等操作。在構架Windows服務時,可以和TopShelf集成完成windows服務的開發。
Install-Package Topshelf
進行服務開發的另外一個問題是,Quartz本身是不支持依賴注入的,而解決依賴注入的問題,則可以使用Autofac,幸運的是已經有大神完成了TopShelf與Autofac的集成,我們只需要使用即可。
Install-Package Topshelf.Autofac
Quartz.Net Job的添加有兩種方式:運行時動態添加和通過配置文件添加。這里推薦使用動態的方式進行添加(示例代碼是采用動態方式進行添加的),除非你的Job是相對固定的。
而對Scheduler的配置可以采用配置文件的方式,方便在部署時進行維護。