前言
本篇會繼續講解Sikiro.SMS.Job服務的實現,在我寫第一篇的時候,我就發現我當時設計的架構里Sikiro.SMS.Job這個可以選擇不需要,而使用MQ代替。但是為了說明調度任務使用實現也堅持寫了下。后面會一篇針對架構、實現優化的講解。
源碼地址:https://github.com/SkyChenSky/Sikiro.SMS
Quartz的簡介
Quartz.NET是一款功能齊全的開源作業調度框架,小至的應用程序,大到企業系統都可以適用。Quartz是作者James House用JAVA語言編寫的,而Quartz.NET是從Quartz移植過來的C#版本。
Quartz.Net的作用
- Quartz.Net是多線程的,允許多個JOB同時執行。
- Quartz.Net可以進行持久化,結合管理后台可以進行可視化的監控
- Quartz.Net提供API進行遠程操控,結合管理后台可以進行運維管理
在一般企業,可以利用Quartz.Net框架做各種的定時任務,例如,數據遷移、跑報表等等。
Cron表達式
| 字段名 | 是否必填 | 值范圍 | 特殊字符 |
|---|---|---|---|
| Seconds | YES | 0-59 | , - * / |
| Minutes | YES | 0-59 | , - * / |
| Hours | YES | 0-23 | , - * / |
| Day of month | YES | 1-31 | , - * ? / L W |
| Month | YES | 1-12 or JAN-DEC | , - * / |
| Day of week | YES | 1-7 or SUN-SAT | , - * ? / L # |
| Year | NO | empty, 1970-2099 | , - * / |
缺點
Quartz.Net的缺點很明顯,沒有自帶的管理后台,而同款的Hangfir調度任務框架則會有更加良好的易用性。但是在Github上有不少人開源了Quartz.Net的管理后台,對此作為了彌補。
其他
其他Quartz.Net的信息可以看我之前記錄的一篇文章《Quartz.NET的使用(附源碼)》
Quartz.Net DEMO:https://github.com/SkyChenSky/QuartzDotNetDemo.git
業務流程

從MongoDB持久化的數據,查詢出狀態為待處理並且定時時間小於當前時間的數據。通過Mongo驅動提供的FindOneAndUpdate對文檔進行原子性操作(更新中間狀態並查詢出剛更新的文檔)。如果有數據則發送到MQ,由Sikiro.SMS.Bus進行訂閱發送,因為本次有數據,我認為可能還會有其他需要發送的數據,因此立刻調用JOB自身方法,進行下一條需要處理的數據進行發送。如果此次JOB的執行並沒有數據,那么認為接下來一段時間沒有需要處理的數據,這次調度結束。
TimeSendSms示例
public class TimeSendSms : BaseJob { private readonly SmsService _smsService; private readonly IBus _bus; public TimeSendSms(SmsService smsService, IBus bus) { _smsService = smsService; _bus = bus; } protected override void ExecuteBusiness() { _smsService.GetToBeSend(); if (_smsService.Sms != null) _bus.Publish(_smsService.Sms.MapTo<SmsModel, SmsQueueModel>()); _smsService.ContinueDo(ExecuteBusiness); } protected override void OnException() { _smsService.RollBack(); } }
模板模式
Job的輪詢處理流程基本相似,查詢出需要執行數據-遍歷業務處理-如果有異常則特殊處理,因此針對類似流程相同,但是實現有差異的程序,我們可以使用模板模式。
public abstract class BaseJob : IJob { private void OnException(Action action) { try { action(); } catch (Exception e) { e.WriteToFile(); OnException(); } } public Task Execute(IJobExecutionContext context) { OnException(ExecuteBusiness); return null; } protected virtual void OnException() { } protected abstract void ExecuteBusiness(); }
Mongo的原子性
原子性
原子是物理概念,指的是指化學反應不可再分的基本微粒。而計算機領域的原子性強調的對象是操作(指令、事務)。我們所說的指令組是原子操作,意思要么一起成功,要么一起失敗。不允許2個指令里,一個成功一個失敗的情況存在。
MongoDB 原子操作
MongoDB的原子操作就是要么這個文檔完整的保存到Mongodb,要么沒有保存到Mongodb,不會出現查詢到的文檔沒有保存完整的情況。
MongoDB的文檔的保存,修改,刪除等操作都是原子性,除此之外還提供了FindOneAndDelete、FindOneAndUpdate、FindOneAndReplace等原子操作。
以FindOneAndUpdate為例,對某文檔FindOneAndUpdate,可以文檔B進行Update操作完成后返回出文檔B的結果,根據參數返回結果是更新前還是更新后(一般我們需要更新后)。
而這FindOneAndUpdate的操作對於我們更新到中間狀態的非常實用:
- 避免進行Update后無法良好的查詢到剛Update的文檔
- 避免應用集群部署時批量更新后,無法良好分配任務
- 批量更新多個文檔需要isolated標識隔離,全局鎖在大並發情況下性能並不樂觀
雖然以上可以通過更新時標識版本號進行解決,這無疑增加實現難度。
MongoDB鎖機制
Mongodb並發操作又讀寫鎖來進行控制。
簡單來說
當進行讀操作的時候會加讀鎖,這個時候其他讀操作可以也獲得讀鎖,但是不能加寫鎖,也就是說不能進行寫操作。
當進行寫操作的時候會加寫鎖,這個時候其他操作無法加任何鎖,也就是說不能進行其他的讀操作和寫操作。
多個JOB的並發性
綜上所述,落實到我們應用場景,在部署多個調度任務服務,或者JOB多個線程去跑時,我們可以使用FindOneAndUpdate,每個調度任務每次只處理一個文檔,Update操作的時候會進行寫鎖阻塞其他進程(進程)的寫操作。那么就可以保證每個調度任務都可以只處理唯一一個有效的文檔,避免重復處理。
下面是我的Sikiro.Nosql.Mongo的FindOneAndUpdate封裝示例,因為Update字段的不友好,所以我封裝了一下Lambda表達式,ReturnDocument = ReturnDocument.After標識響應數據是更新前還是更新后的文檔。
public T GetAndUpdate<T>(string database, string collection, Expression<Func<T, bool>> predicate, Expression<Func<T, T>> updateExpression) { var db = _mongoClient.GetDatabase(database); var col = db.GetCollection<T>(collection); var updateDefinitionList = MongoExpression<T>.GetUpdateDefinition(updateExpression); var updateDefinitionBuilder = new UpdateDefinitionBuilder<T>().Combine(updateDefinitionList); return col.FindOneAndUpdate(predicate, updateDefinitionBuilder, new FindOneAndUpdateOptions<T, T> { ReturnDocument = ReturnDocument.After });
SQL Server的UpdateSelect
SQL Server的操作也具有上述FindOneAndUpdate的功能,我們公司成他為UpdateSelect,下面是示例代碼:
UPDATE TOP ( 100 )
SYS_USER WITH ( UPDLOCK, READPAST )
SET USER_STATUS = 1
OUTPUT INSERTED.[USER_NAME] ,
INSERTED.SYS_USERID ,
INSERTED.EMAIL
FROM SYS_USER
WHERE CREATE_DATETIME < '2018-09-13'
AND USER_STATUS = 2;
結尾
本篇介紹了調度任務結合MongoDB原子操作的使用,使得調度任務服務可以具有良好的伸縮性。如果有任何建議與問題可以在下方評論反饋給我。
