python測試開發django-160.Celery 定時任務 (beat)


前言

Celery 可以異步執行,也可以通過定時任務觸發

環境准備

這里用redis作為中間件,django使用的版本是v2.1.2
安裝django需要用到的第三方包,注意版本號

pip install celery==3.1.26.post2
pip install django-celery==3.3.1
pip install redis==2.10.6

詳細的基礎教程參考前面的https://www.cnblogs.com/yoyoketang/p/15422804.html
本篇主要講定時任務如何實現,下圖中的Celery beat 定時任務

celery 的5個角色

  • Task 就是任務,有異步任務(Async Task)和定時任務(Celery Beat)
  • Broker 中間人,接收生產者發來的消息即Task,將任務存入隊列。任務的消費者是Worker。Celery 本身不提供隊列服務,推薦用Redis或RabbitMQ實現隊列服務。
  • Worker 執行任務的單元,它實時監控消息隊列,如果有任務就獲取任務並執行它。
  • Beat 定時任務調度器,根據配置定時將任務發送給Broker。
  • Backend 用於存儲任務的執行結果。

Django 中使用 Celery

要在 Django 項目中使用 Celery,您必須首先定義 Celery 庫的一個實例(稱為“應用程序”)

如果你有一個現代的 Django 項目布局,比如:

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

那么推薦的方法是創建一個新的proj/proj/celery.py模塊來定義 Celery 實例:

import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

其中debug_task是測試的任務,可以注掉

# @app.task(bind=True)
# def debug_task(self):
#     print('Request: {0!r}'.format(self.request))

上面一段只需改這句,'proj'是自己django項目的app名稱

app = Celery('proj')

然后你需要在你的proj/proj/__init__.py 模塊中導入這個應用程序。這確保在 Django 啟動時加載應用程序,以便 @shared_task 裝飾器(稍后提到)將使用它:

proj/proj/__init__.py:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)

上面這段固定的,不用改

tasks任務

在app下新建tasks.py,必須要是tasks.py文件名稱,django會自動查找到app下的該文件

from __future__ import absolute_import
from celery import shared_task


@shared_task
def add(x, y):
    print("task----------111111----------")
    return x + y


@shared_task
def mul(x, y):
    print("task----------22222----------")
    return x * y

tasks.py可以寫任務函數add、mul,讓它生效的最直接的方法就是添加app.task 或shared_task 這個裝飾器

添加setting配置

setting.py添加配置

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

#  celery 配置連接redis
BROKER_URL = 'redis://192.168.1.1:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.1.1:6379'


# 配置定時任務
from celery.schedules import crontab
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'yoyo(django app名稱).tasks.add',  # 任務
        'schedule': timedelta(seconds=5),  # 每5秒執行add函數
        'args': (11, 12)  # 運行參數
    },
    'mul': {
        'task': 'yoyo(django app名稱).tasks.mul',  # 任務
        'schedule': timedelta(seconds=10),  # 每10秒執行mul函數
        'args': (11, 2)  # 運行參數
    }
}

CELERYBEAT_SCHEDULE 是配置定時任務,可以添加多個任務,任務名稱可以與tasks中的函數名稱保持一致,也可以自己定義一個任務名稱。

  • task 參數是對應app目錄下的tasks文件中任務函數名稱
  • schedule 運行周期,支持contrab表達式
  • args 運行任務時候帶上的參數

啟動worker 和beat服務

啟動worker,執行任務

celery -A MyDjango(django 項目名稱) worker -l info

運行日志


D:\202107django\MyDjango>celery -A MyDjango worker -l info

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         yoyo:0x219342ff978
- ** ---------- .> transport:   redis://192.168.1.1:6379//
- ** ---------- .> results:     redis://192.168.1.1:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . yoyo.tasks.add
  . yoyo.tasks.mul

[2021-10-21 12:20:32,079: INFO/MainProcess] Connected to redis://192.168.1.1:6379//
[2021-10-21 12:20:32,167: INFO/MainProcess] mingle: searching for neighbors
[2021-10-21 12:20:33,700: INFO/MainProcess] mingle: all alone

啟動beat 定時任務監聽

celery -A MyDjango(django 項目名稱) beat -l info

啟動日志

MyDjango>celery -A MyDjango beat -l info
celery beat v3.1.26.post2 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> redis://192.168.1.1:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2021-10-21 12:22:56,867: INFO/MainProcess] beat: Starting...

啟動完成后,會看到beat運行日志,定時任務已經推過去

worker運行日志,執行任務

crontab 周期任務

前面是設置每多少秒執行任務,這個只是測試下功能,任務很簡單,我們一般用crontab 實現周期性任務,比如每周1-5早上執行一遍任務,用crontab 可以輕松實現

# crontab任務
# 每周一8:30調用task.add
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Executes every Monday morning at 8:30 A.M
    'add': {
        'task': 'yoyo(django app名稱).tasks.add',  # 任務
        'schedule': crontab(hour=8, minute=30, day_of_week=1),
        'args': (11, 12)  # 運行參數
    }
}

crontab定時任務命令規則:

星期 命令 路徑
minute hour day month week command path
  • | * | * | * | * | command | path
  • minute: 表示分鍾,可以是從0到59之間的任何整數。
  • hour:表示小時,可以是從0到23之間的任何整數。
  • day:表示日期,可以是從1到31之間的任何整數。
  • month:表示月份,可以是從1到12之間的任何整數。
  • week:表示星期幾,可以是從0到7之間的任何整數,這里的0或7代表星期日。
  • command:要執行的命令,可以是系統命令,也可以是自己編寫的腳本文件。
  • path:需執行的文件,用絕對路徑

crontab命令常用的特殊字符

符號 說明
  • |表示任何時刻
    , |表示分割
  • |表示一個段,如第二段里: 1-5,就表示1到5點
    /n |表示每個n的單位執行一次,如第二段里,*/1, 就表示每隔1個小時執行一次命令。也可以寫成1-23/1

定時任務如果做成可配置,存到數據庫,可以用到 djcelery 實現


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM