Celery&Flower文檔筆記


1、Celery

# tasks.py
from celery import Celery 
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379/0')  
  
@app.task  
def add(a, b):  
    print(a + b)

創建實例

app是celery的一個實例,第一個參數表示app的名稱,broker申明使用的broker是誰,這里用的是Redis。backend申明后端結果存儲在哪里。
@app.task表示這是app的一個任務。

啟動worker

接着啟動worker。到目錄下執行celery -A tasks worker --loglevel=info。tasks為實例在哪個模塊中,這里在tasks.py中,celery會自己去找實例,你也可以指定tasks.app。-loglevel 指定了日志級別,也可以用-l info表示。在Windows中,celery4以上在這樣執行后續可能會報錯,那么可以使用celery -A tasks worker -pool=solo -l info打開worker。或者在tasks.py中加一句os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
如果要停止worker,使用control + c。

把任務給worker

在啟動celery worker后,可以使用delay函數把任務給worker去異步執行。

from tasks import add

if __name__ == '__main__':  
    print("start add")  
    add.delay(1, 2)  
    print("end")

如果配置了backend,那么能接收delay的返回值result,使用result.ready()判斷是否執行完畢。

配置

celery允許我們進行相關的配置,比如app.conf.task_serializer = 'json'。如果需要同時配置多條,可以使用update進行配置。

目錄結構:
proj/main.py
	/config.py
	/tasks.py
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

對於大型的工程,可以使用專門的配置模塊進行配置,app.config_from_object('celeryconfig'),celeryconfig為配置模塊的名稱,我們可以在同級目錄下創建celeryconfig.py進行配置。

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

全部代碼:

# main.py
from tasks import add  
  
if __name__ == '__main__':  
    print("start add")  
    add.delay(1, 2)  
    print("end")
# tasks.py
from celery import Celery  
import os  
  
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')  
app = Celery('tasks')  
app.config_from_object('config')  
  
@app.task  
def add(a, b):  
    print(a + b)
# config.py
broker_url='redis://localhost:6379'  
result_backend='redis://localhost:6379'

關於PENDING隊列:

Retrieve list of tasks in a queue in Celery
Message Protocol

監控celery

使用events得到事件發生的信息,從而進行監控。
Monitoring and Management Guide

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

2、Flower事件實時監控工具

打開本地監聽:flower --port=5555
或者從Celery運行:celery flower --address=127.0.0.1 --port=5555
如果要開放給外網:celery flower --address=0.0.0.0 --port=5555
可以在后面加上--basic_auth=name:password用來進行簡單的登錄驗證。
打開后進入127.0.0.1:5555進入flower界面。
然后我們使用celery就能在flower頁面看到監控記錄了。

flower提供了HTTP API,可以通過此對worker進行遠程操控。

API Reference
Configuration and defaults


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM