robfig/cron是GO語言中一個定時執行注冊任務的package, 最近我在工程中使用到了它,由於它的實現優雅且簡單(主要是簡單),所以將源碼過了一遍,記錄和分享在此。
文檔:http://godoc.org/github.com/robfig/cron,repo: https://github.com/robfig/cron
-
基本玩法
Demo代碼如下,先用cron.New()初始化一個實例,然后調用AddFunc(spec string, cmd func()) 注冊你希望調用的func,第一個參數為調度的時間策略,第二個參數為到時間后執行的方法。robfig/cron支持非常多樣的時間策略(下面的代碼舉了一些例子),最后通過cron.Start()方法啟動。
func TestCronDemo(t *testing.T) { c := cron.New() // 通過AddFunc注冊 c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") }) c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") }) c.AddFunc("CRON_TZ=Asia/Tokyo 30 04 * * *", func() { fmt.Println("Runs at 04:30 Tokyo time every day") }) c.AddFunc("@every 5m", func() { fmt.Println("every 5m, start 5m fron now") }) // 通過AddJob注冊 // var cJob cronJobDemo // c.AddJob("@every 5s", cJob) // 啟動 c.Start() // 停止 c.Stop() } type cronJobDemo int func (c cronJobDemo) Run() { fmt.Println("5s func trigger") return }
上面代碼中,第9、10行的代碼調用方法AddJob(spec string, cmd Job)也可以實現AddFunc注冊的功能,Job是interface,需要入參類型實現方法:Run()。實際上,方法AddFunc內部將參數cmd 進行了包裝(wrapper),然后也是調用方法AddJob進行注冊。
后面介紹都會說成AddJob,等效於AddFunc。
-
AddJob后發生了什么? (主要的數據結構)
對於Cron的整體邏輯,最關鍵的兩個數據結構就是struct Entry和Cron。
每當你用AddJob注冊一個定時調用策略,就會為這個策略生成一個唯一的Entry,不難想象,Entry里會存儲被執行的時間、需要被調度執行的實體Job。
生成entry后,再將entry放到struct Cron的entry列表里,Cron的結構里,主要是一些用來和外部交互的channel,比如通過channel添加、刪除entry等。詳見下面的代碼。
// Entry 數據結構,每一個被調度實體一個 type Entry struct { // 唯一id,用於查詢和刪除 ID EntryID // 本Entry的調度時間,不是絕對時間,在生成entry時會計算出來 Schedule Schedule // 本entry下次需要執行的絕對時間,會一直被更新 // 被封裝的含義是Job可以多層嵌套,可以實現基於需要執行Job的額外處理 // 比如抓取Job異常、如果Job沒有返回下一個時間點的Job是還是繼續執行還是delay Next time.Time // 上一次被執行時間,主要用來查詢 Prev time.Time // WrappedJob 是真實執行的Job實體 WrappedJob Job // Job 主要給用戶查詢 Job Job } // Cron 數據結構,為robfig/cron的運行實體使用的s數據結構 type Cron struct { entries []*Entry // 調度執行實體列表 // chain 用來定義entry里的warppedJob使用什么邏輯(e.g. skipIfLastRunning) // 即一個cron里所有entry只有一個封裝邏輯 chain Chain stop chan struct{} // 停止整個cron的channel add chan *Entry // 增加一個entry的channel remove chan EntryID // 移除一個entry的channel snapshot chan chan []Entry // 獲取entry整體快照的channel running bool // 代表是否已經在執行,是cron為使用者提供的動態修改entry的接口准備的 logger Logger // 封裝golang的log包 runningMu sync.Mutex // 用來修改運行中的cron數據,比如增加entry,移除entry location *time.Location // 地理位置 parser ScheduleParser // 對時間格式的解析,為interface, 可以定制自己的時間規則。 nextID EntryID // entry的全局ID,新增一個entry就加1 jobWaiter sync.WaitGroup // run job時會進行add(1), job 結束會done(),stop整個cron,以此保證所有job都能退出 }
需要注意的是,WrappedJob和chain這兩個成員,這是Cron實現的Job封裝邏輯,目前是解決實際調度Job的異常處理。比如你希望自己的上一個時間點的JobA沒有結束,下一個時間點的JobA就不執行,這個“不執行”的邏輯實現就定義在chain,初始化時通過chain將JobA進行封裝寫入WrappedJob,那么每次JobA調用前會先執行封裝邏輯,進行判斷。
-
Start后發生了什么? (程序的主體)
cron.Start()執行后,cron的后台程序(方法run())就開始運行了。而它的主體,就是一個定時器的實現和到時后的job運行,加上cron里的數據維護。
cron的定時器實現是一個簡潔而典型的業務層實現,着重了解下,具體的流程圖可見下圖。
它的關鍵和值得學習之處是:
-
- 每個entry都包含自己下一次執行的絕對時間
- 先對entries按下次執行時間升序排序,只需要對第一個entry啟動定時器
- 定時器到時,只輪詢entries里需要執行的entries,不需要全部輪詢。
- 且 執行的是當前時間之前的所有job,容錯高;
- 第一個定時器處理結束開啟下次定時器時,也只需要更新執行過的entries的下次執行時間,不需要更新所有的entries
上面的邏輯說完,程序主體已經清晰,除此之外,程序主體里的定時器監聽和其他多個channel共用了select-case,這些channel在struct Cron里能看到,實現了entries的動態添加、刪除、entries快照獲取等功能。代碼結構如下:
將這些操作通過channel讓程序主體來操作,可以有效的減少互斥鎖的使用,也會引入問題,會導致有的job執行時間不是非常精准,導致某些entry被遺漏:
-
- 比如最近的jobA的timer在1ms后就要到時,此時加入一個entry,耗時3ms
- 添加完entry后,再重新啟動timer(還是jobA的timer,此處還利 用了golang的time.NewTimer(d Duration)的入參為負數會立即到時的特點)
- 下次到時的時間必然不是jobA期待的執行時間(理論上晚了2ms)
當然,channel的操作首先是非常簡潔省時的,其次,定時器實現里,會掃描所有當前時間之前的entries來執行,增加了容錯性
-
值得稱贊的細節
-
interface的使用
struct Entry里的Schedule和Cron里的ScheduleParser都是interface,意味着我們是可以自己定制注冊job時的時間策略的格式的,只要自己實現時間策略的解析和獲取方法就好
這讓我想起了以前看過golang里什么時候用interface和struct的討論,我覺得這是個很好的例子:預期對同一個接口有多個實現時就抽象成interface,不知道該不該用就用struct。
-
wrapper的實現
上面有提到,通過對Job的封裝,cron實現了同一個job多次調用時的異常處理等,值得以后在實踐中借鑒。
最后是我加了一點注釋的代碼,https://github.com/jiangz222/cron/tree/comments-v3