基於 Django 2.0.4 的 djcelery 配置


Django Celery 配置實踐

所需環境

python 3.5.2

rabbitmq

安裝所需的包

pip install -r requirements.txt

QuickStart

創建Django項目

創建一個名為proj的Django項目

django-admin startproject proj

創建Django App

創建一個用於演示的django app,這里名為demo

django-admin startapp demo

在創建的app中,增加tasks.py文件,用於編寫celery任務

基礎配置項目

修改proj/settings.py配置文件,增加celery相關配置。

增加djcelery app

修改settings.py中INSTALLED_APPS,增加djcelery及app

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'djcelery',
    'demo'
]

celery相關的參數配置

如果僅僅需求使用celery異步執行任務的話,以下最基礎的配置就可以滿足需求

# 導入tasks文件,因為我們使用autodiscover_tasks
# 會自動導入每個app下的tasks.py,所以這個配置不是很必要
# 如果需要導入其他非tasks.py的模塊,則需要再此配置需要導入的模塊
# CELERY_IMPORTS = ('demo.tasks', )
# 配置 celery broker
CELERY_BROKER_URL = 'amqp://user:password@127.0.0.1:5672//'
# 配置 celery backend 用Redis會比較好
# 因為手上沒有redis服務器,所以演示時用RabbitMQ替代
CELERY_RESULT_BACKEND = 'amqp://user:password@127.0.0.1:5672//'

創建Celery實例

在proj目錄下,編輯celery.py文件,用於創建celery實例

from celery import Celery
from django.conf import settings

# 創建celery應用
celery_app = Celery('proj', broker=settings.CELERY_BROKER_URL)
# 從配置文件中加載除celery外的其他配置
celery_app.config_from_object('django.conf:settings')
# 自動檢索每個app下的tasks.py
celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

編寫異步任務

在之前創建的demo/tasks.py中,編寫一個用於演示的異步任務。

注意每個異步任務之前都需要使用@celery_app.task裝飾器。

celery_app實際是之前在proj/celery.py中創建的celery的實例,如果你的實例名稱不一樣,做對應的修改即可。

import logging
from proj.celery import celery_app

@celery_app.task
def async_task():
    logging.info('run async_task')

調用異步任務

在demo/views.py中定義一個頁面,只用來調用異步任務。

from django.http import HttpResponse
from demo.tasks import async_demo_task

# Create your views here.
def demo_task(request):
    # delay表示將任務交給celery執行
    async_demo_task.delay()
    return HttpResponse('任務已經運行')

在proj/urls.py中注冊對應的url。

from django.contrib import admin
from django.urls import path
from demo.views import demo_task

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
]

啟動Celery Worker

使用命令啟動worker:

manage.py celery -A proj worker -l info

對參數做個簡單的說明:

-A proj是指項目目錄下的celery實例。演示項目名為proj,所以-A的值是proj。如果項目名是其他名字,將proj換成項目對應的名字。

-l info 是指日志記錄的級別,這里記錄的是info級別的日志。

如果配置沒有問題,能成功連接broker,則會有類似以下的日志:

 -------------- celery@Matrix.local v3.1.26.post2 (Cipater)
---- **** ----- 
--- * ***  * -- Darwin-17.5.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x108ab1eb8
- ** ---------- .> transport:   amqp://user:**@127.0.0.1:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . demo.tasks.async_demo_task

[2018-04-24 08:24:47,656: INFO/MainProcess] Connected to amqp://user:**@127.0.0.1:5672//

需要注意的是日志中的tasks部分,可以看到已經自動識別到了demo.tasks.async_demo_task這個用於演示的任務。

如果沒有識別到,檢查下celery實例是否調用autodiscover_tasks方法,或配置文件的CELERY_IMPORTS是否配置正確。

調用異步任務

在demo/views.py中定義一個頁面,只用來調用異步任務。

from django.http import HttpResponse
from demo.tasks import async_demo_task

# Create your views here.
def demo_task(request):
    # delay表示將任務交給celery執行
    async_demo_task.delay()
    return HttpResponse('任務已經運行')

在proj/urls.py中注冊對應的url。

from django.contrib import admin
from django.urls import path
from demo.views import demo_task

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
]

最后,啟動django,訪問url http://127.0.0.1:8000/async_demo_task 調用異步任務。

在worker的日志中,可以看到類似的執行結果,即說明任務已經由celery異步執行。

如果出現"Using settings.DEBUG leads to a memory leak, never "的警告信息,則在生產環境中關閉掉django的debug模式即可。

[2018-04-24 09:25:52,677: INFO/MainProcess] Received task: demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391]
[2018-04-24 09:25:52,681: INFO/Worker-4] run async_task
[2018-04-24 09:25:52,899: INFO/MainProcess] Task demo.tasks.async_demo_task[1105c262-9371-4791-abd2-6f78d654b391] succeeded in 0.21868160199665s: None

為任務分配隊列

請參考這里celery-demo

配置計划任務

同樣請參這里celery-demo

使用Django Admin管理Celery計划任務

使用djcelery,而不直接使用celery的好處就在於可以通過Django Admin對Celery的計划任務進行管理。

啟動進程

使用計划任務時,除了保證原先的worker正常運行外(worker的啟動方式見上),還需要啟動beats:

python manage.py celery beat

也可以beat和worker一起啟動

python manage.py celery -A project worker -l info --beat

創建數據庫

python manage.py migrate

創建Django Admin和djcelery對應的表,這里的數據庫使用默認的sqlite。

創建管理員

python manage.py createsuperuser,依次輸入超級管理員帳號、郵箱、密碼。

演示項目中設置帳號:admin 密碼: superplayer123

修改配置文件

在settings.py中,增加兩項配置:

# 設定時區,配置計划任務時需要
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

創建計划任務

訪問 http://127.0.0.1:8000/admin/djcelery/periodictask/add/,用於創建定時任務。

簡單的解釋下創建定時任務的選項:

字段 說明
名稱 便於理解的計划任務名稱
Task (registered) 選擇一個已注冊的任務
Task (custom)  
Enabled 任務是否啟用
Interval 按某個時間間隔執行
Crontab 定時任務, 和Interval二選一
Arguments 以list的形式傳入參數,json格式
Keyword arguments: 以dict的形式傳入參數,json格式
Expires 任務到期時間
Queue 指定隊列,隊列名需要在配置文件的 CELERY_QUEUES定義好
Exchange Exchange
Routing key Routing key

通過Model操作計划任務

本質上來說,就是對PeriodicTask這個model的操作。

下面模擬一個簡單的增加計划任務的接口:

def add_task(request):
    interval = IntervalSchedule.objects.filter(every=30, period='seconds').first()
    periodic_task = PeriodicTask(name='test', task='demo.tasks.async_demo_task', interval=interval)
    periodic_task.save()
    return HttpResponse('任務已經添加')

在proj/urls.py中增加url地址進行訪問: 

urlpatterns = [
    path('admin/', admin.site.urls),
    path('async_demo_task', demo_task),
    path('add_task', add_task),
    path('get_periodic_task_list', get_periodic_task_list),
]

通過瀏覽器訪問http://127.0.0.1:8000/add_task 就可以直接添加一個間隔30秒的計划任務了。

然后在beat中可以看到類似日志,檢測到了Schedule改變,並且自動運行剛剛添加的任務。

[2018-05-03 17:18:10,012: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2018-05-03 17:18:10,013: INFO/MainProcess] Writing entries (0)...
[2018-05-03 17:18:40,020: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)
[2018-05-03 17:19:10,021: INFO/MainProcess] Scheduler: Sending due task test (demo.tasks.async_demo_task)

同樣的,通過獲取PeriodicTask的數據,也可以得到正在運行的任務。

def get_periodic_task_list(request):
    """
    獲取周期性任務列表
    :return:
    """
    periodic_task_list = PeriodicTask.objects.all()
    data = [model_to_dict(periodic_task) for periodic_task in periodic_task_list]
    resp = json.dumps(data, cls=CustomJSONEncoder, ensure_ascii=False)
    return HttpResponse(resp, content_type='application/json', status=200)

更多的功能都可以通過操作djcelery的model進行實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM