引言
前面已經學習了celery+redis的異步和定時任務,下面介紹如何結合django來使用。
環境配置
在動手之前,一定要准備好的是環境,celery版本有很多,在使用過程中如何版本與django和redis版本不配套,將會很麻煩。
我這里的環境如下:
celery==4.3.0 Django==2.2.2 django-celery-beat==1.5.0 django-celery-results==1.1.2 kombu==4.6.11 -- celery的依賴 PyMySQL==0.9.3 redis==3.2.1 python-crontab==2.5.1
創建項目
再復習一下創建django項目的命令,打開cmd窗口,輸入:
django-admin startproject 項目名
進入剛剛創建的項目根目錄下,創建應用(app),輸入:
python manage.py startapp 應用名

在應用celerytest根目錄下新建tasks.py文件,用於定義計划任務,注意此處只能以tasks命名(設計如此)
在django的項目目錄(djangocelerydemo)中創建celery.py(與settings.py在同一級目錄)文件,當然你也可以命名成celeryconfig.py文件,
這個文件沒有要求,為啥要創建這個文件呢?
因為,要將Celery與Django項目一起使用,必須首先定義Celery庫的實例,也就是創建celery的應用。文件放在此處,這種設置方法可以讓celery自動在所有app中查找tasks文件,比較適合多人多APP同時開發的中大型項目 詳情參考:Using Celery with Django
項目結構與配置

第一步,在djangocelerydemo/setting.py文件配置如下:
# APP配置
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_beat',
'django_celery_results', # 查看 celery 執行結果
'celerytest.apps.CelerytestConfig',
]
# 數據庫配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'djangocelerydemo',
'HOST': '127.0.0.1',
'PORT': '3306',
'USER': 'root',
'PASSWORD': '123456',
}
}
# 時區與celery相關配置
LANGUAGE_CODE = 'zh-hans'
# 簡體中文界面
TIME_ZONE = 'Asia/Shanghai'
# 亞洲/上海時區
USE_I18N = True
USE_L10N = True
USE_TZ = False
# 不使用國際標准時間
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/2.2/howto/static-files/
STATIC_URL = '/static/'
# django-celery-beat
# django-celery-results
CELERY_ENABLE_UTC = False
# 不使用國際標准時間
CELERY_TIMEZONE = 'Asia/Shanghai'
# 使用亞洲/上海時區
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 解決時區問題
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 使用0號數據庫
CELERY_BROKER_TRANSPORT = 'redis'
# 使用redis作為中間件
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 自定義調度類,使用Django的ORM
CELERY_RESULT_BACKEND = 'django-db'
# 任務結果,使用Django的ORM
CELERY_ACCEPT_CONTENT = ['application/json']
# 設置任務接收的序列化類型
CELERY_TASK_SERIALIZER = 'json'
# 設置任務序列化方式
CELERY_RESULT_SERIALIZER = 'json'
# 設置結果序列化方式
注意,如何你的函數返回的不是json, 將報錯:
kombu.exceptions.EncodeError: Object of type 'set' is not JSON serializable
解決:
CELERY_TASK_SERIALIZER = 'pickle' # 設置任務序列化方式 CELERY_RESULT_SERIALIZER = 'pickle' # 設置結果序列化方式 CELERY_ACCEPT_CONTENT = ['pickle', 'json'] # 設置任務接收的序列化類型
將之前setting中三個替換成這三個即可。原因:celery4版本的 默認使用 JSON 作為 serializer ,而 celery3版本的默認使用 pickle。所以為了讓報錯消除,需要添加以上設置。
第二步,在djangocelerydemo/celeryconfig.py文件配置如下:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms
from django.utils.datetime_safe import datetime
# 獲取當前文件夾名,即為該 Django 的項目名
project_name = os.path.split(os.path.abspath('.'))[-1]
project_settings = '%s.settings' % project_name
print(project_settings)
# 設置默認celery命令行的環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangocelerydemo.settings')
# 實例化 Celery,項目名稱
app = Celery('djangocelerydemo')
# 解決時區問題
app.now = datetime.now
# 使用 django 的 settings 文件配置 celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 從所有應用中加載任務模塊tasks.py
app.autodiscover_tasks()
# 解決celery不能root用戶啟動的問題
platforms.C_FORCE_ROOT = True
第三步,在djangocelerydemo/__init__.py文件配置如下:
# 引入celery實例對象
from __future__ import absolute_import, unicode_literals
from djangocelerydemo.celeryconfig import app as celery_app
__all__ = ('celery_app',)
import pymysql
pymysql.install_as_MySQLdb()
這里不是app下面的__init__.py文件,是項目下面的。
第四步,在celerytest/tasks.py文件配置如下:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from djangocelerydemo.celeryconfig import app
@app.task
def plan_task_1():
print("任務_1已運行!")
return {"任務_1:success"}
@app.task
def plan_task_2():
print("任務_2已運行!")
return {"任務_2:success"}
第五步,最后別忘記配置apps.py了,如:
from django.apps import AppConfig
class CelerytestConfig(AppConfig):
name = 'celerytest'
第六步,數據遷移,因為沒有模型,不需要激活,所以在項目根目錄下直接輸入:
python manage.py migrate

如果看到這7大金剛,證明你前面所有的工作已經正確的完成了。
溫馨提示一下,上面的文件中,凡是用到celery文件的,需在第一行(習慣)加入代碼:
from __future__ import absolute_import, unicode_literals
定時任務配置
上面基本上配置完后,創建管理員賬號,如:
python manage createsuperuser
訪問系統地址:http://127.0.0.1:1234/admin/

名詞解析:
界面中 CELERY RESULTS 為 django_celery_results 創建的用於保存任務結果的數據庫表。
Periodic tasks 下面則是由 django_celery_beat 創建的用於保存 Celery 任務及其執行規則的幾張數據庫表,具體含義如下:
1、Clocked:定義在具體某個時間點觸發的執行規則
2、Crontabs:類似於 Linux 系統下 crontab 的語法,定時任務的執行時間
3、Intervals:定義任務重復執行的時間間隔
4、Periodic tasks:具體某個待執行的任務,需要與其他表(Clocked、Crontabs、Intervals、Solar events)中定義的執行規則相關聯
5、Solar events:根據日升和日落等太陽運行軌跡確定執行規則
配置一個每十秒執行一次的規則,步驟如下:

配置定時計划任務,如圖:

執行定時任務
前面已經講過了異步任務和定時任務的命令,現在再次復習一下:
在項目根目錄下執行異步任務命令:
celery -A pro_name worker -l info
這里還是要注意,win10會報一個這樣的錯誤:
ValueError: not enough values to unpack (expected 3, got 0)
需要在上面命令加一個:
celery -A pro_name worker -l info -P eventlet
pro_name是django項目的名稱

成功后,你會看見兩個任務。
在項目根目錄下執行定時任務命令:
celery -A pro_name beat -l info

每十秒執行一次:

執行的結果:

在web界面上可以查到:

這里需要注意:celery.backend_cleanup。有一個內建的周期性任務將刪除過期的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啟用。這個任務每天上午4點運行。值 None 或者 0 意思是結果永不刪除(取決於后端聲明)
注意事項
這里如果安裝celery或者運行任務調度器出現如下報錯:
ModuleNotFoundError: No module named 'vine.five'

celery安裝的時候,會把amqp、vine和kombu一起安裝完成,所以一定要注意celery版本要和amqp、vine和kombu匹配,不然你將很麻煩。
正常版本是:
# 這四個要匹配 amqp==2.6.1 celery==4.3.0 kombu==4.6.11 vine==1.3.0 # 另外匹配項 redis==3.2.1 Django==2.2.2 django-celery-beat==1.5.0 django-celery-results==1.1.2 PyMySQL==0.9.3
這樣的話,你不會看到上面的報錯信息。
總結
以上就是django+celery+redis實例,celery很強大,需要深入研究。如果對python測試開發相關技術感興趣的伙伴,歡迎加入測試開發學習交流QQ群:696400122,不積跬步,無以至千里。
