定時任務是一個通用場景的功能,在golang中,現在github最為常見的一個第三方定時任務庫就是 github.com/robfig/cron/v3 目前(2020年1月9日) 7.2k Star。
我之前使用Python的時候用慣了apscheduler,切換這個是真的不習慣。
個人博客原文地址
https://www.charmcode.cn/article/2021-01-09_golang_cron_code
感覺github.com/robfig/cron/v3功能太簡陋了,
- 不支持定時任務持久化,我重啟一下服務,調度任務信息就沒了,需要自己存儲調度信息。
- 再比如不支持一次定時見issue等,雖然有PR 但是 v3 分支還是依舊不支持,parse文件函數不支持,雖然可以按照作者的說法,調用一次之后再調用移除可以實現。
- 不支持立即運行,見issue ,作者表示可以調用后手動調用解決,但是我感覺不夠優雅,沒有指定首次運行時間方便。(我突然有種想提PR的沖動,哈哈哈)
綜上,個人感覺這個庫封裝的不是很完善,作為一個golang新手,讀解析一下這個定時任務庫,還是很有收獲的。如果能力允許,以解決以上問題為目標,自己提PR。
注意
文章內容皆為個人見解,並且只看了核心的實現方式,細節部分沒有解析,不保證准確,如果和你的理解有歧義,以你的為准。
前置知識
你需要掌握golang的 goroutine知識,包括channel通信,select多路復用, time.NewTimer等知識,否則解讀起來就會很困難。
time.NewTimer的作用
func main(){
// Calling NewTimer method
timer := time.NewTimer(5 * time.Second)
// Notifying the channel
<-timer.C
// Printed after 5 seconds 5秒之后輸出
fmt.Println("Timer is inactivated")
}
簡單的demo
更多使用demo可以參考 個人Go學習筆記
package _1_demo
import (
"fmt"
"github.com/robfig/cron/v3"
"testing"
"time"
)
// 定時任務
func jobTask() {
fmt.Printf( "任務啟動: %s \n",time.Now().Format("2006-01-02 15:04:05"))
}
func TestCron(t *testing.T) {
// 創建一個cron對象
c := cron.New()
// 任務調度
enterId, err := c.AddFunc("@every 3s", jobTask)
if err!=nil{
panic(err)
}
fmt.Printf("任務id是 %d \n", enterId)
// 同步執行任務會阻塞當前執行順序 一般使用Start()
//c.Run()
//fmt.Println("當前執行順序.......")
// goroutine 協程啟動定時任務(看到后面Start函數和run()函數,就會明白啟動這一步也可以寫在任務調度之前執行)
c.Start()
// Start()內部有一個running 布爾值 限制只有一個Cron對象啟動 所以這一步多個 c.Start() 也只會有一個運行
c.Start()
c.Start()
// 用於阻塞 后面可以使用 select {} 阻塞
time.Sleep(time.Second * 9)
// 關閉定時任務(其實不關閉也可以,主進程直接結束了, 內部的goroutine協程也會自動結束)
c.Stop()
}
源碼解讀
核心文件主要就是cron.go文件
首先可以看到 c := cron.New() 創建了這個 Cron結構體對象
type Cron struct {
entries []*Entry // 用於存放job指針對象的數組
chain Chain
stop chan struct{} // 定制調度任務
add chan *Entry // 添加一個調度任務
remove chan EntryID // 移除 一個調度任務
snapshot chan chan []Entry // 正在運行中的調度任務
running bool // 保證整個Cron對象只啟動一次 和啟動后其他chan正常
logger Logger // 記錄日志
runningMu sync.Mutex // 協程鎖,確保執行安全
location *time.Location // 時區
parser ScheduleParser // 解析參數
nextID EntryID // 下一個調度任務的id
jobWaiter sync.WaitGroup // 確保單一的調度任務執行完畢
}
Entry包含那些
// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
// ID is the cron-assigned ID of this entry, which may be used to look up a
// snapshot or remove it.
ID EntryID // 任務調度Id,默認是自增 創建任務時返回
// Schedule on which this job should be run.
Schedule Schedule // 調度任務運行
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
Next time.Time // 下次執行時間
// Prev is the last time this job was run, or the zero time if never.
Prev time.Time // 上次執行時間
// WrappedJob is the thing to run when the Schedule is activated.
WrappedJob Job // 執行的任務
// Job is the thing that was submitted to cron.
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
Job Job
}
調度任務enterId, err := c.AddFunc("@every 3s", jobTask) 會使用以下兩個文件來解析定時執行的參數,也就是翻譯給golang 解析@erery 3s是干什么
- https://github.com/robfig/cron/blob/v3/parser.go 解析自定義的定時參數 比如@erery等
- https://github.com/robfig/cron/blob/v3/spec.go 解析 crontab 表達式
計算調度任務的間隔時間等。
啟動c.Start()
// Start the cron scheduler in its own goroutine, or no-op if already started.
func (c *Cron) Start() {
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
return
}
c.running = true
go c.run()
}
可以看到Start() 執行了三個操作
- 1 上鎖 最后解鎖
- 2 判斷此對象的狀態是否正在運行,如果運行了直接 return
- 3 如果沒有運行,就修改狀態,然后啟動協程運行 run()方法
核心邏輯run()方法
這里需要知道的知識
time.NewTimer(d Duration)返回一個sleep指定時間的channel- select{} 多路復用阻塞
- 任意一個case滿足select{}就回直接執行結束
- sort.Sort 的用法(我這里不是很熟悉)
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
c.logger.Info("start")
// Figure out the next activation times for each entry.
now := c.now() // 獲取現在時間
// 循環調度任務 計算下一次執行時間
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
}
// 最外層死循環 這一層會一直存在
for {
// Determine the next entry to run.
sort.Sort(byTime(c.entries)) // 排序確定下一個要運行的目標
var timer *time.Timer // 聲明一個Timer 指針變量
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// If there are no entries yet, just sleep - it still handles new entries
// and stop requests.
// 如果cron啟動后 還沒有 調度信息的話 就生成一個sleep10W小時的 chan Time,用於阻塞下面的 select{} ,因為`select`是多路復用,其他channel能返回數據時,select就回執行不會阻塞。
// 所以當沒有任務時,啟動Start()程序 就會被這個阻塞
timer = time.NewTimer(100000 * time.Hour)
} else {
// 如果有調度信息,就 sleep 調度任務中第一個的 循環時間
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
// 第二層死循環 內部使用select{}阻塞
for {
select {
// 上一步中的 timer sleep時間如果到了就執行
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
// 向Cron中添加了 一個調度任務就會執行
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
// 停止定時任務
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
// 移除任務
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
// 當以上任意一個channel滿足時,就會結束內層循環 重復上一層步驟
break
}
}
}
自己的總結
這個robfig/cron/v3 這個庫實現定時任務的核心邏輯,就是利用以下幾個點:
- 主體
for循環- 循環配合
time.NewTimerchannel sleep確保定時任務能定時執行
- 循環配合
select多路選擇阻塞- 確保能在任意時間改變其case中的channel 執行
time.NewTimersleep定時時間
疑惑的地方
自己看完還是有很多疑惑的地方,很多函數用法,沒怎么用過比如:
- snapshot chan chan []Entry
- 定義一個chan 類型是 chan []Entry ?? 沒怎么見過這種用法
- 其他函數用法(慢慢學習吧)
總體來說思路設計的很巧妙,感覺如果只是單純的寫web接口的話,很少直接接觸到這樣的設計。
最后順便說一句Golang簡單的語法是真的方便。
