celery
celery的使用以及在Django中的配置,不詳細介紹,主要記錄在Django中使用的坑點。
坑點
時區問題
celery默認的時區是世界標准時間,比東八區慢了8個小時,如果發布定時任務,一定要注意定時的時間,否則可能用了正確的方法,但是並沒有調用成功
設置celery的時區可以在Django項目的settings.py
中添加一條設置
CELERY_TIMEZONE = 'Asia/Shanghai'
django-celery
可以識別在設置中的時區
也可以在發布定時任務的時候,指定到當前的時區,使用Django自帶的get_current_timezone()
# 將需要設定的時間轉換成當前時區的時間
from django.utils.timezone import get_current_timezone
import datetime
send_time = datetime.datetime.now() + datetime.timedelta(days=1)
tz = get_current_timezone()
send_time = tz.localize(send_time)
在使用異步任務的時候將轉換后的時間傳入到參數里面
celery_task.apply_async(args=[], kwdg={}, eta=send_time)
當然,你也可以使用間隔時間執行異步任務,對應apply_async()
里面的countdown參數
celery_task.apply_async(countdown=seconds)
celery的序列化問題
celery提供了兩個序列化的格式,pickle
和json
,pickle是python一個序列化的庫,可以實現多種格式數據的序列和反序列化,對應pickle和unpickle
設置中可以指定celery接受的數據格式,以及任務和結果的序列化器
# settings.py
# celery允許接收的數據格式,可以是一個字符串,比如'json'
CELERY_ACCEPT_CONTENT = ['pickle', 'json']
# 異步任務的序列化器,也可以是json
CELERY_TASK_SERIALIZER = 'pickle'
# 任務結果的數據格式,也可以是json
CELERY_RESULT_SERIALIZER = 'pickle'
在Django中的使用尤其需要注意,如果你需要向異步任務傳入一個queryset,需要將接收的格式和序列化器設置為'pickle',即如上設置
不建議將ORM對象傳給celery的異步任務,拿到的可能是過期數據,建議傳遞id
結果
如果不需要講異步任務執行的結果進行處理,即異步任務的執行結果和業務邏輯關系不大,建議不存儲celery異步任務的結果。
如果保留結果,celery將會為任務結果建立一個隊列,並且一直等到異步任務給出結果才會將任務從隊列中刪除,創建和管理任務的開銷很大,可以在這篇博客中看到:https://www.cnblogs.com/blaketairan/p/7136897.html
在Django的settings中設置忽略celery任務執行結果
CELERY_IGNORE_RESULT = True
使用不同的queue
如果任務A比任務B更重要,而任務B的量非常大,重要的任務A就需要不斷等待任務B完成后才能繼續進行,這時候,可以使用不同的queue來保存任務,讓不同的worker來執行兩種任務
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
然后自定義router來執行不同的任務
CELERY_ROUTES = {
'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
然后在啟動celery時,指定不同的worker
celery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B