1 前言
轉到Go已經將近三個月,寫業務代碼又找到了屬於Go的條件反射了。
后置聲明和多參數返回這些Go風格代碼寫起來也不會那么蹩腳,甚至還有點小適應~
反而,前幾天在寫Java的時候,發現Java怎么啟動這么慢,Java怎么能夠容忍這些用不到的代碼還理直氣壯的躺在那……等等,這些話在哪聽過類似的???
“Go為什么要后置聲明,多別扭啊”
“Go里面為啥要定義這么多的struct,看的頭暈”
……
其實,沒有最好的語言,只有最適合的。
前面《Go語言學習》系列主要介紹了一些Go的基礎知識和相較於Java的一些新特性。后續如果有相關的體會和新的還會繼續更新。
從這篇開始,開始學習Go的一些工具類庫和開源組件,希望在學習這些優秀的開源項目過程中,更深入的了解Go,發現Go的威力。
2 cron簡介
robfig/cron是一個第三方開源的任務調度庫,也就是我們平時說的定時任務。
Github:https://github.com/robfig/cron
官方文檔:https://godoc.org/github.com/robfig/cron
3 cron如何使用
1、新建文件cron-demo.go
package main import ( "fmt" "github.com/robfig/cron" "time" ) func main() { c := cron.New() c.AddFunc("*/3 * * * * *", func() { fmt.Println("every 3 seconds executing") }) go c.Start() defer c.Stop() select { case <-time.After(time.Second * 10): return } }
-
cron.New創建一個定時器管理器
-
c.AddFunc添加一個定時任務,第一個參數是cron時間表達式,第二個參數是要觸發執行的函數
-
go c.Start()新啟一個協程,運行定時任務
-
c.Stop是等待停止信號結束任務
2、在cron-demo.go文件下執行go build
本項目采用go mod進行包管理,所以執行go build命令后,會在go.mod文件中生成對應的依賴版本如圖所示
3、運行cron-demo.go
可以看出每3秒執行一次,直到10秒后過期退出進程,任務結束。
代碼參見項目:go-demo項目(https://github.com/DMinerJackie/go-demo/tree/master/main/src/cron)
看上去這個任務調度還是蠻好用的,那么具體是如何實現的呢,看了下源碼,也是非常的短小精悍,目錄結構如下。
下面通過幾個問題一起看下cron是如何實現任務調度。
4 cron如何解析任務表達式
上例我們看到添加“*/3 * * * * *”這樣的表達式,就能實現每3秒執行一次。
顯然,這個表達式只是對人友好的一種約定表達形式,要真正在指定時間執行任務,cron肯定是要讀取並解析這個c表達式,轉化為具體的時間再執行。
那我們來看看,這個具體是如何執行的。
進入AddFunc函數實現
// AddFunc adds a func to the Cron to be run on the given schedule. func (c *Cron) AddFunc(spec string, cmd func()) error { return c.AddJob(spec, FuncJob(cmd)) }
這只是套了個殼,具體還要進入AddJob函數
// AddJob adds a Job to the Cron to be run on the given schedule. func (c *Cron) AddJob(spec string, cmd Job) error { schedule, err := Parse(spec) if err != nil { return err } c.Schedule(schedule, cmd) return nil }
該函數第一行就是解析cron表達式,順藤摸瓜,我們看到具體實現如下
// Parse returns a new crontab schedule representing the given spec. // It returns a descriptive error if the spec is not valid. // It accepts crontab specs and features configured by NewParser. func (p Parser) Parse(spec string) (Schedule, error) { if len(spec) == 0 { return nil, fmt.Errorf("Empty spec string") } if spec[0] == '@' && p.options&Descriptor > 0 { return parseDescriptor(spec) } // Figure out how many fields we need max := 0 for _, place := range places { if p.options&place > 0 { max++ } } min := max - p.optionals // Split fields on whitespace fields := strings.Fields(spec) // 使用空白符拆分cron表達式 // Validate number of fields if count := len(fields); count < min || count > max { if min == max { return nil, fmt.Errorf("Expected exactly %d fields, found %d: %s", min, count, spec) } return nil, fmt.Errorf("Expected %d to %d fields, found %d: %s", min, max, count, spec) } // Fill in missing fields fields = expandFields(fields, p.options) var err error field := func(field string, r bounds) uint64 { // 抽象出filed函數,方便下面調用 if err != nil { return 0 } var bits uint64 bits, err = getField(field, r) return bits } var ( second = field(fields[0], seconds) minute = field(fields[1], minutes) hour = field(fields[2], hours) dayofmonth = field(fields[3], dom) month = field(fields[4], months) dayofweek = field(fields[5], dow) ) if err != nil { return nil, err } return &SpecSchedule{ Second: second, Minute: minute, Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, }, nil }
該函數主要是將cron表達式映射為“Second, Minute, Hour, Dom, Month, Dow”6個時間維度的結構體SpecSchedule。
SpecSchedule是實現了方法“Next(time.Time) time.Time”的結構體,而“Next(time.Time) time.Time”是定義在Schedule接口中的
// The Schedule describes a job's duty cycle. type Schedule interface { // Return the next activation time, later than the given time. // Next is invoked initially, and then each time the job is run. Next(time.Time) time.Time }
所以,最終可以理解是將cron解析后轉換為下一次要執行的時刻,等待執行。
5 cron如何執行任務
我們知道通過parser.go可以將人很好理解的表達式轉換為cron可以讀懂的要執行的時間。
有了要執行的時間點,那么cron具體是如何執行這些任務的呢?
我們看下Start函數的具體實現
// Start the cron scheduler in its own go-routine, or no-op if already started. func (c *Cron) Start() { if c.running { return } c.running = true go c.run() }
這里會通過判定Cron的running字段是否在運行來巨額聽是否要啟動任務。
顯然這里running是false,因為在調用c.New初始化的時候running被設置為false。
所以,這里新啟一個協程用於執行定時任務,再次順藤摸瓜,我們看到run函數的實現
// Run the scheduler. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { entry.Next = entry.Schedule.Next(now) } for { // Determine the next entry to run. sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // 如果沒有要執行的任務或者第一個任務的待執行時間為空,則睡眠 // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { timer = time.NewTimer(c.entries[0].Next.Sub(now)) // 否則新建一個距離現在到下一個要觸發執行的Timer } for { select { case now = <-timer.C: // 觸發時間到,執行任務 now = now.In(c.location) // Run every entry whose next time was less than now for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } go c.runWithRecovery(e.Job) e.Prev = e.Next e.Next = e.Schedule.Next(now) } case newEntry := <-c.add: // 添加任務 timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry) case <-c.snapshot: // 調用c.Entries()返回一個現有任務列表的snapshot c.snapshot <- c.entrySnapshot() continue case <-c.stop: // 任務結束,退出 timer.Stop() return } break } } }
-
進入該函數,首先遍歷所以任務,找到所有任務下一個要執行的時間。
-
然后進入外層for循環,對於各個任務按照執行時間進行排序,保證離當前時間最近的先執行。
-
再對任務列表進行判定,是否有任務如果沒有,則休眠,否則初始化一個timer。
里層的for循環才是重頭戲,下面主要分析這個for循環里面的任務加入和執行。
在此之前,需要了解下go標准庫的timer
timer用於指定在某個時間間隔后,調用函數或者表達式。
使用NewTimer就可以創建一個Timer,在指定時間間隔到達后,可以通過<-timer.C接收值。
package main import ( "fmt" "time" ) func main() { timer1 := time.NewTimer(2 * time.Second) <-timer1.C fmt.Println("Timer 1 expired") timer2 := time.NewTimer(time.Second) go func() { <-timer2.C fmt.Println("Timer 2 expired") }() stop2 := timer2.Stop() if stop2 { fmt.Println("Timer 2 stopped") } }
執行結果為
Timer 1 expired Timer 2 stopped
timer1表示2秒后到期,在此之前都是阻塞狀態,2秒后<-timer1.C接收到信號,執行下面的打印語句。
timer2表示1秒后到期,但是中途被Stop掉了,相當於清除了定時功能。
有了這個背景之后,我們再來看run函數的里層for循環。
接收到c.add信道
case newEntry := <-c.add: // 添加任務 timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry)
將timer停掉,清除設置的定時功能,並以當前時間點為起點,設置添加任務的下一次執行時間,並添加到entries任務隊列中。
接收到timer.C信道
case now = <-timer.C: // 觸發時間到,執行任務 now = now.In(c.location) // Run every entry whose next time was less than now for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } go c.runWithRecovery(e.Job) e.Prev = e.Next e.Next = e.Schedule.Next(now) }
當定任務到點后,time.C就會接收到值,並新開協程執行真正需要執行的Job,之后再更新下一個要執行的任務列表。
我們進入runWithRecovery函數,該函數從函數名就可以看出,即使出現panic也可以重新recovery,保證其他任務不受影響。
func (c *Cron) runWithRecovery(j Job) { defer func() { if r := recover(); r != nil { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] c.logf("cron: panic running job: %v\n%s", r, buf) } }() j.Run() }
追根溯源,我們發現真正執行Job的是j.Run()的執行。進入這個Run函數的實現,我們看到
func (f FuncJob) Run() { f() }
沒錯,我們要執行的任務一直從AddFunc一直往下傳遞,直到這里,我們通過調用Run函數,將包裝的FuncJob類型的函數通過f()的形式進行執行。
這里說的可能比較模糊,舉個例子,Go里面的閉包定義
func () { fmt.Println("test") }()
如果這里定義后面沒有"()"該函數就不會執行,所以結合這個看上面的定時任務是如何執行就更容易理解了。
6 代碼閱讀體會
1、channel的奧妙
通過channel可以讓感知變得輕而易舉,比如timer.C就像是時間到了,自然會有人來敲門告訴你。而不需要我們自己主動去獲取是否到期了。
2、常用類庫的使用
比如在parser里面我們看到了"fields := strings.Fields(spec)",在日常開發中,我們可以靈活使用這些API,避免自己造輪子的情況。
3、多思考
之前做Java的時候,更多的是沉浸在各種工具和框架的使用,對於這些工具和框架的實現關注的不多。比如從Quartz到Spring Job,我們需要更新的是越來越好用的定時任務工具,而底層的實現升級Spring都幫我們考慮好了。這種對業務對項目有友好的,可以快速的實現業務功能開發,但是對於開發者並不友好,友好的設計麻痹了開發者對於底層原理的深究的欲望。