Celery基本原理探討


本文對Celery進行了研究,由於其實現相對比較復雜沒有足夠的時間和精力對各方各面的源碼進行分析,因此本文根據Celery的使用方法以及實際行為分析其運行原理,並根據查閱相關代碼進行了一定程度的驗證。
希望本文能有助於讀者理解celery是如何工作的,從而能夠更好地使用這個任務框架,而不僅僅是復制官網上的例子來配置。

Celery是Python中任務隊列的事實標准。其特點在於:

  • 啟動后,本身是一個任務分發進程,會啟動若干個worker進程完成任務
  • 需要依賴一個消息隊列來負責任務從客戶端到Celery進程的派發。這樣的好處是,客戶端代碼只需要向MQ中派發任務請求以及參數,Celery進程就可以從MQ中讀取消息並派發給worker,從而達到了客戶端程序與Celery進程解耦的效果。而且Celery進程並不需要監聽任何端口,減少了配置的復雜性。常用的消息隊列實現可以使用RabbitMQ,Redis等等。

下面我們結合Celery的基本使用來分析一下Celery是怎么工作的。本文以Python2為例。

1. 定義Celery配置文件並啟動

首先,我們需要定義我們的Celery進程訪問哪個Redis進程(假設我們使用Redis作為message backend,在celery的術語中叫做broker)。
Celery提供的方式是創建一個celery instance。我們假設文件目錄如下:

lab
    - play
        - __init__.py
        - celery.py
        - tasks.py

然后創建lab/play/celery.py文件:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('play',
    broker='redis://127.0.0.1:6379',
    include=['play.tasks'])

if __name__ == '__main__':
    app.start()

由於可能會有多個celery進程訪問同一個redis,為了讓它們之間隔離開就需要給每個celery實例一個名字,我們這里就叫play
除了name和broker參數以外,還使用了include參數來告訴所有的works到哪里去import tasks的代碼,因為workers才是真正執行所有這些任務的單位。

好了,接下來就可以啟動celery了。在lab目錄下執行:

celery -A play.celery worker -l info

即可啟動celery進程。Python的路徑和模塊系統還是比較復雜的,因此在指定包名的時候要注意。

除了使用celery命令以外,由於我們再celery.py中已經加了if __name__ == '__main__':部分代碼,因此也可以在lab下直接執行:
python -m play.celery -A play.celery worker -l info

在啟動了celery以后,celery進程監聽redis消息,並fork出多個worker進程准備將監聽到的消息分發給它們執行。

2. 編寫任務並執行

現在執行的部分有了,我們開始定義真正需要執行的部分。

我們可以專門寫一個文件來存放任務代碼(也可以直接寫在celery.py里面):

# lab/play/tasks.py
from __future__ import absolute_import, unicode_literals
import time
from celery import Celery

app = Celery('play',
    broker='redis://127.0.0.1:6379')

@app.task
def say_hi():
    print 'hi!'

使用另一個Python進程(也可以使用交互式python或者ipython),在lab下執行:

>>> from play.tasks import say_hi
>>> say_hi.delay()
>>> <AsyncResult: db6737ba-ecee-4fd2-8227-a76c594ba338>
>>>

結果就是say_hi函數向消息隊列中發出了一個調用請求由某個worker執行。Celery進程會輸出:

[2017-09-03 13:49:57,340: INFO/MainProcess] Received task: play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192]  
[2017-09-03 13:49:57,343: WARNING/ForkPoolWorker-1] hi!
[2017-09-03 13:49:57,344: INFO/ForkPoolWorker-1] Task play.tasks.say_hi[85ff01ca-d7c9-4401-bfa3-0a9ad96c7192] succeeded in 0.0016004400095s: None

現在我們來分析一下tasks.py這個文件。很奇怪的一點是,一上來我們又創建了一個app實例。當我們import了task文件后會不會又創建了一個celery進程呢?答案是不會的,因為只有調用了app.start()才會啟動。這只有手動調用或者借助celery命令執行后才會發生。如果只是new了一個instance出來,相當於創建了一個配置文件,不會發生任何重要的實質性的操作。
但是這個app對象也不是什么都不干的。接下來我們定義了兩個task函數,並將這個兩個函數使用@app.task包裝了起來。這樣的效果是把這兩個普通函數包裝成了celery的task對象,這樣他們就有了delay方法。當我們執行delay方法時,這些task會找自己所屬的那個celery instance,從中獲取配置信息(主要是broker的地址)后將調用請求發往消息隊列。

不過,這樣定義task的方法並不是很好,因為需要在代碼中就顯式將task函數和一個具體的celery instance綁定了起來。這就使得我們無法復用這些tasks。因此我們可以使用celery的另一種定義tasks的方式來重寫我們現有的代碼(這也是推薦給django使用的方案):

from __future__ import absolute_import, unicode_literals
import time
from celery import shared_task

@shared_task
def say_hi():
    print 'hi!'

這里我們不再創建app實例,而是直接使用@shared_task來包裝。這樣就沒有綁定哪個app的問題了。但是正如我們之前所說,在調用tasks的時候,task還是會去尋找自己屬於哪個celery instance從而獲取配置信息。如果你都不綁定app instance,配置信息哪里來呢?

答案是,tasks和celery instance之間仍然具有綁定或關聯的關系,只不過不再是顯式的了。簡單來說,每個celery instance被創建以后,它就會被自動的注冊到某個全局的位置。當一個shared task被執行時,這個task就會自己去這個全局的位置找有哪些celery instances可以從中獲取配置信息。如果有多個celery instance都注冊了,那么可能它們的消息隊列都會被這個task發消息(沒有確認過,只是猜測。但這可能就是shared_task的來源)。這就意味着,只要在我們Python進程的任何一個地方(對Django服務器進程也是如此),只要隨便哪個地方創建一個celery instance就可以,然后只要import tasks然后使用delay執行即可。這樣就解決了celery tasks復用的問題。代碼之間的耦合也更小。

更進一步,在我們的python進程中,甚至都不用再手寫一遍celery instance的創建調用。直接import play.celery 就可以了,這個文件雖然被celery進程用作了配置文件,但這不妨礙我們在自己的進程中也用這個文件。不如說這是更好的一種解決方案。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM