Celery 源碼解析六:Events 的實現


序列文章:

 

在 Celery 中,除了遠程控制之外,還有一個元素可以讓我們對分布式中的任務的狀態有所掌控,而且從實際意義上來說,這個元素對 Celery 更為重要,這就是在本文中將要說到的 Event。

在 Celery 中,注冊了很多的 Event,這些 Event 將會在 Task/Worker 的狀態發生變化的時候被發出,然后被綁定的 Event 消費者(Receiver)所接受,綁定的 Event 消費者可以是一連串的回調函數,相信細心的同學在前面的源碼解析過程中也有發現一些關於 event 的蛛絲馬跡,但是,我都是忽略了先,下面就正式得給大家介紹 Event。

Event 有什么用

前面說了,Celery 在 Task/Worker 的狀態發生變化的時候就會發出 Event,所以,一個很明顯的應用就是監控 Event 的狀態,例如 Celery 大家所熟知的基於 WebUI 的管理工具 flower 就用到了 Event,但是,這也是一個比較明顯的應用,除此之外,我們還可以利用 Event 來給 Task 做快照,甚至實時對 Task 的狀態轉變做出響應,例如任務失敗之后觸發報警,任務成功之后執行被依賴的任務等等,總結一下,其實就是:

  1. 對 Task 的狀態做快照
  2. 對 Task 的狀態做實時處理
  3. 監控 Celery(Worker/Task) 的執行狀態

Event 的實現

了解完 Event 的功能之后,我們這里直接跳過了 Event 的使用實例,因為這個可以不用實例,而是我們根據前面的介紹,然后我們就明白了需要了解一下:

  1. Event 是如何產生的
  2. Event 的傳遞機制是如何實現的
  3. Event 的處理機制如何

我也將遵循這幾個問題的順序對 Celery 的實現進行一個總結。

Event 是如何產生的

現在我們已經知道了 Event 是 Task/Worker 產生的,所以出處必然在這些實現中,這就毫無難度了。不妨,我們就從最簡單的地方出發,看看 Worker 的 Event 是如何產生的,據我所知,現在 Worker 擁有的 Event 有三個:

  • worker-online
  • worker-heartbeat
  • worker-offline

對於 worker-online 那么應該就是在 Worker 的啟動過程中,所以我們還是回到第一篇文章中的介紹,看看里面有什么可以參考的。如果你回去看了的話,肯定會發現 Consumer 這個 Blueprint 里面有一個叫做 EventBootstep,這里很可疑,不妨去看看:

well,這里看上去沒啥有意思的,但是,看 Line 26 我們可以肯定的一點是 Event 是否可用還會取決於我們是否允許 gossip,這個是啥我們還不知道,但是無妨,先繼續看下去,這里還有一個東西值得我們關注,那就是 event_dispatcher,但是這里還沒啥可看的,畢竟是 None 嘛。

我們只是看到了冰山一角,繼續看看 start 又在干嘛:

這里第一句上來就是 close,有點蒙蔽啊,啥都不知道你就先上來 close 了,是不是很被動,沒關系,我告訴你這里是干啥的,這里就是清除 Celery 之前的 event_dispatcher,然后將之前的 event_dispatcher 返回回來,返回來干啥?在 Line 47 會根據之前的配置設置新的 event_dispatcher 啊,至於你先知道 event_dispatcher 是啥,看 Line 36 就知道啦,可以看到這就是一個 Dispatcher 的對象,所以我們需要關注一下這個對象了。

但是由於 Dispatcher 這個類太復雜,我就不一一攤開講了,不妨看看我們需要面對的幾個方法,第一個是 extend_buffer,看看:

這里的 _outbound_buffer 是一個 deque,所以我們可以知道其實就是將舊的 event 繼承過來,替別人背一下鍋。繼續看看 flush 在干些啥:

喲,這個稍微復雜點了,但是無妨,還是要看看,Line 204 這里只是簡單得將 deque 轉換為 list,然后 Line 207 、208 這里有點意思啦,這里就是發送 Event 了!!!難道我們已經完成任務了?已經發現了如何產生消息了?但是,馬上我們在后面又發現了還有一個 groups 的東西,這里發送消息又不一樣?不管了,先來看看 _publish 干啥:

看一下 _publish 的代碼,感覺沒了意思,又是使用 AMQP,Celery 這是講 MQ 貫徹到底啊!那似乎沒辦法了,這里就算完了,但是,我們的事情卻還沒完,因為這里都是針對的舊的任務,我們希望看到的 worker-online 還沒看到呢,但是 Bootstep 的工作卻是完成了,似乎這里線索就斷了。

但是,同樣細心的同學可能會記得,我們之前曾經說過一個叫做 HeartBootstep,它的職責是干啥來着?如果忘記了,不妨回到第一篇回顧一下,記得的話,我們進代碼看看,哈哈哈

nice,你會發現,這個 Bootstep 是依賴於 Events 的,同時在 Line 29 中給你會發現就用到了我們剛剛初始化的 event_dispatcher,然后就調用 start 了,所以不妨一起看看:

嘿嘿,看到沒,這里就是 worker-online 的發生地了,而且我們還順便捕捉到了 worker-heartbeat 這個 Event,so lucky,但是有個地方我們不明的,那就是這個 _send 干了什么,如果不出意料的話,應該是調用的 dispatcher._publish,走,看看去:

好,並沒有按照我的套路來,調用的居然是 event_dispatchersend,那么它和 _publish 有什么區別呢?不妨一起看一看:

這里和 _publish 的唯一不一樣的地方就是做了緩存處理,就是在 Line 185 這里,如果需要緩存,那么緩存一波,在 Line 192 這里如果緩存滿了,那么就發送唄。有一點值得注意的就是在 Line 198,這里調用的是 publish 而不是 _publish,搞那么多事,那么這里有有什么不一樣?

好唄,從這里看似乎除了對 clock 進行一個操作之外,沒有其他特殊之處,那么這個 clock 又是什么,起到什么作用呢?略微查找就知道了,這又是 Kombu 的東西,然后看解釋就知道了這是一個 Counter,可以用來給 Consumer 判定是否接受這個 Event 用的,所以我們可以先 pass。所以,總得來說,我們可以發現,這里已經對 Event 的產生有了一定了解了,這里可以產生的一個比較明顯的問題點就是:Celery 中 Event 的 send、publish 和 _publish 的區別是啥?

消息的傳遞機制

在跟蹤 Event 的產生的過程中我們已經順便將 Event 的發送給看了,其實還是 Kombu 的 AMQP 在作用,然后通過 Connection 發送到對應的 MQ 中,再后面就被 Consumer 接收,全鏈路就是這樣:

Event<Producer> ------> MQ ---------> Event<Consumer>(Receiver)

前半部分我們已經清楚了,但是后半部分還不清楚,所以我們的重點就是看看后半部分具體是怎么做的。但是后半部分要從何處入手這是個問題,我這里省去了查找的過程,直接說一下入口吧,位置就在 celery/bin/events.py,對於任一一種 Events,我們需要關注的是 run_evtop 這個函數,所以先來看看:

這里很簡短,繼續跟下去看看咯:

這里有點意思了,但是還是可以比較簡單得看到 Line 529 是關鍵所在:

看到這里我們就該偷笑了,看到 while 1 就意味差不多到最后了,哈哈,Line 508 使用的是 read 的 Connection,然后 Line 512 創建了一個 Receiver,在 Line 515 進行 capture,所以我們可以斷定,我們想找的就在這兩句里面了,直接看 Line 515 吧:

這里有點意思的就是又是遇到 Kombu 的鍋:

class kombu.mixins.ConsumerMixin[source]
Convenience mixin for implementing consumer programs.

It can be used outside of threads, with threads, or greenthreads (eventlet/gevent) too.

The basic class would need a connection attribute which must be a Connection instance, and define a get_consumers() method that returns a list of kombu.Consumer instances to use. Supporting multiple consumers is important so that multiple channels can be used for different QoS requirements.

這里其實是有多個 EventReceiver 綁定了這個 Connection,然后 ConsumerMixin 幫助協調這些 Receiver,每個 Receiver 都可以收到這些 Event,但是能不能處理就看他們的 routing_key 設置得好不好了。

Event 的處理機制

看完 Event 的接收機制我們知道了 Event 是以 AMQP 的形式接收的,那么毫無以為,處理邏輯應該在回調機制上回調的,所以關鍵還是在於 Line 512 中的 handlers,我們來看着:

Receiver 中的 process 我們發現了他是這么用 handlers 的,那么沒問題,state.event 才是最后的關鍵,state 中間做了兩層的封裝,到最后就成了 _create_dispatcher,但是同樣得,這個函數也是比較復雜,所以我這里對他進行精簡一下,概括起來是這樣的:

  1. 先找 group 的 handler,有的話就用這個了,否則看下面;這個默認是沒東西的,所以可以先pass
  2. 如果是 worker 的 Event,就執行 worker 對應的處理
  3. 如果是 task 的 Event,就執行 task 的對應處理

OK,這差不多就是 Event 的內容啦,關於 Event,后面有更精彩的應用會說到,不知道用 Celery 的同學平時對這個特性有沒有使用的場景?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM