golang 第三方定時任務庫 github.com/robfig/cron/v3 核心源碼解讀


定時任務是一個通用場景的功能,在golang中,現在github最為常見的一個第三方定時任務庫就是 github.com/robfig/cron/v3 目前(2020年1月9日) 7.2k Star。
我之前使用Python的時候用慣了apscheduler,切換這個是真的不習慣。

個人博客原文地址

https://www.charmcode.cn/article/2021-01-09_golang_cron_code

感覺github.com/robfig/cron/v3功能太簡陋了,

  • 不支持定時任務持久化,我重啟一下服務,調度任務信息就沒了,需要自己存儲調度信息。
  • 再比如不支持一次定時見issue等,雖然有PR 但是 v3 分支還是依舊不支持,parse文件函數不支持,雖然可以按照作者的說法,調用一次之后再調用移除可以實現。
  • 不支持立即運行,見issue ,作者表示可以調用后手動調用解決,但是我感覺不夠優雅,沒有指定首次運行時間方便。(我突然有種想提PR的沖動,哈哈哈)

綜上,個人感覺這個庫封裝的不是很完善,作為一個golang新手,讀解析一下這個定時任務庫,還是很有收獲的。如果能力允許,以解決以上問題為目標,自己提PR。

注意

文章內容皆為個人見解,並且只看了核心的實現方式,細節部分沒有解析,不保證准確,如果和你的理解有歧義,以你的為准。

前置知識

你需要掌握golang的 goroutine知識,包括channel通信,select多路復用, time.NewTimer等知識,否則解讀起來就會很困難。
time.NewTimer的作用

func main(){
	// Calling NewTimer method
	timer := time.NewTimer(5 * time.Second)

	// Notifying the channel
	<-timer.C

	// Printed after 5 seconds 5秒之后輸出
	fmt.Println("Timer is inactivated")
}

簡單的demo

更多使用demo可以參考 個人Go學習筆記


package _1_demo

import (
	"fmt"
	"github.com/robfig/cron/v3"
	"testing"
	"time"
)

// 定時任務
func jobTask() {
	fmt.Printf( "任務啟動: %s \n",time.Now().Format("2006-01-02 15:04:05"))
}

func TestCron(t *testing.T) {
	// 創建一個cron對象
	c := cron.New()

	// 任務調度
	enterId, err := c.AddFunc("@every 3s", jobTask)
	if err!=nil{
		panic(err)
	}
	fmt.Printf("任務id是 %d \n", enterId)

	// 同步執行任務會阻塞當前執行順序  一般使用Start()
	//c.Run()
	//fmt.Println("當前執行順序.......")

	// goroutine 協程啟動定時任務(看到后面Start函數和run()函數,就會明白啟動這一步也可以寫在任務調度之前執行)
	c.Start()
	// Start()內部有一個running 布爾值 限制只有一個Cron對象啟動 所以這一步多個 c.Start() 也只會有一個運行
	c.Start()
	c.Start()

	// 用於阻塞 后面可以使用 select {} 阻塞
	time.Sleep(time.Second * 9)

	// 關閉定時任務(其實不關閉也可以,主進程直接結束了, 內部的goroutine協程也會自動結束)
	c.Stop()

}

源碼解讀

核心文件主要就是cron.go文件

首先可以看到 c := cron.New() 創建了這個 Cron結構體對象

type Cron struct {
	entries   []*Entry                         // 用於存放job指針對象的數組
	chain     Chain
	stop      chan struct{}                  // 定制調度任務
	add       chan *Entry                    // 添加一個調度任務
	remove    chan EntryID               // 移除 一個調度任務
	snapshot  chan chan []Entry     // 正在運行中的調度任務 
	running   bool                             // 保證整個Cron對象只啟動一次 和啟動后其他chan正常
	logger    Logger                          // 記錄日志
	runningMu sync.Mutex              // 協程鎖,確保執行安全
	location  *time.Location            // 時區
	parser    ScheduleParser           // 解析參數
	nextID    EntryID                         // 下一個調度任務的id
	jobWaiter sync.WaitGroup        // 確保單一的調度任務執行完畢
}

Entry包含那些


// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
	// ID is the cron-assigned ID of this entry, which may be used to look up a
	// snapshot or remove it.
	ID EntryID  // 任務調度Id,默認是自增 創建任務時返回

	// Schedule on which this job should be run.
	Schedule Schedule // 調度任務運行

	// Next time the job will run, or the zero time if Cron has not been
	// started or this entry's schedule is unsatisfiable
	Next time.Time       // 下次執行時間

	// Prev is the last time this job was run, or the zero time if never.
	Prev time.Time      // 上次執行時間

	// WrappedJob is the thing to run when the Schedule is activated.
	WrappedJob Job      // 執行的任務

	// Job is the thing that was submitted to cron.
	// It is kept around so that user code that needs to get at the job later,
	// e.g. via Entries() can do so.
	Job Job
}

調度任務enterId, err := c.AddFunc("@every 3s", jobTask) 會使用以下兩個文件來解析定時執行的參數,也就是翻譯給golang 解析@erery 3s是干什么

啟動c.Start()

// Start the cron scheduler in its own goroutine, or no-op if already started.
func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

可以看到Start() 執行了三個操作

  • 1 上鎖 最后解鎖
  • 2 判斷此對象的狀態是否正在運行,如果運行了直接 return
  • 3 如果沒有運行,就修改狀態,然后啟動協程運行 run()方法

核心邏輯run()方法

這里需要知道的知識

  • time.NewTimer(d Duration)返回一個sleep指定時間的channel
  • select{} 多路復用阻塞
    • 任意一個case滿足select{}就回直接執行結束
  • sort.Sort 的用法(我這里不是很熟悉)
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	c.logger.Info("start")

	// Figure out the next activation times for each entry.
	now := c.now()  // 獲取現在時間
        // 循環調度任務 計算下一次執行時間
	for _, entry := range c.entries {  
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}
        
        // 最外層死循環 這一層會一直存在
	for {
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries)) // 排序確定下一個要運行的目標

		var timer *time.Timer // 聲明一個Timer 指針變量
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
                        // 如果cron啟動后 還沒有 調度信息的話 就生成一個sleep10W小時的 chan Time,用於阻塞下面的 select{} ,因為`select`是多路復用,其他channel能返回數據時,select就回執行不會阻塞。
                        // 所以當沒有任務時,啟動Start()程序 就會被這個阻塞
			timer = time.NewTimer(100000 * time.Hour)
		} else {
                        // 如果有調度信息,就 sleep 調度任務中第一個的 循環時間 
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}
                // 第二層死循環  內部使用select{}阻塞
		for {
			select {
                        // 上一步中的 timer sleep時間如果到了就執行
			case now = <-timer.C:
				now = now.In(c.location)
				c.logger.Info("wake", "now", now)

				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					c.startJob(e.WrappedJob)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
				}
                        // 向Cron中添加了 一個調度任務就會執行
			case newEntry := <-c.add:
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
                        
			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue
                        // 停止定時任務
			case <-c.stop:
				timer.Stop()
				c.logger.Info("stop")
				return
                        // 移除任務
			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				c.logger.Info("removed", "entry", id)
			}
                        // 當以上任意一個channel滿足時,就會結束內層循環 重復上一層步驟
			break
		}
	}
}

自己的總結

這個robfig/cron/v3 這個庫實現定時任務的核心邏輯,就是利用以下幾個點:

  • 主體for循環
    • 循環配合time.NewTimerchannel sleep確保定時任務能定時執行
  • select多路選擇阻塞
    • 確保能在任意時間改變其case中的channel 執行
  • time.NewTimersleep定時時間

疑惑的地方

自己看完還是有很多疑惑的地方,很多函數用法,沒怎么用過比如:

  • snapshot chan chan []Entry
    • 定義一個chan 類型是 chan []Entry ?? 沒怎么見過這種用法
  • 其他函數用法(慢慢學習吧)

總體來說思路設計的很巧妙,感覺如果只是單純的寫web接口的話,很少直接接觸到這樣的設計。
最后順便說一句Golang簡單的語法是真的方便。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM