Celery 源碼解析五: 遠程控制管理


序列文章:

今天要聊的話題可能被大家關注得不過,但是對於 Celery 來說確實很有用的功能,曾經我在工作中遇到這類情況,就是我們將所有的任務都放在同一個隊列里面,然后有一天突然某個同學的代碼寫得不對,導致大量的耗時任務被同時塞進了消息隊列里面,這就悲劇了,這直接導致了其他服務長時間不可用,例如發送登錄短信驗證碼無法使用了,還有支付信息無法同步了等等,反正就是造成了一些不小的影響。

當時我們的處理方式就很被動,只能手動連接上 MQ,然后把消息卸掉,其實也就手動將這些消息拋棄掉,從而讓其他業務的消息可能正常運行。但是,這種方式也只適合當初作為少量流量的情況,對於搭建了大集群和大量任務的消息隊列來說,這種方式是不可想象的,這么做是要死人的,不僅僅是被累垮,上頭的口水都能把你淹了。所以,這個時候,我需要介紹一個 Celery 不太常被人使用的功能——遠程控制。

遠程控制功能

其實 Celery 很早之前就存在控制命令,例如可以使用 Python shell 的 shell 命令,可以查看任務狀態的 status 命令等等,但是這些命令都是本地的,不能讓人覺得有意思,但是,這里有兩個系列的命令很厲害,它們分別是:

  • inspect:主要用於查看 Celery 狀態信息
  • control:主要用於設置 Celery 狀態

例如,我在機器 A 運行着一個 Celery,機器 B 也運行着一個 Celery,機器 C 沒有運行 Celery,但是我可以在機器 C 上查詢機器 A 或者機器 B 上的任務狀態,甚至可以刪除和停止任務,這些都是很簡單可以實現的,但是本文不是講解這些功能的文章,而是解析這些功能的文章,所以有興趣的同學可以參考這份官方文檔繼續了解。

遠程控制功能組件

要實現遠程控制功能,我們需要從宏觀上先看看 Celery 的設計思路,在 Celery 中,采用的是分布式的管理方式,其實沒有太大秘密,就是每個節點之間都是通過廣播/單播進行通信,從而達到協同效果,但是這過程還是有很多不好之處的,值得我們來思考一番。

Celery 每個運行的實例都維護着一個 Control Node,其實就是一個可以接收/發送消息的對象,這個對象的封裝是 Kombu.pidbox.Mailbox.Node,我們就先來看看創建的實現吧。

還是回到第一篇,在 Consumer 的 Blueprint 中,有一個叫做 Control 的 Bootstep,這個就是用於節點管理和通信的,我們來看一下:

其實代碼還是比較簡單的,有兩個地方值得我們關注,分別是:

  • Line 25:這里是構建了一個 Pidbox
  • Line 26 - Line 28:這個 Bootstep 的 start、stop 和 shutdown 方法都是使用的 box 的

所以這個 Pidbox 是什么就很重要了,在我們看之前,不放看下上面的注釋,也許會更容易一些:

雖然這里關系說得很明確了,但是我們還是有必要看看的,畢竟有可能里面有設置什么特殊的東西:

ok,確實還好,很誠實得就是用 Kombu 的 mailbox,但是 kombu 的 mailbox 是什么,可能很多人都沒試過,我之前也沒試過,后來試了一下,感覺還挺有意思的,注意,下面這段可能是現在互聯網上公開的為數不多的可以運行得 Kombu Demo 示例,甚至於講解。

Kombu Mailbox

在 Kombu 中提供了 Mailbox 的實現,它的作用就是通過 Mailbox 我們可以實現不同實例之間的消息發送和處理,具體可以是單播廣播,這個在 Celery 中是作為 Control 的功能使用的,但是,在其他的模型里面,例如 Celery 試圖實現但是沒有實現的 Actor 模型里面也是可以用的。

Anyway,下面還是講講 Kombu 中的 Mailbox 是怎么用的吧,當初找相關的資料費了老大力了,但是,並沒有太大收獲,所以自己總結了一番。在 Kombu 中,Mailbox 中只有一個概念,那就是 Node

  • Node:每個 Node 都是一個實例,互相直接沒有關聯,可以完全獨立,他們通過 mailbox 進行通信

但是,為了測試,我們還會引入一個 client 的概念,但是這個概念不是 Kombu 自己的,而是我為了演示效果添加進來的,所以現在我們應該有兩個地方,分別是:NodeClient,其中可以認為 NodeServer 端,Client 是觸發端,你會發現,Client 只是做了一件觸發的工作,沒有其他更多的事情:

這是 node 的代碼,你會發現它底層其實還是依賴於 Kombu 的 Connection,所以可以看到依賴的還是我們 Celery 里面的 Broker,這點很重要。然后再看看我們是怎么觸發的:

可以發現這里非常簡單,還是通過 Connection 構建出 mailbox,有一點需要注意的就是,Broker 要一致,不然你讓他們怎么通信?執行這段代碼,然后你就會在 node 上看到執行效果了,具體怎樣,體驗之后就明了。

Celery 的遠程控制

看過 Kombu 的實現例子之后,我們來看看 Celery 是怎么構建這些對象的,首先還是得從最開始的 control 開始說起,control 在 Celery 中也是有兩處的,一處是 app/control.py,另外一處就是:woker/control.py,可以認為第一處的是對外的接口,而第二處的是初始化的入口,實現自然就是 Kombu 提供的了,這里只是用到他們而已。

所以,現在來看,我們的目標很簡單了,無非是看

  1. 如何初始化 mailbox 和 Node 的
  2. 提供了那些對外接口可以使用的

下面就這兩個問題進行一一解答

mailbox 的初始化

故事是從 control 這個 Bootstep 開始說起,這是是初始化的起源:

這段代碼我們前面已經見過了,同時我們也已經知道了 self.box 是個啥了,但是,對於更進一步的 c.app.control.mailbox.Node 中的 c.app.control.mailbox 是啥還不知道,不妨來看看:

ok,這里也很清晰,因為這個 control_cls 就是我們后面要看的:

control_cls = 'celery.app.control:Control'

所以 Mailbox 也就是 Kombu 的 Mailbox 了,這里沒做什么改動。除此之外,還有一個地方需要我們去關注的,那就是收到消息后怎么處理,這個得看到 Bootstep 的 start 操作,這里是初始化過程中會被調用到的:

start 的第一句(Line 50)沒毛病,因為用的都是 Celery 的 Connection,然后是第二句,這里我們根據之前的例子,已經很清楚會發生什么事情了,所以關鍵就是 Line 37 中的 on_message,每當有其他消息過來的時候,這里都是處理點。

Line 42 來看,Celery 還是甩鍋給了 Kombu,但是這也不是啥問題,所以我們得找 Kombu 問清楚它是怎么處理的:

ok,這里一直有個叫 clock 的東西,我先不看,后面再說一下,先看看 dispatch 是如何處理的:

Line 99 這里其實是通過是否有發送主體(參數傳過來的 reply_to)來判斷是 單播 還是 多播 的消息,然后選擇不同的處理方式,分別是 Line 118 中的 handle_call 用來處理 單播,而 Line 121 中的 handler_cast 用來處理 多播。這里有一個點,那就是 Line 116 中的 handlers 里面放置了所有注冊的函數的信息,這個我們稍后會看到。

接口的注冊

前面說了,Celery 注冊了很多管理接口給我們使用,我們就看看有那些注冊接口以及這些接口是如何注冊進去的,我們是否可以自定義管理接口。關於接口注冊相關的代碼,我們得走到 celery/worker/control.py 中,現在進來看一看:

這里有兩個注冊函數,其實也就是我們前面說過的對應的兩類操作,分別是 查看設置 類,然后也可以發現,其實注冊就是往 Panel 這個 Dict 里頭寫入一些 key 和 value 對,然而,這里有兩個 Dict 是需要我們關注的,他們分別是:

  • data:key 就是名稱,value 是處理函數
  • meta:key 是名稱,value 是元數據,整個數據描述為:

ok,了解完這些我們就知道了遠程命令的對象和處理函數的對應關系都放在 data 和 meta 里面,這有什么用?回想一下之前 Mailbox 的構造函數的地方:

注意看 Line 28,用的就是這里的 data,然后就直接用來構造 Node 了,現在和前面的關系對應起來,了解了吧?

下面我就找個復雜點的例子看看,是怎么講一個函數注冊進 data 這個 Dict 里面的:

這個功能很明確了,在注釋中已經提及了,但是,我們並不 care 它的功能,我們更多的是關注它是怎么發生的,在 Line 227 這里就是調用注冊函數進行注冊,可以看到,args 分別對應到我們下面的幾個命名參數,然后調用 control_command 之后其實就是直接掛在 Dict 上了,沒有其他操作,需要注意的是 Line 51if args 這個條件,我在整個 Celery 中都沒有看到有使用,所以應該這里是預留的。

重要:有一點值得注意的是,前面也有稍微提到,Celery 的分布式實現機制是廣播,所以我們在單機上發送的命令,只要沒有指定主機,那么都是以廣播的形式發送出去,所有的實例都將受到這個消息,然后根據消息處理本機的事務,所以我們在看代碼的時候需要着重關注這一理念。

遠程控制客戶端

關於控制消息接收和處理的邏輯我們已經看完了,那么我們來看看我們在命令行中敲下命令的時候,這一切是怎么運行起來的。要看這些邏輯,我推薦的入口是:celery/bin/control.py,這是一條典型的 Celery 命令類,這里的結構就比較復雜了,我不多說,直接看最后的結果,那就是調用的時候:

可以發現這里很簡單得就直接用一個 廣播 了事,還有比這更粗暴的么?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM