一:Django与Celery的使用
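目录结构(注意:代码中使用的项目包名为 proj,例如 Celery('proj')、'proj.settings'、celery -A proj;下文路径中的 pro/ 应与之保持一致,即 proj/proj/…):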
在 pro/pro/__init__.py
from __future__ import absolute_import, unicode_literals

# Import the Celery app here so that it is always loaded when Django starts
# and @shared_task uses this application.
from .celery import app as celery_app

__all__ = ['celery_app']
pro/pro/settings.py 添加以下
# Celery settings. Each assignment must be on its own line: when both share
# one line, the first inline comment swallows the second assignment.
CELERY_BROKER_URL = 'redis://127.0.0.1'  # message broker
CELERY_RESULT_BACKEND = 'redis://127.0.0.1'  # result storage
pro/pro/urls.py
"""proj URL Configuration

The `urlpatterns` list routes URLs to views. For more information please see:
    https://docs.djangoproject.com/en/2.1/topics/http/urls/
Examples:
Function views
    1. Add an import:  from my_app import views
    2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views
    1. Add an import:  from other_app.views import Home
    2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf
    1. Import the include() function: from django.urls import include, path
    2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path

from celeryapp01 import views

urlpatterns = [
    path('admin/', admin.site.urls),
    # Route that triggers the demo Celery task from the browser.
    path('celery_index/', views.celery_index),
]
pro/pro/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') # 设置默认环境变量 app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # 从所有已注册的Django应用配置中加载任务模块。从而使celery 能够自动发现这些任务 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
pro/celeryapp01/views.py # views 调用任务
from .tasks import add, mul, xsum
from django.http import HttpResponse


def celery_index(request):
    """Queue ``add(2, 3)`` as a Celery task and respond with its result."""
    task = add.delay(2, 3)  # send the task to the worker via the broker
    # get() waits for the task result, so this request blocks until the
    # result is available from the result backend.
    result = task.get()
    return HttpResponse(result)
pro/celeryapp01/tasks.py # 任务
from __future__ import absolute_import, unicode_literals

# NOTE(review): TestCase appears unused in this snippet -- confirm before removing.
from django.test import TestCase

# Create your tasks here
from celery import shared_task


@shared_task
def add(x, y):
    """Return the sum of ``x`` and ``y``."""
    return x + y


@shared_task
def mul(x, y):
    """Return the product of ``x`` and ``y``."""
    return x * y


@shared_task
def xsum(numbers):
    """Return the sum of all items in ``numbers``."""
    return sum(numbers)
启动worker和Django:
celery -A proj worker -l debug
python manage.py runserver
在浏览器输入 http://127.0.0.1:8000/celery_index/
二:Django与Celery的定时任务
django-celery-beat 是一个扩展,它将调度计划存储在 Django 数据库中,并提供了一个方便的管理界面,可以在运行时管理周期性任务。
1.安装:
pip3 install django-celery-beat
2.添加到settings
INSTALLED_APPS = ( ..., 'django_celery_beat', )
3.数据库迁移同步
python manage.py migrate
# 创建后台管理账号
python manage.py createsuperuser
4.使用Django调度程序
celery -A proj beat -l info -S django
5.登录 http://127.0.0.1:8000/admin/
定时任务
参数: