設置時區
CELERY_TIMEZONE = 'Asia/Shanghai'
啟動時區設置
CELERY_ENABLE_UTC = True
限制任務的執行頻率
下面這個就是限制tasks模塊下的add函數,每秒鍾只能執行10次
CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}
或者限制所有的任務的刷新頻率
CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}
也可以設置如果任務執行失敗后調用的函數
def my_on_failure(self,exc,task_id,args,kwargs,einfo): print('task failed') CELERY_ANNOTATIONS = {'*':{'on_failure':my_on_failure}}
並發的worker數量,也是命令行-c指定的數目
事實上並不是worker數量越多越好,保證任務不堆積,加上一些新增任務的預留就可以了
CELERYD_CONCURRENCY = 20
celery worker每次去redis取任務的數量,默認值就是4
CELERYD_PREFETCH_MULTIPLIER = 4
每個worker執行了多少次任務后就會死掉,建議數量大一些
CELERYD_MAX_TASKS_PER_CHILD = 200
使用redis作為任務隊列
組成: db+scheme://user:password@host:port/dbname
BROKER_URL = 'redis://127.0.0.1:6379/0'
celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 1200
單個任務的運行時間限制,否則會被殺死
CELERYD_TASK_TIME_LIMIT = 60
使用redis存儲任務執行結果,默認不使用
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
將任務結果使用'pickle'序列化成'json'格式
任務序列化方式
CELERY_TASK_SERIALIZER = 'pickle'
任務執行結果序列化方式
CELERY_RESULT_SERIALIZER = 'json'
也可以直接在Celery對象中設置序列化方式
app = Celery('tasks', broker='...', task_serializer='yaml')
關閉限速
CELERY_DISABLE_RATE_LIMITS = True
一份比較常用的配置文件:
在celery4.x以后,就是BROKER_URL,如果是以前,需要寫成CELERY_BROKER_URL
BROKER_URL = 'redis://127.0.0.1:6379/0'
指定結果的接收地址
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack'
指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ['msgpack']
任務過期時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60
任務發送完成是否需要確認,對性能會稍有影響
CELERY_ACKS_LATE = True
壓縮方案選擇,可以是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib'
規定完成任務的時間
在5s內完成任務,否則執行該任務的worker將被殺死,任務移交給父進程
CELERYD_TASK_TIME_LIMIT = 5
celery worker的並發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4
celery worker 每次去BROKER中預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4
每個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40
設置默認的隊列名稱,如果一個消息不符合其他的隊列就會放在默認隊列里面,如果什么都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default"
隊列的詳細設置
CELERY_QUEUES = {
"default": { # 這是上面指定的默認隊列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列 "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設置扇形交換機 "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", },
或者配置成下面兩種方式:
# 配置隊列(settings.py) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_collect', Exchange('for_task_collect'), routing_key='for_task_collect'), Queue('for_task_compute', Exchange('for_task_compute'), routing_key='for_task_compute'), )
# 路由(哪個任務放入哪個隊列) CELERY_ROUTES = { 'umonitor.tasks.multiple_thread_metric_collector': { 'queue': 'for_task_collect', 'routing_key': 'for_task_collect' }, 'compute.tasks.multiple_thread_metric_aggregate': { 'queue': 'for_task_compute', 'routing_key': 'for_task_compute' }, 'compute.tasks.test': { 'queue': 'for_task_compute', 'routing_key': 'for_task_compute' }, }