celery beat是用來開啟定時任務調度的,一般用法為:啟動celery beat,然后啟動worker,讓beat去調用worker里面的任務
一般我們在代碼里面通過model層的插入直接就可以新建定時任務
schedule,created = IntervalSchedule.objects.get_or_create( every = 10, period = IntervalSchedule.SECONDS ) PeriodicTask.objects.create( interval = schedule, name = random.random(), task = "adv_celery.tasks.task1.tasks.test", #添加參數 args = json.dumps(["hello "]), kwargs = json.dumps({}), #expires = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) )
由於我們使用的數據庫插入模式,記得配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
然后我們在adv_celery.tasks.task1.tasks這個文件里面建立一個超級簡單的test任務,對應它的路徑
task = "adv_celery.tasks.task1.tasks.test",
這個任務這樣就可以
@shared_task def test(arg): print("312")
然后開啟celerybeat,
celery -A 你的應用 beat
再開啟worker
celery -A 你的應用 worker
為了防止爆發
MySQL backend does not support timezone-ae datetimes when USE_TZ is False.
問題,在django的setting.py設置
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
在任務中獲取異步任務id
@task
def do_job(path): cache.set(do_job.request.id, operation_results)
在執行異步任務時即拿到任務
task = task_addnums.delay()
task_id = task.id
根據id獲取任務結果
from celery.result import AsyncResult res=AsyncResult("62051878-ca77-4895-a61f-6f9525681347") # 參數為task id res.result
啟動以后
redis打印的日志
[6772] 28 Oct 09:37:08.306 * Background saving terminated with success [6772] 28 Oct 09:42:09.064 * 100 changes in 300 seconds. Saving... [6772] 28 Oct 09:42:09.065 * Background saving started by pid 12368