Celery 源碼解析三: Task 對象的實現


序列文章:

 

Task 的實現在 Celery 中你會發現有兩處,一處位於 celery/app/task.py,這是第一個;第二個位於 celery/task/base.py 中,這是第二個。他們之間是有關系的,你可以認為第一個是對外暴露的接口,而第二個是具體的實現!所以,我們由簡入繁,先來看看對外的接口:

其實這就是個我們聲明 Task 的對象,例如我們使用這么一段代碼:

我們可以看看 add 對象是啥:

In [1]: add
Out[1]: <@task: worker.add of tasks:0x10c9b06d0>

你會發現其實他就是我們的一個 Task 對象,所以你就可以觀察一下我們平時使用這個 add 的形式在里面是如何實現的了,例如我們最常使用的可能就兩種方式了,分別是:

In [2]: add.delay()
In [3]: add.apply_async()

其他你看一下源碼就會發現他們的實現是一樣的,就像這樣:

我們現在很清楚,調用 apply_async 是將我們的 Task 提交到 MQ 中,然后獲得一個 celery.result.AsyncResult 對象,那么具體都做了哪些工作,還是需要進一步查看的。apply_async 的參數有很多,所以我們需要先給他歸個類,這樣就好看多了,概括着看,可以分為這么幾類:

  • AMQP 類:connection、queue、exchange、routing_key、producer、publisher、headers
  • MQ 策略類:countdown、eta、expires、retry、retry_policy、priority、
  • 管理類:shadow、serializer、compression、add_to_parent
  • 其他:args、kwargs、link、link_error、

這樣一看就感覺一目了然多了,AMQP 類的我們就不關注了,畢竟都看了這么多了,應該大家都熟悉了。這里的主要關注點還是在 MQ 策略類和管理類上,着重在 MQ 策略類上,因為管理類的功能稍微比較簡單一些。

async 發送消息

apply_async 中,我們可以看到有兩處執行邏輯,第一處是:

這里是直接調用 apply,然后這里的條件 task_always_eager 是什么意思我們還沒見過,可以看一下文檔:

ok,了解,其實就是說這是個同步的接口,那么我們就可以對應着看到下面這處應該是異步的實現咯:

既然如此,我們一個個得來看。

同步發送消息實現

同步執行消息的一層函數比較簡單,只是簡單的構建了一個 tracer,然后就從 tracer 調用中拿到調用結果,我們看上去會比較舒服:

但是,這個 tracer 的內容就復雜啦,但是這個 build_tracer 的構建函數不需要太過關注,所以我們需要關注的是 build_tracer 返回的這個 tracer 函數,但是這個函數的內容很多,為了簡約一下,所以給大家抽象了一番。同步調用過程中,可以分為幾部分功能,分別是:

  • 信號處理:執行前/后/成功這幾個時刻需要釋放一些信號給感興趣的成員
  • 失敗處理:對於沒有執行的情況需要進行細分處理,例如:reject/ignore/retry/exception 等
  • 依賴處理:因為 Celery 支持一些簡單的依賴,所以執行完成之后需要執行被依賴的 tasks
  • 執行邏輯:這個就是正常的函數調用咯
  • 其他:例如統計執行時間,出入棧之類的

我們就看下任務的執行邏輯是怎么樣的,在代碼里面是很簡單的一個函數調用,其實就是看 Task 對象有沒有實現 __call__ 方法,如果沒有就使用 run 方法:

那 task 的 __call__ 實現也不是太復雜,其實最后調用的也是 run 方法,所以到最后都還是 run 方法的責任啦,但是,這里的基類是不實現 run 方法的,所以這個實現就落實到具體的實現類中了,那么你以為 run 方法會在 celery/app/base.py 中實現?我之前也是這么想的,但是,后來我發現不是的,這個實現其實就是我們在代碼里面使用 @app.task 裝飾的函數,其實就是講我們自定義的函數封裝成 run,這樣調用 run 不就執行的我們的函數了嗎?有意思吧,這個封裝的方式我們后面再說,也就是說同步的方式我們就到此吧,也差不多了。

異步發送消息實現

看完同步的我們再來看看異步的,在說異步的之前,我們先思考一下,異步的應該是怎樣?之前看的時候我猜想異步不就是把 Task 對象塞進 MQ 中么,就應該是這么簡單,但是,看完之后發現還是 too young 了,因為從同步中我們就可以看出,還是有很多功課要完成的,不管怎樣,一起來再看一遍。

從前邊我們說有同步和異步兩種形式那里我們可以發現同步和異步的除了功能不一樣之外,還有調用的對象也不一樣,同步的是調用 Task 自己的方法,也就是說消息被 Task 自己消化了;而異步的確實使用的 Celery 對象的方法,也就是說還得依賴於 Celery 這個 Boss 來實現。這是為啥呢?很明顯嘛,Task 自身沒有關於 MQ 的任何消息,而只有一個綁定的 Celery 對象,所以從抽象層面就只能交給 Celery 了,而 Celery 卻包含了所有你需要的信息,是可以完成這個任務的。

所以,異步的消息到了 Celery 是這么被發出去的:

這里出現了一個我們還沒怎么接觸過的 amqp,但是沒關系,隨着等下的了解,我們會認識到它的,這里的幾個關鍵步驟都是通過 amqp 來完成的,所以我們應該着重看看他們

異步消息體的創建

在 Celery 中,異步消息體是通過 create_task_message 來創建的,我們可以發現,這里是傳了一大堆參數進去,但是,無妨,對於這些參數,我們大部分都在前面見過了,不怵,主要還是需要關注一下內部都為消息體做了什么工作:

這里可以發現兩件事情

  1. 消息體的預處理都是在這里完成的,例如檢驗和轉換參數格式
  2. 構建消息就用了四個屬性:headerspropertiesbodysent_event

這里其實就是所有的構建消息體的代碼了,為什么呢,因為 task_message 是一個 nametuple:

異步消息的發送

異步消息的發送這里不是直接就調用的一個函數,而是動態得創建了一個 sender ,然后才調用這個 sender 發送的(沒搞懂為啥,為了擴展?)。而創建 sender 的邏輯倒是比較簡單,所以忽略了,直接來看真正的 send 操作是如何完成的,其實之前提過了,這里真正的 send 操作就像之前我們看同步的執行邏輯一樣尿性,又臭又長,真的,又臭又長,而且作者自己都加注釋承認了,他的理由是為了性能!

同樣得,為了方便我們的理解,我還是采用抽絲剝繭的方式來給大家介紹一下,首先,我習慣性得分個類:

  • MQ 的各項功能:routing_key/exchange/delivery_mode/retry
  • 任務執行的前后處理:發送前/發送后
  • 真正的發送邏輯
  • 其他

其實重頭戲應該在 MQ 的參數確定上,因為只要這些參數都確定了,消息的發送只是一個 producer.publish 就解決的事情,所以我們花些精力來看看 MQ 的參數都是怎么決定出來的:

  • queue_name
    1. 調用 task.delay 的時候傳的,沒傳並且也沒傳 exchange 那就是 default
    2. 不會出現傳了 exchange 但是不傳 queue
  • routing_key
    1. 調用 task.delay 的時候傳的,沒傳就看 exchange 有沒有,沒有就是 queue 的值了
    2. 如果參數傳了 exchange,那么就是配置中的默認 routing_key
  • exchange:
    1. 調用 task.delay 的時候傳的,沒傳但是 exchange_type 類型是 direct,那么就是 ""
    2. 如果類型不是 direct,那么 queue 有 exchange 就用,沒有就使用默認的
  • delivery_mode
    1. 調用 task.delay 的時候傳的,沒傳就看 queue 里面有沒有,有就用
    2. 沒有就使用默認的
  • retry:
    1. 調用 task.delay 的時候傳了就用,沒傳就用默認的

等這些參數確認完之后,就使用這些參數發送了!

然后這樣子就將消息發出去了,等待 Worker 的接收,而 worker 的接受邏輯我們之前已經看到了,其實還是注冊的 Consumer 的 on_message

附加

在前面我們說如何構建異步消息體的時候,對於消息體只是簡單的用幾個 ... 忽略過,但是,對於整體理解來說,我們不應該忽略他們的實質內容,所以在最后我把他們都羅列出來,前后的會用到的。而且你會發現有點意思的是,對於我們的一個異步調用,task 名和 id 都是放在 headers 里頭的,而參數什么的卻是放在 body 里面,在我自己實現的異步 MQ 里面,這些都是放在 body 里面的,這點我倒是不太欣賞 Celery 的。

headers

properties

body

send_event


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM