一:Django與Celery的使用
目錄結構:

在 pro/pro/__init__.py
from __future__ import absolute_import, unicode_literals # 確保應用程序總是能被找到 # Django啟動,shared_task將使用這個應用程序。 from .celery import app as celery_app __all__ = ['celery_app']
pro/pro/settings.py 添加以下
CELERY_BROKER_URL = 'redis://127.0.0.1' # 任務中間人 CELERY_RESULT_BACKEND = 'redis://127.0.0.1' # 結果存儲
pro/pro/urls.py
"""proj URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/2.1/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: path('', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.urls import include, path 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.contrib import admin from django.urls import path from celeryapp01 import views urlpatterns = [ path('admin/', admin.site.urls), path('celery_index/', views.celery_index), ]
pro/pro/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') # 設置默認環境變量 app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # 從所有已注冊的Django應用配置中加載任務模塊。從而使celery 能夠自動發現這些任務 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
pro/celeryapp01/views.py # views 調用任務
from .tasks import add,mul,xsum from django.http import HttpResponse def celery_index(request): task = add.delay(2,3) # 調用 result = task.get() # 獲取結果 return HttpResponse(result)
pro/celeryapp01/tasks.py # 任務
from __future__ import absolute_import, unicode_literals from django.test import TestCase # Create your tasks here from celery import shared_task @shared_task # 引用當前任務 def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
啟動worker和Django:
celery -A proj worker -l debug
python manage.py runserver
在瀏覽器輸入 http://127.0.0.1:8000/celery_index
二:Django與Celery的定時任務
有一個 Django-celery-beat 擴展,它將調度存儲在Django數據庫中,並提供了一個方便的管理界面來在運行時管理周期性任務
1.安裝:
pip3 install django-celery-beat
2.添加到settings
INSTALLED_APPS = ( ..., 'django_celery_beat', )
3.數據庫遷移同步
python manage.py migrate
# 創建后台管理賬號
python manage.py createsuperuser
4.使用Django調度程序
celery -A proj beat -l info -S django
5.登陸 127.0.0.1:8000/admin

定時任務

參數:


