為什么要使用celery
Celery是一個使用Python開發的分布式任務調度模塊,因此對於大量使用Python構建的系統,可以說是無縫銜接,使用起來很方便。Celery專注於實時處理任務,同時也支持任務的定時調度。因此適合實時異步任務定時任務等調度場景。Celery需要依靠RabbitMQ等作為消息代理,同時也支持Redis甚至是Mysql,Mongo等,當然,官方默認推薦的是RabbitMQ。
broker的選擇
雖然官方支持的broker有很多,包括RabbitMQ,Redis甚至是數據庫,但是不推薦使用數據庫,因為數據庫需要不斷訪問磁盤,當你的任務量大了之后會造成很嚴重的性能問題,同時你的應用很可能也在使用同一個數據庫,這樣可能導致你的應用被拖垮。如果業務環境比較簡單可以選擇Redis,如果比較復雜選擇RabbitMQ,因為RabbitMQ是官方推薦的,但是比Redis操作起來又相對復雜些。我的選擇是broker用RabbitMQ,backend用Redis
celery不能用root用戶啟動問題 C_FORCE_ROOT environment
如果使用root用戶啟動celery會遇到下面的問題
Running a worker with superuser privileges when the worker accepts messages serialized with pickle is a very bad idea! If you really want to continue then you have to set the C_FORCE_ROOT environment variable (but please think about this before you do).
解決辦法:
from celery import Celery, platforms platforms.C_FORCE_ROOT = True #加上這一行
任務重復執行
celery執行定時任務的時候遇到了重復執行的問題,當時是用redis做broker和backend。
官方文檔中有相關描述。
If a task is not acknowledged within the Visibility Timeout the task will
be redelivered to another worker and executed.
This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.
So you have to increase the visibility timeout to match the time of the longest ETA you are planning to use.
Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.
Periodic tasks will not be affected by the visibility timeout, as this is a concept separate from ETA/countdown.
You can increase this timeout by configuring a transport option with the same name:
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
The value must be an int describing the number of seconds.
就是說當我們設置一個ETA時間比visibility_timeout長的任務時,每過一次 visibility_timeout 時間,celery就會認為這個任務沒被worker執行成功,重新分配給其它worker再執行。
解決辦法就是把 visibility_timeout參數調大,比我們ETA的時間差要大。celery本身的定位就主要是實時的異步隊列,對於這種長時間定時執行,支持不太好。
但是第二天依然重復執行了。。。
最后我的解決方法是在每次定時任務執行完就在redis中寫入一個唯一的key對應一個時間戳,當下次任務執行前去獲取redis中的這個key對應的value值,和當前的時間做比較,當滿足我們的定時頻率要求時才執行,這樣保證了同一個任務在規定的時間內只會執行一次。
使用不同的queue
當你有很多任務需要執行的時候,不要偷懶只使用默認的queue,這樣會相互影響,並且拖慢任務執行的,導致重要的任務不能被快速的執行。雞蛋不能放在同一個籃子里的道理大家都懂。
有一種簡單的方式設置queue
Automatic routing
The simplest way to do routing is to use the CELERY_CREATE_MISSING_QUEUES setting (on by default).
With this setting on, a named queue that is not already defined in CELERY_QUEUES will be created automatically. This makes it easy to perform simple routing tasks.
Say you have two servers, x, and y that handles regular tasks, and one server z, that only handles feed related tasks. You can use this configuration:
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
With this route enabled import feed tasks will be routed to the “feeds” queue, while all other tasks will be routed to the default queue (named “celery” for historical reasons).
Now you can start server z to only process the feeds queue like this:
user@z:/$ celery -A proj worker -Q feeds
You can specify as many queues as you want, so you can make this server process the default queue as well:
user@z:/$ celery -A proj worker -Q feeds,celery
直接使用
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}} user@z:/$ celery -A proj worker -Q feeds,celery
指定routes,就會自動生成對應的queue,然后使用-Q指定queue啟動celery就可以,默認的queue名字是celery。可以看官方文檔對默認queue的名字進行修改。
啟動多個workers執行不同的任務
在同一台機器上,對於優先級不同的任務最好啟動不同的worker去執行,比如把實時任務和定時任務分開,把執行頻率高的任務和執行頻率低的任務分開,這樣有利於保證高優先級的任務可以得到更多的系統資源,同時高頻率的實時任務日志比較多也會影響實時任務的日志查看,分開就可以記錄到不同的日志文件,方便查看。
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h
可以像這樣啟動不同的worker,%h可以指定hostname,詳細說明可以查看官方文檔
高優先級的任務可以分配更多的concurrency,但是並不是worker並法數越多越好,保證任務不堆積就好。
是否需要關注任務執行狀態
這個要視具體的業務場景來看,如果對結果不關心,或者任務的執行本身會對數據產生影響,通過對數據的判斷可以知道執行的結果那就不需要返回celery任務的退出狀態,可以設置
CELERY_IGNORE_RESULT = True
或者
@app.task(ignore_result=True) def mytask(…): something()
但是,如果業務需要根據任務執行的狀態進行響應的處理就不要這樣設置。
內存泄漏
長時間運行Celery有可能發生內存泄露,可以像下面這樣設置
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每個worker執行了多少任務就會死掉