引言
前面已經學習了celery+redis的異步和定時任務,下面介紹如何結合django來使用。
環境配置
在動手之前,一定要准備好的是環境,celery版本有很多,在使用過程中如何版本與django和redis版本不配套,將會很麻煩。
我這里的環境如下:
celery==4.3.0 Django==2.2.2 django-celery-beat==1.5.0 django-celery-results==1.1.2 kombu==4.6.11 -- celery的依賴 PyMySQL==0.9.3 redis==3.2.1 python-crontab==2.5.1
創建項目
再復習一下創建django項目的命令,打開cmd窗口,輸入:
django-admin startproject 項目名
進入剛剛創建的項目根目錄下,創建應用(app),輸入:
python manage.py startapp 應用名
在應用celerytest根目錄下新建tasks.py文件,用於定義計划任務,注意此處只能以tasks命名(設計如此)
在django的項目目錄(djangocelerydemo)中創建celery.py(與settings.py在同一級目錄)文件,當然你也可以命名成celeryconfig.py文件,
這個文件沒有要求,為啥要創建這個文件呢?
因為,要將Celery與Django項目一起使用,必須首先定義Celery庫的實例,也就是創建celery的應用。文件放在此處,這種設置方法可以讓celery自動在所有app中查找tasks文件,比較適合多人多APP同時開發的中大型項目 詳情參考:Using Celery with Django
項目結構與配置
第一步,在djangocelerydemo/setting.py文件配置如下:
# APP配置 INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'django_celery_beat', 'django_celery_results', # 查看 celery 執行結果 'celerytest.apps.CelerytestConfig', ] # 數據庫配置 DATABASES = { 'default': { 'ENGINE': 'django.db.backends.mysql', 'NAME': 'djangocelerydemo', 'HOST': '127.0.0.1', 'PORT': '3306', 'USER': 'root', 'PASSWORD': '123456', } } # 時區與celery相關配置 LANGUAGE_CODE = 'zh-hans' # 簡體中文界面 TIME_ZONE = 'Asia/Shanghai' # 亞洲/上海時區 USE_I18N = True USE_L10N = True USE_TZ = False # 不使用國際標准時間 # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/2.2/howto/static-files/ STATIC_URL = '/static/' # django-celery-beat # django-celery-results CELERY_ENABLE_UTC = False # 不使用國際標准時間 CELERY_TIMEZONE = 'Asia/Shanghai' # 使用亞洲/上海時區 DJANGO_CELERY_BEAT_TZ_AWARE = False # 解決時區問題 CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # 使用0號數據庫 CELERY_BROKER_TRANSPORT = 'redis' # 使用redis作為中間件 CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 自定義調度類,使用Django的ORM CELERY_RESULT_BACKEND = 'django-db' # 任務結果,使用Django的ORM CELERY_ACCEPT_CONTENT = ['application/json'] # 設置任務接收的序列化類型 CELERY_TASK_SERIALIZER = 'json' # 設置任務序列化方式 CELERY_RESULT_SERIALIZER = 'json' # 設置結果序列化方式
注意,如何你的函數返回的不是json, 將報錯:
kombu.exceptions.EncodeError: Object of type 'set' is not JSON serializable
解決:
CELERY_TASK_SERIALIZER = 'pickle' # 設置任務序列化方式 CELERY_RESULT_SERIALIZER = 'pickle' # 設置結果序列化方式 CELERY_ACCEPT_CONTENT = ['pickle', 'json'] # 設置任務接收的序列化類型
將之前setting中三個替換成這三個即可。原因:celery4版本的 默認使用 JSON 作為 serializer ,而 celery3版本的默認使用 pickle。所以為了讓報錯消除,需要添加以上設置。
第二步,在djangocelerydemo/celeryconfig.py文件配置如下:
from __future__ import absolute_import, unicode_literals import os from celery import Celery, platforms from django.utils.datetime_safe import datetime # 獲取當前文件夾名,即為該 Django 的項目名 project_name = os.path.split(os.path.abspath('.'))[-1] project_settings = '%s.settings' % project_name print(project_settings) # 設置默認celery命令行的環境變量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangocelerydemo.settings') # 實例化 Celery,項目名稱 app = Celery('djangocelerydemo') # 解決時區問題 app.now = datetime.now # 使用 django 的 settings 文件配置 celery app.config_from_object('django.conf:settings', namespace='CELERY') # 從所有應用中加載任務模塊tasks.py app.autodiscover_tasks() # 解決celery不能root用戶啟動的問題 platforms.C_FORCE_ROOT = True
第三步,在djangocelerydemo/__init__.py文件配置如下:
# 引入celery實例對象 from __future__ import absolute_import, unicode_literals from djangocelerydemo.celeryconfig import app as celery_app __all__ = ('celery_app',) import pymysql pymysql.install_as_MySQLdb()
這里不是app下面的__init__.py文件,是項目下面的。
第四步,在celerytest/tasks.py文件配置如下:
# Create your tasks here from __future__ import absolute_import, unicode_literals from djangocelerydemo.celeryconfig import app @app.task def plan_task_1(): print("任務_1已運行!") return {"任務_1:success"} @app.task def plan_task_2(): print("任務_2已運行!") return {"任務_2:success"}
第五步,最后別忘記配置apps.py了,如:
from django.apps import AppConfig class CelerytestConfig(AppConfig): name = 'celerytest'
第六步,數據遷移,因為沒有模型,不需要激活,所以在項目根目錄下直接輸入:
python manage.py migrate
如果看到這7大金剛,證明你前面所有的工作已經正確的完成了。
溫馨提示一下,上面的文件中,凡是用到celery文件的,需在第一行(習慣)加入代碼:
from __future__ import absolute_import, unicode_literals
定時任務配置
上面基本上配置完后,創建管理員賬號,如:
python manage createsuperuser
訪問系統地址:http://127.0.0.1:1234/admin/
名詞解析:
界面中 CELERY RESULTS 為 django_celery_results 創建的用於保存任務結果的數據庫表。
Periodic tasks 下面則是由 django_celery_beat 創建的用於保存 Celery 任務及其執行規則的幾張數據庫表,具體含義如下:
1、Clocked:定義在具體某個時間點觸發的執行規則
2、Crontabs:類似於 Linux 系統下 crontab 的語法,定時任務的執行時間
3、Intervals:定義任務重復執行的時間間隔
4、Periodic tasks:具體某個待執行的任務,需要與其他表(Clocked、Crontabs、Intervals、Solar events)中定義的執行規則相關聯
5、Solar events:根據日升和日落等太陽運行軌跡確定執行規則
配置一個每十秒執行一次的規則,步驟如下:
配置定時計划任務,如圖:
執行定時任務
前面已經講過了異步任務和定時任務的命令,現在再次復習一下:
在項目根目錄下執行異步任務命令:
celery -A pro_name worker -l info
這里還是要注意,win10會報一個這樣的錯誤:
ValueError: not enough values to unpack (expected 3, got 0)
需要在上面命令加一個:
celery -A pro_name worker -l info -P eventlet
pro_name是django項目的名稱
成功后,你會看見兩個任務。
在項目根目錄下執行定時任務命令:
celery -A pro_name beat -l info
每十秒執行一次:
執行的結果:
在web界面上可以查到:
這里需要注意:celery.backend_cleanup。有一個內建的周期性任務將刪除過期的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啟用。這個任務每天上午4點運行。值 None 或者 0 意思是結果永不刪除(取決於后端聲明)
注意事項
這里如果安裝celery或者運行任務調度器出現如下報錯:
ModuleNotFoundError: No module named 'vine.five'
celery安裝的時候,會把amqp、vine和kombu一起安裝完成,所以一定要注意celery版本要和amqp、vine和kombu匹配,不然你將很麻煩。
正常版本是:
# 這四個要匹配 amqp==2.6.1 celery==4.3.0 kombu==4.6.11 vine==1.3.0 # 另外匹配項 redis==3.2.1 Django==2.2.2 django-celery-beat==1.5.0 django-celery-results==1.1.2 PyMySQL==0.9.3
這樣的話,你不會看到上面的報錯信息。
總結
以上就是django+celery+redis實例,celery很強大,需要深入研究。如果對python測試開發相關技術感興趣的伙伴,歡迎加入測試開發學習交流QQ群:696400122,不積跬步,無以至千里。