項目中需要使用一個簡單的定時任務調度的框架,最初直接從GitHub上搜了一個star比較多的,就是 https://github.com/robfig/cron 這個,目前有8000+ star。剛開始使用的時候發現問題不大,但是隨着單機需要定時調度的任務越來越多,高峰期差不多接近500QPS,隨着業務的推廣使用,可以預期增長還會比較快,但是已經遇到CPU使用率偏高的問題,通過pprof分析,很多都是在做排序,看了下這個項目的代碼,整體執行的過程大概如下:
1. 對所有任務進行排序,按照下次執行時間進行排序
2. 選擇數組中第一個任務,計算下次執行時間減去當前時間得到時間t,然后sleep t
3. 然后從數組第一個元素開始遍歷任務,如果此任務需要調度的時間<now,那么就執行此任務,執行之后重新計算這個任務的next執行時間
4. 每次待執行的任務執行完畢之后,都會重新對這個數組進行排序
5. 然后再循環從排好序的數組中找到第一個需要執行的任務去執行。
代碼如下:
for { // Determine the next entry to run. sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { timer = time.NewTimer(c.entries[0].Next.Sub(now)) } for { select { case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now for _, e := range c.entries { if e.Next.After(now) || e.Next.IsZero() { break } c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue case <-c.stop: timer.Stop() c.logger.Info("stop") return case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) } break } }
問題就顯而易見了,執行一個任務(或幾個任務)都重新計算next執行時間,重新排序,最壞情況就是每次執行1個任務,排序一遍,那么執行k個任務需要的時間的時間復雜度就是O(k*nlogn),這無疑是非常低效的。
於是想着怎么優化一下這個框架,不難想到每次找最先需要執行的任務就是從一堆任務中找schedule_time最小的那一個(設schedule_time是任務的執行時間),那么比較容易想到的思路就是使用最小堆:
1. 在初始化任務列表的時候就直接構建一個最小堆
2. 每次執行查看peek元素是否需要執行
3. 需要執行就pop堆頂元素,計算next執行時間,重新push入堆
4. 不需要執行就break到外層循環取堆頂元素,計算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。
我修改為min-heap的方式之后,每次添加任務的時候通過堆的屬性進行up和down調整,每次添加任務時間復雜度O(logn),執行k個任務時間復雜度是O(klogn)。經過驗證線上CPU使用降低4~5倍。CPU從50%左右降低至10%左右。
優化后的代碼如下,只是其中一部分,關鍵部分已經高亮。
全部的代碼也已經在github上已經創建了一個Fork的倉庫並推送上去了,全部單測Case也都PASS。感興趣可以點過去看。https://github.com/tovenja/cron
for { // Determine the next entry to run. // Use min-heap no need sort anymore
// 這里不再需要排序了,因為add的時候直接進行堆調整
//sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { timer = time.NewTimer(c.entries[0].Next.Sub(now)) //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID) } for { select { case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now for { e := c.entries.Peek() if e.Next.After(now) || e.Next.IsZero() { break } e = heap.Pop(&c.entries).(*Entry) c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) heap.Push(&c.entries, e) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) heap.Push(&c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue case <-c.stop: timer.Stop() c.logger.Info("stop") return case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) } break } }