Celery 基本使用


1. 認識 Celery

Celery 是一個 基於 Python 開發的分布式異步消息任務隊列,可以實現任務異步處理,制定定時任務等。

  • 異步消息隊列:執行異步任務時,會返回一個任務 ID 給你,過一段時間后拿着任務 ID 去取執行結果
  • 定時任務:類似於 Windows / Linux 上的定時任務,到點執行任務

Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用 rabbitMQRedis(默認采用 RabbitMQ)

優點:

  • 簡單易用
  • 高可用:即使執行失敗或執行過程發生中斷,也會嘗試再次執行
  • 快速:一個單進程的 Celery 每分鍾可以執行上百萬個任務
  • 拓展性強:Celery 的各個組件都可以拓展和自定制

Celery 構成

Celery 主要模塊:

  • 任務模塊 Task:異步和定時任務
  • 消息中間件 Broker:即任務調度隊列,接收生產者發來的任務,將任務存入隊列。Celery 本身不提供隊列服務,官方推薦 RabbitMQ 或 Redis 等
  • 任務執行單元 Worker:處理任務,實時監控消息隊列,獲取隊列中調度的任務,並執行它。
  • 結果存儲 Backend:存儲任務執行結果,以便查詢,與中間件一樣,也可以使用 RabbitMQ、Redis 或 MongoDB 存儲

2. 異步任務

實現異步任務步驟:

  • 創建一個 Celery 實例
  • 啟動 Celery Worker
  • 應用程序調用異步任務

1、安裝

pip3 install 'celery[redis]'
pip3 install celery

2、創建 Celery 實例

C1/tasks.py

# -*- coding: utf-8 -*-
 
import time
from celery import Celery
 
broker = 'redis://127.0.0.1:6379'		# 消息中間件
backend = 'redis://127.0.0.1:6379/0'	# backend ,存儲結果
 
app = Celery('my_task', broker=broker, backend=backend)		# 創建實例
 
# 創建一個任務,5s 后執行
@app.task(name='tasks.add')
def add(x, y):
    time.sleep(5) 	# 模擬耗時操作
    return x + y

3、啟動 Celery Worker

打開 Ubuntu 終端,輸入:celery worker -A C1.tasks --loglevel=info,看到如下圖就表示啟動成功了:

參數:

  • A:指定實例所在位置
  • --loglevel:指定日志級別,有:warning、debug、info、error、fatal ,默認 warning

4、調用任務

另起一個終端,進入 Python 環境,執行任務:

# Celery 提供兩種方法來調用任務,delay() 或 apply_async() 方法
python3
>>> from tasks import add
>>> add.delay(6, 8)		# 調用任務,並返回一個任務 ID
<AsyncResult: 194e99af-d0bd-481b-a500-433ec19117e4>

判斷任務是否完成:

>>> result = add.delay(6, 8)
>>> result.ready()			# True 表示已完成
True

獲取任務結果:

>>> result.get()
14

踩坑:在調用任務時出現Received unregistered task of type 'tasks.add'.

  • 原因:Celery 沒有找到讀取到任務
  • 解決辦法:在裝飾器出加上 name='tasks.add'

參考博客:Received unregistered task of type ‘XXX’ Celery報錯

3. 項目中使用 celery

celery 還可以配置成一個應用,放置在項目中使用,其結構為:

Tips:

  • 項目應該是個包文件
  • 必須命名為 celery.py,否則報錯 AttributeError:module 'proj' has no attribute 'celery'

1、proj/celery.py

from __future__ import absolute_import, unicode_literals		# 將相對路徑轉換為絕對路徑
from celery import Celery
# 創建一個Celery的實例
app = Celery('tasks',
             # redis://:password@hostname:port/db_number  有密碼認證的連接
             broker='redis://127.0.0.1:6379',
             # broker='redis://:密碼@192.168.2.105:6379/0',
             backend='redis://127.0.0.1:6379/0',  # 用於Celery的返回結果的接收
             include=['proj.tasks']       # 用於聲明Celery要執行的tasks任務的位置
             )
# 配置結果超時時間
app.conf.update(
    result_expires=3600,   # Celery結果存在中間件Redis的超時時間[僅針對當前的Celery的App]
)
if __name__ == '__main__':
    app.start()

2、proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app  # 從我的Celery中導入App
import time

@app.task(name='tasks.add')		# 需要配置 name='tasks.add',否則報 Received unregistered task of type 'app.tasks.add'.
def add(x, y):
    time.sleep(10)
    return x + y


@app.task(name='tasks.mul')
def mul(x, y):
    time.sleep(10)
    return x * y

3、啟動 worker,分為前台和后台啟動(無需關心起行為):

# 前台
celery -A proj worker -l info

運行結果如下:

4、調用任務:

# 在這里使用終端調用,也可以再項目中調用
>>> from proj.tasks import add, mul

>>> result1 = add.delay(5, 8)
>>> result2 = mul.delay(5, 8)
>>> result1.get()		# 取值
13
>>> result2.get()
40

worker 放在后台繼續運行,我們可以繼續做別的事情:

# w1:worker
celery multi start w1 -A proj -l info		# 啟動 worker
celery multi restart w1 -A proj -l info		# 重啟
celery multi stop w1 -A proj -l info		# 關閉
ps -ef | grep celery						# 查看目前還有幾個 worker 正在運行


參考文章

4. 定時任務

celery 通過 celery beat 模塊即可實現定時任務功能。

4.1 小試牛刀

1、新建一個 c1\task1.py,編輯如下:

from celery import Celery
from celery.schedules import crontab
 
app = Celery()
 
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每過 10 s,執行一次 hello
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
 
    # 每過 30 s,執行一次 world
    sender.add_periodic_task(30.0, test.s('world'), expires=10)
 
    # 每周一七點三十執行一次 Happy Mondays!
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )
 
@app.task
def test(arg):
    print(arg)

也可以配置成下面這樣,或許更好理解:

# 可以配置多個
app.conf.beat_schedule = {
    'add-every-30-seconds': {			# 任務名字
        'task': 'tasks.add',			# 執行 tasks 中的 add 函數
        'schedule': 30.0,				# 時間,也可以用 timedelta(seconds=20),
        'args': (16, 16)				# 參數
    },
}
app.conf.timezone = 'UTC'				# 時區

2、啟動 beat 進程,監控是否有任務:

hj@hj:~/桌面/c1$ celery -A task1 beat

3、啟動 worker 執行任務:

hj@hj:~/桌面/c1$ celery -A task1 worker

從上圖中可以看到,每過 10s,就會輸出一個 hello,每過 30s 輸出一個 world,當然這只是幾個比較簡單的任務示例。

beat 需要將任務的最后運行時間存儲在本地數據庫文件中(默認名稱為 celerybeat-schedule),因此需要訪問當前目錄中的寫入,或者您可以為此文件指定自定義位置:

# beat 運行時,會產生幾個文件
hj@hj:~/桌面/c1$ ls
celerybeat.pid  celerybeat-schedule  __pycache__  task1.py

# 指定文件位置
celery -A task1 beat -s /home/celery/var/run/celerybeat-schedule

4.2 使用 crontab 構建復雜定時任務

如果你只是想每過多少秒輸出一個 hello 的話,那么上面的功能就能實現。但是若你想每周一的早上七點半定時發送一封郵件或提醒做什么事的話,那么就只能使用 crontab 才能實現(與 Linux 自帶的 crontab功能是一樣的)。

from celery.schedules import crontab
from datetime import timedelta


app.conf.beat_schedule = {
     # 任務一
    'sum-task':{				# 任務名
        'task':'tasks.add',		# 執行 tasks.py 中的 add 函數
        'schedule':timedelta(seconds=20),		# 時間
        'args':(5, 6)			# 參數
    },
    # 任務二
    'multi-task': {
        'task': 'tasks.multi',
        'schedule': crontab(hour=4, minute=30, day_of_week=1),
        'args': (3, 4)
    }
}

更多關於 crontab

示例 說明
crontab() 每分鍾執行一次
crontab(minute=0, hour=0) 每天午夜執行
crontab(minute=0, hour='*/3') 每三個小時執行一次
crontab(minute=0,``hour='0,3,6,9,12,15,18,21') 與上面相同
crontab(minute='*/15') 每 15min執行一次
crontab(day_of_week='sunday') 周日每分鍾執行一次
crontab(minute='*',``hour='*',``day_of_week='sun') 與上面相同
crontab(minute='*/10',``hour='3,17,22',``day_of_week='thu,fri') 每周四或周五凌晨3-4點,下午5-6點和晚上10-11點
crontab(minute=0,hour='*/2,*/3') 每過一個小時執行一次, 以下時間除外: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') 執行小時可被5整除,比如下午三點(十五點)觸發
crontab(minute=0, hour='*/3,8-17') 執行時間能被 2整除,在辦公時間 8-17點,每小時執行一次
crontab(0, 0,day_of_month='2') 每個月第二天執行
crontab(0, 0,``day_of_month='2-30/3') 每個偶數日執行
crontab(0, 0,``day_of_month='1-7,15-21') 在本月的第一周和第三周執行
crontab(0, 0,day_of_month='11',``month_of_year='5') 每年5月11日執行
crontab(0, 0,``month_of_year='*/3') 每個季度第一個月執行

參考文章

5. Django 中使用 Celery

5.1 構建簡單的異步任務

- project/			# 項目主目錄
  - app/			# app
    	- urls.py
        - views.py
        - tasks.py	# celery 任務,名字必須是 tasks.py
  - project/			# 項目文件
		- __init__.py
  		- settings.py
  		- urls.py
        - celery.py		# 創建 Celery 實例,加載 redis 配置文件
  - manage.py

在 Django 中使用 Celery ,依賴 django_celery_beat,因此先要安裝它:

pip3 install django_celery_beat

並將其添加到 settings.py 中:

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app',
    'django_celery_beat',
]

...
# redis  連接
CELERY_BROKER_URL = 'redis://127.0.0.1:6397'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6397/0'

1、project/celery

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, platforms

# 使用 Django 環境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Project.settings')

app = Celery('celery_task')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# 運行 root 用戶運行 celery
platforms.C_FORCE_ROOT = True

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2、project/__init__.py

from __future__ import absolute_import, unicode_literals

# 確保導入應用,Django 啟動就能使用 app 

from .celery import app as celery_app

__all__ = ['celery_app']

3、創建任務 app/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time 

@shared_task
def add(x, y):
    time.sleep(10)
    return x + y

@shared_task
def multi(x, y):
    time.sleep(10)
    return x * y

tasks.py 必須在各個 app 根目錄下,且只能叫 tasks.py

4、視圖中調用任務 views.py

  • ready():判斷任務是否執行完畢
  • get(timeout=1):獲取結果
  • traceback():獲取原始回溯信息
from django.shortcuts import render, HttpResponse
from celery.result import AsyncResult

def celery_test(request):
    # 調用任務
    task = add.delay(4,22)

    return HttpResponse(task.id)	# 獲取任務 id

def celery_res(request):
    # 獲取任務結果
    task_id = 'b3fbe0da-57bb-4055-aea2-160afd6ae801'
    res = AsyncResult(id=task_id)
    return HttpResponse(res.get())		# 獲取結果

5、路由配置 app/urls.py

path('celery_test/', views.celery_test, name='celery_test'),
path('celery_result/', views.celery_result, name='celery_result'),

6、打開終端啟動 worker

celery -A project worker -l info

訪問 127.0.0.1:8000/app/celery_test 調用執行任務:

訪問 127.0.0.1:8000/app/celery_result 查看任務結果:

因為這是異步處理的,所有再執行任務時,其他代碼照樣執行。

5.2 在 Django 中使用定時任務

在 Django 也能設置定時任務,依賴於 django_celery_beatcrontab

1、在 project/celery.py 添加定時任務:

from celery.schedules import crontab
from datetime import timedelta


app.conf.update(
        CELERYBEAT_SCHEDULE = {
            # 任務一
            'sum-task':{
                'task':'app.tasks.add',
                'schedule':timedelta(seconds=20),
                'args':(5, 6)
                },
            # 任務二
            'multi-task': {
                'task': 'app.tasks.multi',
                'schedule': crontab(hour=4, minute=30, day_of_week=1),
                'args': (3, 4)
                }
            }
        )

在上面添加了兩個定時任務 sum-taskmulti-task

  • sum-task :每過 20 s執行一次 add() 函數
  • multi-task:每周一的早上四點三十分執行一次 multi() 函數

啟動 celery beat ,celery 啟動一個 beat 進程不斷檢查是否有任務要執行:

celery -A project beat -l info

timedelta

timedelta 是datetime 的一個對象,需要引入 from datatime import timedelta,參數如下:

  • days:天
  • seconds:秒
  • microseconds:微秒
  • milliseconds:毫秒
  • minutes:分鍾
  • hours:小時

crontab

  • month_of_year:月份
  • day_of_month:日期
  • day_of_week:周
  • hour:小時
  • minute:分鍾

總結

  • 同時啟動異步任務和定時任務:celery -A project worker -b -l info
  • 使用 RabbitMQ,配置:broker='amqp://admin:admin@localhost'
  • Celery 長時間運行避免內存泄露,添加配置:CELERY_MAX_TASKS_PER_CHILD = 10


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM