參考:http://blog.csdn.net/Ricky110/article/details/77205291
環境:
centos7 + python3.6.1 + django2.0.1 + celery4.1.0 + redis3.2.10
yum install -y redis
pip3 install redis,celery,django
開始:
創建django工程my_report
創建app celery_test, 如下所示 :

-
INSTALLED_APPS中注冊app_celery
-
setting中celery配置
-
# Celery settings CELERY_BROKER_URL = 'redis://localhost:6379' #: Only add pickle to this list if your broker is secured CELERY_ACCEPT_CONTENT = ['json'] CELERY_RESULT_BACKEND = 'redis://localhost:6379' CELERY_TASK_SERIALIZER = 'json' CELERY_ENABLE_UTC = True CELERY_TIMEZONE = 'Asia/Shanghai'
- setting中mail配置
-
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' EMAIL_HOST = "smtp.mail.haoyisheng.com" EMAIL_HOST_PASSWORD = '******' DEFAULT_FROM_EMAIL = EMAIL_HOST_USER = "lijianwei@mail.haoyisheng.com" EMAIL_PORT = 25 EMAIL_USE_TLS = True
- app所在目錄添加tasks.py文件(必須是該文件名), 用於處理任務
from __future__ import absolute_import, unicode_literals from celery import shared_task from django.core.mail import send_mail import logging logger = logging.getLogger(__name__) @shared_task def celery_send_email(subject, message, from_email, recipient_list, **kwrags): try: # 使用celery並發處理郵件發送的任務 logger.info("\n開始發送郵件") send_mail(subject, message, from_email, recipient_list, **kwrags) logger.info("郵件發送成功") return 'success!' except Exception as e: logger.error("郵件發送失敗: {}".format(e))
- 配置目錄my_report中添加celery.py文件
-
from __future__ import absolute_import, unicode_literals import os from celery import Celery # 為celery程序設置DJANGO_SETTINGS_MODULE環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_report.settings') app = Celery('celery_test') # 從Django的設置文件中導入CELERY設置 app.config_from_object('django.conf:settings', namespace='CELERY') # 從所有已注冊的app中加載任務模塊 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
- 配置目錄my_report的__init__.py文件中添加如下
-
from __future__ import absolute_import, unicode_literals # 這將保證celery app總能在django應用啟動時啟動 from .celery import app as celery_app __all__ = ['celery_app']
編寫url映射和視圖
from django.contrib import admin
from django.urls import path
from month_report import views
urlpatterns = [
path('admin/', admin.site.urls),
path('send_email/', views.add_task_to_celery, name='send_email'),
]
編寫views:
from django.http import HttpResponse from celery_test.tasks import celery_send_email
def add_task_to_celery(request):
celery_send_email.delay(u'郵件主題', 'test_mail_message', 'lijianwei@mail.haoyisheng.com',
['lijianwei@mail.haoyisheng.com'])
return HttpResponse('hello world')
- 在manage.py同級目錄執行如下命令, 啟動celery的worker進程(主要用於消費或執行任務)
celery -A my_report worker --loglevel=info
執行成功:

- 從客戶端請求
- http://ip:port/send_email/
- 然后會收到頁面返回hello world, 並且終端顯示時間處理結果為成功

然后驗證郵箱有收到郵件,成功
溫馨提示
- 當前使用方法,如需要在tasks.py中新添加任務,新增后,則需要重啟django, 並且需要從起celery worker進程, worker進程默認不能動態加載事件。
