1、Celery
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379/0')
@app.task
def add(a, b):
print(a + b)
創建實例
app是celery的一個實例,第一個參數表示app的名稱,broker申明使用的broker是誰,這里用的是Redis。backend申明后端結果存儲在哪里。
@app.task表示這是app的一個任務。
啟動worker
接着啟動worker。到目錄下執行celery -A tasks worker --loglevel=info
。tasks為實例在哪個模塊中,這里在tasks.py中,celery會自己去找實例,你也可以指定tasks.app。-loglevel 指定了日志級別,也可以用-l info表示。在Windows中,celery4以上在這樣執行后續可能會報錯,那么可以使用celery -A tasks worker -pool=solo -l info
打開worker。或者在tasks.py中加一句os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
。
如果要停止worker,使用control + c。
把任務給worker
在啟動celery worker后,可以使用delay函數把任務給worker去異步執行。
from tasks import add
if __name__ == '__main__':
print("start add")
add.delay(1, 2)
print("end")
如果配置了backend,那么能接收delay的返回值result,使用result.ready()判斷是否執行完畢。
配置
celery允許我們進行相關的配置,比如app.conf.task_serializer = 'json'
。如果需要同時配置多條,可以使用update進行配置。
目錄結構:
proj/main.py
/config.py
/tasks.py
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
對於大型的工程,可以使用專門的配置模塊進行配置,app.config_from_object('celeryconfig')
,celeryconfig為配置模塊的名稱,我們可以在同級目錄下創建celeryconfig.py進行配置。
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
全部代碼:
# main.py
from tasks import add
if __name__ == '__main__':
print("start add")
add.delay(1, 2)
print("end")
# tasks.py
from celery import Celery
import os
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
app = Celery('tasks')
app.config_from_object('config')
@app.task
def add(a, b):
print(a + b)
# config.py
broker_url='redis://localhost:6379'
result_backend='redis://localhost:6379'
關於PENDING隊列:
Retrieve list of tasks in a queue in Celery
Message Protocol
監控celery
使用events得到事件發生的信息,從而進行監控。
Monitoring and Management Guide
from celery import Celery
def my_monitor(app):
state = app.events.State()
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery(broker='amqp://guest@localhost//')
my_monitor(app)
2、Flower事件實時監控工具
打開本地監聽:flower --port=5555
或者從Celery運行:celery flower --address=127.0.0.1 --port=5555
如果要開放給外網:celery flower --address=0.0.0.0 --port=5555
。
可以在后面加上--basic_auth=name:password
用來進行簡單的登錄驗證。
打開后進入127.0.0.1:5555進入flower界面。
然后我們使用celery就能在flower頁面看到監控記錄了。
flower提供了HTTP API,可以通過此對worker進行遠程操控。