django 異步任務實現及Celery beat實現定時/輪詢任務


Celery定時任務

requirements

celery==3.1.25  異步任務
django-celery==3.2.2  定時任務管理包
redis==2.10.6
django-redis-cache==1.7.1 方便配置Redis緩存

 

配置

1、工程主APP下的__init__.py文件里添加:

from .celery import app as celery_app

__all__ = ['celery_app']

2、工程主APP新建個celery.py文件:

from __future__ import absolute_import
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dts.settings')

from django.conf import settings  # noqa

app = Celery('dts')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

  

 

啟用Celery的定時任務需要設置CELERYBEAT_SCHEDULE 。 
這里寫圖片描述 

說明:

CELERY_RESULT_SERIALIZER = 'json'為了避免錯誤:# Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)
CELERY_RESULT_BACKEND: 就是把異步任務放到指定地方,方便后續操作。比如后續取出任務判斷是否完成

Celery的定時任務都由celery beat來進行調度。celery beat默認按照settings.py之中的時區時間來調度定時任務。

創建定時任務

一種創建定時任務的方式是配置CELERYBEAT_SCHEDULE:

#每30秒調用task.add
from datetime import timedelta
from celery.schedules import crontab #為了避免該行報錯,在該文件添加絕對包含、from __future__ import absolute_import
CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) },
    'sys_check_unpaid': {
'task': 'dashboard.tasks.mdata_schedule',
'schedule': crontab(minute=0, hour=8, day_of_month=11),
},
}
 1 #crontab任務
 2 #每周一7:30調用task.add
 3 from celery.schedules import crontab
 4 
 5 CELERYBEAT_SCHEDULE = {
 6     # Executes every Monday morning at 7:30 A.M
 7     'add-every-monday-morning': {
 8         'task': 'tasks.add',
 9         'schedule': crontab(hour=7, minute=30, day_of_week=1),
10         'args': (16, 16),
11     },
12 }

 

使用數據庫存儲定時任務

使用數據庫存儲定時任務需要設置CELERYBEAT_SCHEDULE如下:

這里寫圖片描述

 1 import datetime
 2 import json
 3 from djcelery import models as celery_models
 4 from django.utils import timezone
 5 #創建任務
 6 def create_task(name, task, task_args, crontab_time):
 7     '''
 8     name # 任務名字
 9     task # 執行的任務 "myapp.tasks.add"
10     task_args # 任務參數 {"x":1, "Y":1}
11 
12     crontab_time # 定時任務時間 格式:
13     {
14         'month_of_year': 9 # 月份
15         'day_of_month': 5 # 日期
16         'hour': 01 # 小時
17         'minute':05 # 分鍾
18     }
19     '''
20 
21     # task任務, created是否定時創建
22     task, created = celery_models.PeriodicTask.objects.
23                             get_or_create(name=name,task=task)
24     # 獲取 crontab
25     crontab = celery_models.CrontabSchedule.objects.
26                             filter(**crontab_time).first()
27     if crontab is None:
28     # 如果沒有就創建,有的話就繼續復用之前的crontab
29         crontab = celery_models.CrontabSchedule.objects.
30                             create(**crontab_time)
31     task.crontab = crontab # 設置crontab
32     task.enabled = True # 開啟task
33     task.kwargs = json.dumps(task_args) # 傳入task參數
34     expiration = timezone.now() + datetime.timedelta(day=1)
35     task.expires = expiration # 設置任務過期時間為現在時間的一天以后
36     task.save()
37     return True
38 #關閉任務
39 def disable_task(name):
40 '''
41 關閉任務
42 '''
43     try:
44         task = celery_models.PeriodicTask.objects.get(name=name)
45         task.enabled = False # 設置關閉
46         task.save()
47         return True
48     except celery_models.PeriodicTask.DoesNotExist:
49         return True

 

啟動beat

執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重復的任務被發送出去,所以Celery beat僅能有一個。

啟動:

python manage.py celery beat --loglevel=info

其實還有一種簡單的啟動方式worker和beat一起啟動:

python manage.py celery worker --loglevel=info --beat

定時刪除

由於很多任務都是一次執行完就不需要,留在數據庫里就是垃圾數據了有沒有辦法清除。方法肯定有因為django-celery本身就有定時任務功能我們加個任務就解決了。好我們看代碼:在django app目錄中打開taske.py加入如下代碼

from djcelery import models as celery_models
from django.utils import timezone
@task()
def delete():
    '''
    刪除任務
    從models中過濾出過期時間小於現在的時間然后刪除
    '''
    return celery_models.PeriodicTask.objects.filter(
                            expires__lt=timezone.now()).delete()

 

創建任務腳本里設置了 expires 1天以后過期,這樣在filter的時候就能當做條件把過期的任務找到並且刪除。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM