Python—在Django中使用Celery


一.Django中的請求

  Django Web中從一個http請求發起,到獲得響應返回html頁面的流程大致如下:

    http請求發起 

    經過中間件

      http handling(request解析) 

    url mapping(url匹配找到對應的View) 

    在View中進行邏輯(包括調用Model類進行數據庫的增刪改查)

    經過中間件

    返回對應的template/response。

  

  同步請求:所有邏輯處理、數據計算任務在View中處理完畢后返回response。在View處理任務時用戶處於等待狀態,直到頁面返回結果。

  異步請求:View中先返回response,再在后台處理任務。用戶無需等待,可以繼續瀏覽網站。當任務處理完成時,我們可以再告知用戶。

二.Django中使用Celery

安裝

pip3 install django-celery

配置

  首先創建一個django項目,結構如下:

                

 

 

    之后再settings.py的同級目錄添加celeryconfig.py配置文件,更多配置信息可以參考官方文檔。

import djcelery
from datetime import timedelta

djcelery.setup_loader()

# 導入任務
CELERY_IMPORTS = [
    'celeryapp.tasks'
]
# 設置隊列
CELERY_QUEUES = {
    'beat_tasks': {
        'exchange': 'beat_tasks',
        'exchange_type': 'direct',
        'binding_key': 'beat_tasks'
    },
    'work_queue': {
        'exchange': 'work_queue',
        'exchange_type': 'direct',
        'binding_key': 'work_queue'
    }
}
# 設置默認列隊,不符合其他隊列的任務放在默認隊列
CELERY_DEFAULT_QUEUE = 'work_queue'

# 有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True

# 設置並發數量
CELERYD_CONCURRENCY = 4

# 每個worker最多執行100個任務,防止泄露內存
CELERYD_MAX_TASKS_PER_CHILD = 100

# 單個任務最多執行時間
CELERYD_TASK_TIME_LIMIT = 12 * 30

# 設置定時執行
CELERYBAET_SCHEDULE = {
    'task1': {
        'task': 'course-task',
        'schedule': timedelta(seconds=5),
        'options': {
            'queue': 'beat_tasks'
        }
    }
}

CELERY_ACCEPT_CONTENT = ['pickle', 'json', ]

BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://localhost:6379/1'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
celeryconfig.py
from .celeryconfig import *  # 導入Celery配置信息


INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'celeryapp.apps.CeleryappConfig',
    'djcelery'  # 注冊celery
]
settings.py
import time
from celery.task import Task


class Course(Task):
    name = 'course-task'

    def run(self, *args, **kwargs):
        print('start...')
        time.sleep(3)
        print(f'args={args},kwargs={kwargs}')
        print('end task....')
tasks.py
from django.http import JsonResponse
from celeryapp.tasks import Course


def course(request, *args, **kwargs):
    # 執行異步任務
    print("start course...")
    # Course.delay()
    # 可以使用apply_async傳遞參數,指定隊列
    Course.apply_async(args=('hello',), queue='work_queue')
    print("end course...")
    return JsonResponse({'result': 'ok'})
views.py
from django.contrib import admin
from django.urls import path
from celeryapp.views import course

urlpatterns = [
    path('admin/', admin.site.urls),
    path('course/', course),
]
urls.py

啟動redis作為消息中間人

redis-server 

啟動django項目,然后訪問http://localhost:8000/course/,觸發任務

python manage.py runserver

啟動worker

python manage.py celery worker -l info   

    可以看到配置情況,以及任務的執行情況:

            

 

 

      

 

 

      

 

 

 啟動beat

python manage.py celery beat -l info

         

 

 

 三.常見錯誤

♦  AttributeError: ‘str’ object has no attribute ‘items’

解決方法: redis版本過高,降低redis版本即可

pip install redis==2.10.6

 

♦  from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger SyntaxError: invalid syntax

  這個是python3.7目前不支持kombu,降低python版本至3.6即可,可以使用conda進行直接安裝

conda install python=3.6.8

 四.過程監控

  Celery提供了一個工具flower,將各個任務的執行情況、各個worker的健康狀態進行監控並以可視化的方式展現。

安裝監控

pip install flower

執行flower

python manage.py celery flower

本地端口:5555查看監控

刷新course頁面,查看tasks,發現有剛剛執行完成的任務

  查看broker

 

 進入Monitor查看任務執行情況,執行成功,執行失敗,消耗的時間,隊列里面的任務情況

點擊的worker查看具體worker情況 

 可以給flower添加密碼認證,添加之后再訪問則需要輸入用戶名和密碼

python manage.py celery flower --basic_auth=username:password 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM