序列文章:
-
Celery 源碼解析一:Worker 啟動流程概述
-
Celery 源碼解析二:Worker 的執行引擎
-
Celery 源碼解析三: Task 對象的實現
-
Celery 源碼解析四: 定時任務的實現
-
Celery 源碼解析五: 遠程控制管理
-
Celery 源碼解析六:Events 的實現
-
Celery 源碼解析七:Worker 之間的交互
-
Celery 源碼解析八:State 和 Result
在系列中的第二篇我們已經看過了 Celery 中的執行引擎是如何執行任務的,並且在第三篇中也介紹了任務的對象,但是,目前我們看到的都是被動的任務執行,也就是說目前執行的任務都是第三方調用發送過來的。可能你會有點奇怪,難道除了第三方調用發送,還有其他的調用發送方?是的,Celery 自身也會發送任務,在本文中,你將看到 Celery 如何利用自身的定時機制運行我們設置得定時任務,並且交給 Worker 執行。
定時任務的定義
在開始講解源碼之前,我們不妨先看下我們平常都是怎么定義定時任務的,還是以我們習慣的 Demo 為例:

定義就是這么簡單,這么隨意,但是,想要執行卻是需要我們運行一個定時器,也就是在命令行中啟動 Beater,正常情況下你這么做就可以了:

然后你就會看到一個個的定時任務被發送到 MQ 中,然后被 worker 消化。
定時任務的啟動
上面只是舉了個如何使用的例子,但是,在 Celery 內部是如何處理這些任務才是我們需要關心的真正的點。回想一下在我們第一篇中講 Worker 的啟動流程的文章,有一個很重要的 BootStep 我們還沒有講到,那就是 Worker 的 Beat,但是我在那里排的優先級卻是 2,確實如此,我也是在講完了所有 1 的優先級之后再講它的,所以它可以說重要,也可以說不重要。
既然都說開了,那么就不停下了,直接看看 Beat 的實現,Beat 的實現可以說是非常簡單,我們一眼就可以看完:

核心還是在 create 中咯,然后關鍵還是看 Line 199,這里又牽扯到 celery.beat.EmbeddedService,那我們基本上就可以確定在這了。

敲黑板了,注意看這里,Line 648 就決定了是使用 線程 還是 進程 來運行 Beat 服務,但是我們應該清楚,無論是使用 線程 還是 進程,思路都是相差不遠的,我們可以先找一個來看看。到這里,其實定時任務的啟動工作就算是完成了,因為后面就是以獨立的線程/進程執行了,主線程已經可以回去了。

定時任務的執行
其實無論是用 Thread 還是 Process,這里都是構造的 Service 對象,然后 start 的,那么這個 Service 對象具體是啥,其實也是在這個文件里面,但是我們不急着看它。在看它之前我先給大家描述一下這個文件里面幾個關鍵的類的關系,方便大家了解:

這里就出現了 4 個類,它們之間的關系還是比較明顯的,中樞部分就是 Scheduler,然后 Service 是驅動部分,最后的承載實體就是 SchedulerEntry 了,明白這層關系之后,我們再來看看 Service 是如何驅動的:

這里的靈魂一句就是 Line 557 中的這句循環了,我們知道這段代碼是運行在獨立的線程/進程中的,所以這里是個死循環,而循環的條件就是條件變量 shutdown 被設置了。這里不斷得嘗試做一件事情,這件事情就是調用 scheduler 的 tick 函數,並且根據它返回的值等待片刻,然后繼續執行,所以,關於這個 tick 里面有什么東西,很值得我們關注,從上面的 UML 圖中,我們可以看到 tick 是在 Scheduler 中,所以直接可以找到它:

這段代碼乍一看可能會很復雜,但是實質上很簡單,其中 H 是一個最小堆,它的作用就是承載了所有我們設置得定時任務,而最小堆的特性就是堆頂的元素是最小的,在這里就是 event 這個變量,那么你可能會問排序的依據是啥,排序的依據就是 Line 274 的關鍵詞 next_time_to_run,celery 會先計算每個定時任務下一次執行的時間戳 - 當前時間戳,然后根據這個時間差值進行排序,毫無疑問,差值最小的就是下一次需要執行的任務了。
同樣在 Line 274 這里還做了一個判斷,那就是差值最小的那個任務現在應不應該執行 is_due,如果應該執行,那么 Line 276 - Line 285 就是執行的邏輯了,這里需要注意的一點就是 Line 277 還對出堆的元素進行了判斷,以防不是我們剛才要執行的元素,這里我猜測的原因是這個 H 並不是線程安全的,在我們執行定時任務的時候,還可能有其他線程/進程在修改它,所以需要進行一個判斷。
還有一個值得我們關注的點就是 Line 279 中的提交定時任務,這個也可以說是我的此行的目的,但是,我們已經有了普通異步任務的經驗,相信這里不會讓我們太吃驚。

正如所期待的,這里只是想將 SchedulerEntry 轉換為 Task,然后至於 Task 怎么提交的異步任務,相信看過 第三篇文章的同學已經不陌生了,可以 pass 了。
那么到這里我們也算是將定時任務的執行看完了。
定時任務的持久化
雖然定時任務的執行我們是看完了,但是,定時任務還有一個很重要的地方我們還沒有看,那就是持久化。在 Celery 中,定時任務的執行並不會因為我們重啟了 Celery 而失效,反而在重啟 Celery 之后,Celery 會根據上一次關閉之前的執行狀態,重新計算新的執行周期,而這里計算的前提就是能夠獲取舊的執行信息,而在 Scheduler 中,這些信息都是默認保存在文件中的。
Celery 默認的存儲是通過 Python 默認的 shelve 庫實現的,shelve 是一個類似於字典對象的數據庫,我們可以通過調用 sync 命令在磁盤和內存中同步數據。當然,你也可以自定義存儲的位置,但是目前來看這個 store 存儲適合 PersistentScheduler 綁定的,所以我個人更建議通過自定義 Scheduler 來實現,我曾經在 Github 開源了一個基於 Redis 的實現,感興趣的同學可以看一下,地址是:celery redis beat
所以這個問題就變成了如何自定義 Scheduler,我根據自己的經驗,總結了以下步驟:
- 繼承
Scheduler類,實現構造函數 - 實現
Scheduler的tick()、should_sync()、_do_sync()、close()等方法 - 啟動的時候指定
Scheduler類的包路徑即可
總結
在本篇文章中,我們以 Beat 為觸發點,講解了 Celery 關於定時任務的定義、啟動、執行和持久化。通過本篇文章的介紹,應該可以自己定義或者修改出更好得定時調度器了,同時我們也知道保存在當前目錄下的定時文件有什么用了。
