一.Django中的請求
Django Web中從一個http請求發起,到獲得響應返回html頁面的流程大致如下:
http請求發起
經過中間件
http handling(request解析)
url mapping(url匹配找到對應的View)
在View中進行邏輯(包括調用Model類進行數據庫的增刪改查)
經過中間件
返回對應的template/response。
同步請求:所有邏輯處理、數據計算任務在View中處理完畢后返回response。在View處理任務時用戶處於等待狀態,直到頁面返回結果。
異步請求:View中先返回response,再在后台處理任務。用戶無需等待,可以繼續瀏覽網站。當任務處理完成時,我們可以再告知用戶。
二.Django中使用Celery
安裝
pip3 install django-celery
配置
首先創建一個django項目,結構如下:
之后再settings.py的同級目錄添加celeryconfig.py配置文件,更多配置信息可以參考官方文檔。

import djcelery from datetime import timedelta djcelery.setup_loader() # 導入任務 CELERY_IMPORTS = [ 'celeryapp.tasks' ] # 設置隊列 CELERY_QUEUES = { 'beat_tasks': { 'exchange': 'beat_tasks', 'exchange_type': 'direct', 'binding_key': 'beat_tasks' }, 'work_queue': { 'exchange': 'work_queue', 'exchange_type': 'direct', 'binding_key': 'work_queue' } } # 設置默認列隊,不符合其他隊列的任務放在默認隊列 CELERY_DEFAULT_QUEUE = 'work_queue' # 有些情況下可以防止死鎖 CELERYD_FORCE_EXECV = True # 設置並發數量 CELERYD_CONCURRENCY = 4 # 每個worker最多執行100個任務,防止泄露內存 CELERYD_MAX_TASKS_PER_CHILD = 100 # 單個任務最多執行時間 CELERYD_TASK_TIME_LIMIT = 12 * 30 # 設置定時執行 CELERYBAET_SCHEDULE = { 'task1': { 'task': 'course-task', 'schedule': timedelta(seconds=5), 'options': { 'queue': 'beat_tasks' } } } CELERY_ACCEPT_CONTENT = ['pickle', 'json', ] BROKER_BACKEND = 'redis' BROKER_URL = 'redis://localhost:6379/1' CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

from .celeryconfig import * # 導入Celery配置信息 INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'celeryapp.apps.CeleryappConfig', 'djcelery' # 注冊celery ]

import time from celery.task import Task class Course(Task): name = 'course-task' def run(self, *args, **kwargs): print('start...') time.sleep(3) print(f'args={args},kwargs={kwargs}') print('end task....')

from django.http import JsonResponse from celeryapp.tasks import Course def course(request, *args, **kwargs): # 執行異步任務 print("start course...") # Course.delay() # 可以使用apply_async傳遞參數,指定隊列 Course.apply_async(args=('hello',), queue='work_queue') print("end course...") return JsonResponse({'result': 'ok'})

from django.contrib import admin from django.urls import path from celeryapp.views import course urlpatterns = [ path('admin/', admin.site.urls), path('course/', course), ]
啟動redis作為消息中間人
啟動django項目,然后訪問http://localhost:8000/course/,觸發任務
python manage.py runserver
啟動worker
python manage.py celery worker -l info
可以看到配置情況,以及任務的執行情況:
啟動beat
python manage.py celery beat -l info
三.常見錯誤
♦ AttributeError: ‘str’ object has no attribute ‘items’
解決方法: redis版本過高,降低redis版本即可
pip install redis==2.10.6
♦ from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger SyntaxError: invalid syntax
這個是python3.7目前不支持kombu,降低python版本至3.6即可,可以使用conda進行直接安裝
conda install python=3.6.8
四.過程監控
Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康狀態進行監控並以可視化的方式展現。
安裝監控
pip install flower
執行flower
python manage.py celery flower
本地端口:5555查看監控
刷新course頁面,查看tasks,發現有剛剛執行完成的任務
查看broker
進入Monitor查看任務執行情況,執行成功,執行失敗,消耗的時間,隊列里面的任務情況
點擊的worker查看具體worker情況
可以給flower添加密碼認證,添加之后再訪問則需要輸入用戶名和密碼
python manage.py celery flower --basic_auth=username:password