django+xadmin+djcelery實現后台管理定時任務


繼上一篇中間表的數據是動態的,圖表展示的數據才比較准確。這里用到一個新的模塊Djcelery,安裝配置步驟如下:

1.安裝

  redis==2.10.6

  celery==3.1.23

  django-celery==3.1.17

  flower==0.9.2

  supervisor==3.3.4

 flower用於監控定時任務,supervisor管理進程,可選

2.配置

settings.py中添加以下幾行:

#最頂頭加上
from __future__ import absolute_import

# celery settings
import djcelery
djcelery.setup_loader()
BROKER_URL = 'redis://localhost:6379'
# BROKER_URL = 'redis://:密碼@主機地址:端口號/數據庫號'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定時任務
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 40
CELERY_TIMEZONE = 'Asia/Shanghai'

INSTALLED_APPS = [
  'djcelery',# 添加djcelery
]

 3.注冊定時任務的幾個表

from __future__ import absolute_import, unicode_literals
from djcelery.models import (
    TaskState, WorkerState,
    PeriodicTask, IntervalSchedule, CrontabSchedule,
)
from xadmin.sites import site
site.register(IntervalSchedule) # 存儲循環任務設置的時間
site.register(CrontabSchedule) # 存儲定時任務設置的時間
site.register(PeriodicTask) # 存儲任務
site.register(TaskState) # 存儲任務執行狀態
site.register(WorkerState) # 存儲執行任務的worker

4.主應用下添加celery.py,__init__.py修改如下:

# __init__.py
from __future__ import absolute_import
from .celery import app as celery_app


# celery.py
from __future__ import absolute_import

import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'hermes.settings')

# hermes主應用名
app = Celery('hermes')
platforms.C_FORCE_ROOT = True

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

5.添加任務 應用下添加tasks.py

from __future__ import absolute_import

from celery import task
import time

from .channels import Cache_data_to_redis

# 更新指定日期數據到sms_organizationcount
@task
def readAndWrite(begin,end):
    begin = str(begin)[:4] + '-' + str(begin)[4:6] + '-' + str(begin)[6:8]
    end = str(end)[:4] + '-' + str(end)[4:6] + '-' + str(end)[6:8]
    i = 0
    begin_time = time.time()
    read = Cache_data_to_redis().connection
    Rcursor = read.cursor()
    query = "SELECT id from sms_organizationcount WHERE alia_date_time between '"
    query += begin
    query += "' and '"
    query += end
    query += "'"
    readSql = "SELECT alia_month_time, alia_date_time, count(*) as total_nums, count(t.`status`=2 or null) as error_nums, name FROM \
              (select *, DATE_FORMAT(req_time,'%Y-%m') as alia_month_time, DATE_FORMAT(req_time,'%Y-%m-%d') as alia_date_time, \
              LEFT(body,LOCATE('】',body)) as name from sms_smslog where LOCATE('】',body) >0 \
              and LEFT(body,1)='【' and DATE_FORMAT(req_time,'%Y-%m-%d') between '"
    readSql += begin
    readSql += "' and '"
    readSql += end
    readSql += "')"
    readSql += " as t GROUP BY alia_date_time , name;"
    Rcursor.execute(readSql)
    readResult = Rcursor.fetchall()
    Rcursor.execute(query)
    query_result = Rcursor.fetchall()
    deleteSql = "delete from sms_organizationcount where alia_date_time between '%s' and '%s'" % (begin,end)
    if query_result:
        delete_record = Cache_data_to_redis().connection
        Dcursor = delete_record.cursor()
        Dcursor.execute(deleteSql)
        delete_record.commit()
        delete_record.close()
       for value in readResult:
        write = Cache_data_to_redis().connection
        Wcursor = write.cursor()
        writeSql = "INSERT into sms_organizationcount (alia_month_time, alia_date_time, total_nums, error_nums, `name`) " \
                   " VALUES ('%s', '%s', '%s', '%s', '%s' )" %\
                   (value['alia_month_time'], value['alia_date_time'], value['total_nums'], value['error_nums'], value['name'])
        try:
            Wcursor.execute(writeSql)
            i += 1
            write.commit()
        except:
            write.rollback()
        write.close()
    read.close()
    end_time = time.time()
    pass_time = end_time - begin_time
    return i, pass_time

6.最終效果如下圖:

7.終端啟動celery命令:

# 查看注冊的task
celery -A hermes inspect registered
# 啟動
python  manage.py celery -A django_celery_demo  worker -B    # django_celery_demo為celery和setting所在文件夾名

#celery_beat起不來
# 動態的輸出啟動進程時的輸出
supervisorctl tail programname stdout


# flower監控celery
python manage.py celery flower
ip:5555

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM