說點什么:
整理下工作中配置celery的一些實踐,寫在這里,一方面是備忘,另外一方面是整理成文檔給其他同事使用。
演示用的項目,同時也發布在Github上: https://github.com/blackmatrix7/celery-demo
這份筆記會隨着經驗的積累,逐步調整完善,不過通常情況下,Github上的更新會比較快。
Celery 配置實踐筆記
Celery 配置實踐筆記,目前已記錄:
- 異步執行任務
- 為不同任務分配不同的隊列
- 計划任務
需要補充:
- 為不同的任務配置不同的優先級
- Celey任務的返回結果
創建Celery
配置Celery參數
在創建celery實例之前,需要對celery的參數進行一些配置。
在這里列出一些比較常用的Celery配置項:
| 配置項名稱 | 說明 |
|---|---|
| CELERY_DEFAULT_QUEUE | 默認的隊列名稱,當沒有為task特別指定隊列時,采用此隊列 |
| CELERY_BROKER_URL | 消息代理,用於發布者傳遞消息給消費者,推薦RabbitMQ |
| CELERY_RESULT_BACKEND | 后端,用於存儲任務執行結果,推薦redis |
| CELERY_TASK_SERIALIZER | 任務的序列化方式 |
| CELERY_RESULT_SERIALIZER | 任務執行結果的序列化方式 |
| CELERY_ACCEPT_CONTENT | |
| CELERYD_CONCURRENCY | 任務消費者的並發數 |
| CELERY_TIMEZONE | 時區設置,計划任務需要,推薦 Asia/Shanghai |
| CELERY_QUEUES | Celery隊列設定 |
| CELERY_ROUTES | Celery路由設定,用於給不同的任務指派不同的隊列 |
| CELERYBEAT_SCHEDULE | Celery計划任務設定 |
為了使用方便,在這里引入了之前寫的config模塊(支持本地配置不提交到github),本身是非必須的,用dict存儲celery配置一樣可以。所以在下面的示例代碼中,會以注釋的形式,演示不使用config模塊實現celery配置。
# 等價於直接定義一個broker,如
# broker = 'amqp://user:password@127.0.0.1:5672//'
broker = current_config.CELERY_BROKER_URL
# 將config模塊的配置轉換成dict,等價於直接通過dict定義一組celery配置
celery_conf = {k: v for k, v in current_config.items()}
創建celery實例
將之前創建的broker和celery_conf傳遞給celery,用於創建celery實例。
manage.py
# 創建celery實例
celery = Celery('demo', broker=broker)
# 載入celery配置
celery.config_from_object(celery_conf)
定義模擬函數
在項目的handlers下,定義一些用來異步任務或計划任務的調用的函數。
這里定義了兩個py文件,分別是async_tasks.py和schedules.py,前者用於異步任務,后者用於計划任務。
異步任務下有兩個函數,模擬發送郵件和推送消息;計划任務下有數個根據計划任務執行事件定義的模擬函數。
創建celery worker
推薦以celery.start的方式來啟動celery
啟動參數
在創建完celery實例后,調用celery實例的start方法,來啟動celery。
在demo中,在manage.py中創建celery實例。
celery = Celery('demo', broker=broker)
celery.config_from_object(celery_conf)
# argv中傳入啟動celery所需的參數
celery.start(argv=['celery', 'worker', '-l', 'info', '-f', 'logs/celery.log'])
第一個參數是固定的,用於啟動celery
第二個參數是啟動的celery組件,這里啟動的是worker,用於執行任務
第三個參數和第四個參數為一組,指定日志的級別,這里記錄級別為info的日子
第五個參數和第六個參數為一組,指定日志文件的位置,這里將日志記錄在log/celery.log
指定消費的隊列
在啟動命令中,增加一個-Q的參數,用於指定消費的隊列。
celery.start(argv=['celery', 'worker', '-Q', 'message_queue', '-l', 'info', '-f', 'logs/message.log'])
上述的參數中,'-Q', 'message_queue'兩個參數,是指定這個worker消費名為“message_queue”的隊列。
創建異步任務
對於需要在celery中異步執行的函數,只需要在函數上增加一個裝飾器。
# 導入manage.py中創建的celery實例
from manage import celery
# 需要在celery中異步執行的函數,增加celery.task裝飾器
@celery.task
def async_send_email(send_from, send_to, subject, content):
"""
忽略方法實現
"""
pass
異步執行任務
對於以celery.task裝飾的函數,可以同步運行,也可以異步運行。
接上一個例子的async_send_email函數。
同步執行時,直接調用這個函數即可。
async_push_message(send_to=['張三', '李四'], content='老板喊你來背鍋了')
async_push_message(send_to=['劉五', '趙六'], content='老板喊你來領獎金了')
需要異步執行時,調用函數的delay()方法執行,此時會將任務委托給celery后台的worker執行。
async_push_message.delay(send_to=['張三', '李四'], content='老板喊你來背鍋了')
async_push_message.delay(send_to=['劉五', '趙六'], content='老板喊你來領獎金了')
為任務指定不同隊列
其實在之前配置celery參數時,就已經實現為不同任務指定不同隊列的目標。
在這里詳細說明下配置項:
# 導入Queue
from kombu import Queue
# 導入Task所在的模塊,所有使用celery.task裝飾器裝飾過的函數,所需要把所在的模塊導入
# 我們之前創建的幾個測試用函數,都在handlers.async_tasks和handlers.schedules中
# 所以在這里需要導入這兩個模塊,以str表示模塊的位置,模塊組成tuple后賦值給CELERY_IMPORTS
# 這樣Celery在啟動時,會自動找到這些模塊,並導入模塊內的task
CELERY_IMPORTS = ('handlers.async_tasks', 'handlers.schedules')
# 為Celery設定多個隊列,CELERY_QUEUES是個tuple,每個tuple的元素都是由一個Queue的實例組成
# 創建Queue的實例時,傳入name和routing_key,name即隊列名稱
CELERY_QUEUES = (
Queue(name='email_queue', routing_key='email_router'),
Queue(name='message_queue', routing_key='message_router'),
Queue(name='schedules_queue', routing_key='schedules_router'),
)
# 最后,為不同的task指派不同的隊列
# 將所有的task組成dict,key為task的名稱,即task所在的模塊,及函數名
# 如async_send_email所在的模塊為handlers.async_tasks
# 那么task名稱就是handlers.async_tasks.async_send_email
# 每個task的value值也是為dict,設定需要指派的隊列name,及對應的routing_key
# 這里的name和routing_key需要和CELERY_QUEUES設定的完全一致
CELERY_ROUTES = {
'handlers.async_tasks.async_send_email': {
'queue': 'email_queue',
'routing_key': 'vcan.email_router',
},
'handlers.async_tasks.async_push_message': {
'queue': 'message_queue',
'routing_key': 'message_router',
},
'handlers.schedules.every_30_seconds': {
'queue': 'schedules_queue',
'routing_key': 'schedules_router',
}
}
配置完成后,不同的task會根據CELERY_ROUTES的設置,指派到不同的消息隊列。
計划任務
celery除用於異步執行任務外,還可以用於執行計划任務。
設定celery時區
在開始配置計划任務之前對配置文件中設定celery時區。
# 設定 Celery 時區
CELERY_TIMEZONE = 'Asia/Shanghai'
定義計划任務
在配置文件中定義計划任務
計划任務也是異步執行任務的一種方式,所以也需要參考之前的說明,為計划任務分配不同的隊列。
在配置文件中,新增一項CELERYBEAT_SCHEDULE的配置
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# 給計划任務取一個獨一無二的名字吧
'every-30-seconds': {
# task就是需要執行計划任務的函數
'task': 'handlers.schedules.every_30_seconds',
# 配置計划任務的執行時間,這里是每30秒執行一次
'schedule': timedelta(seconds=30),
# 傳入給計划任務函數的參數
'args': {'value': '2333333'}
}
}
CELERYBEAT_SCHEDULE是一個dict,每個key為計划任務的名稱,value也是dict,包含task、schedule、args。
task即需要執行計划任務的函數,這里是handlers.schedules.every_30_seconds,即handlers的schedules的every_30_seconds函數,如需配置其他的函數,依照此規則定義即可。
args是傳遞給計划任務函數的參數,在這個例子中,即傳遞給every_30_seconds的參數,如果無需參數,則args配置為None。
schedule即配置計划任務的執行時間,例子中使用的是timedelta實例,用於實現固定間隔某些時間執行。除此之外,還可以設定某個時間點執行,或重復某個時間點執行。這個就需要用到celery的crontab類。
'push_occupancy_rates': {
'task': 'handlers.schedules.test_func_b',
# 每天中午12點執行
'schedule': crontab(hour='12', minute='0'),
'args': None
},
關於crontab更詳細的配置方式,可以參考官方手冊:
| Example | Meaning |
|---|---|
| crontab() | Execute every minute. |
| crontab(minute=0, hour=0) | Execute daily at midnight. |
| crontab(minute=0, hour='*/3') | Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
| crontab(minute=0, hour='0,3,6,9,12,15,18,21') | Same as previous. |
| crontab(minute='*/15') | Execute every 15 minutes. |
| crontab(day_of_week='sunday') | Execute every minute (!) at Sundays. |
| crontab(minute='', hour='', day_of_week='sun') | Same as previous. |
| crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
| crontab(minute=0,hour='/2,/3') | Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
| crontab(minute=0, hour='*/5') | Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of “15”, which is divisible by 5). |
| crontab(minute=0,hour='*/3,8-17') | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
| crontab(0, 0,day_of_month='2') | Execute on the second day of every month. |
| crontab(0, 0, day_of_month='2-30/3') | Execute on every even numbered day. |
| crontab(0, 0, day_of_month='1-7,15-21') | Execute on the first and third weeks of the month. |
| crontab(0, 0,day_of_month='11', month_of_year='5') | Execute on the eleventh of May every year. |
| crontab(0, 0, month_of_year='*/3') | Execute on the first month of every quarter. |
