Golang開源定時任務調度框架robfig/cron優化


項目中需要使用一個簡單的定時任務調度的框架,最初直接從GitHub上搜了一個star比較多的,就是 https://github.com/robfig/cron 這個,目前有8000+ star。剛開始使用的時候發現問題不大,但是隨着單機需要定時調度的任務越來越多,高峰期差不多接近500QPS,隨着業務的推廣使用,可以預期增長還會比較快,但是已經遇到CPU使用率偏高的問題,通過pprof分析,很多都是在做排序,看了下這個項目的代碼,整體執行的過程大概如下:

1. 對所有任務進行排序,按照下次執行時間進行排序

2. 選擇數組中第一個任務,計算下次執行時間減去當前時間得到時間t,然后sleep t

3. 然后從數組第一個元素開始遍歷任務,如果此任務需要調度的時間<now,那么就執行此任務,執行之后重新計算這個任務的next執行時間

4. 每次待執行的任務執行完畢之后,都會重新對這個數組進行排序

5. 然后再循環從排好序的數組中找到第一個需要執行的任務去執行。

 

 

代碼如下:

for {
        // Determine the next entry to run.
        sort.Sort(byTime(c.entries))

        var timer *time.Timer
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
            timer = time.NewTimer(100000 * time.Hour)
        } else {
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }

        for {
            select {
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)

                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }

            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue

            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return

            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }

            break
        }
    }

問題就顯而易見了,執行一個任務(或幾個任務)都重新計算next執行時間,重新排序,最壞情況就是每次執行1個任務,排序一遍,那么執行k個任務需要的時間的時間復雜度就是O(k*nlogn),這無疑是非常低效的。

於是想着怎么優化一下這個框架,不難想到每次找最先需要執行的任務就是從一堆任務中找schedule_time最小的那一個(設schedule_time是任務的執行時間),那么比較容易想到的思路就是使用最小堆:

1. 在初始化任務列表的時候就直接構建一個最小堆

2. 每次執行查看peek元素是否需要執行

3. 需要執行就pop堆頂元素,計算next執行時間,重新push入堆

4. 不需要執行就break到外層循環取堆頂元素,計算next_time-now() = need_sleep_time,然后select 睡眠、add、remove等操作。

 

我修改為min-heap的方式之后,每次添加任務的時候通過堆的屬性進行up和down調整,每次添加任務時間復雜度O(logn),執行k個任務時間復雜度是O(klogn)。經過驗證線上CPU使用降低4~5倍。CPU從50%左右降低至10%左右

 

優化后的代碼如下,只是其中一部分,關鍵部分已經高亮。

全部的代碼也已經在github上已經創建了一個Fork的倉庫並推送上去了,全部單測Case也都PASS。感興趣可以點過去看。https://github.com/tovenja/cron

    for {
        // Determine the next entry to run.
        // Use min-heap no need sort anymore


     // 這里不再需要排序了,因為add的時候直接進行堆調整
     //
sort.Sort(byTime(c.entries)) var timer *time.Timer if len(c.entries) == 0 || c.entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. timer = time.NewTimer(100000 * time.Hour) } else { timer = time.NewTimer(c.entries[0].Next.Sub(now)) //fmt.Printf(" %v, %+v\n", c.entries[0].Next.Sub(now), c.entries[0].ID) } for { select { case now = <-timer.C: now = now.In(c.location) c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now for { e := c.entries.Peek() if e.Next.After(now) || e.Next.IsZero() { break } e = heap.Pop(&c.entries).(*Entry) c.startJob(e.WrappedJob) e.Prev = e.Next e.Next = e.Schedule.Next(now) heap.Push(&c.entries, e) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() newEntry.Next = newEntry.Schedule.Next(now) heap.Push(&c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() continue case <-c.stop: timer.Stop() c.logger.Info("stop") return case id := <-c.remove: timer.Stop() now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) } break } }

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM