Celery的實踐指南
celery原理:
celery實際上是實現了一個典型的生產者-消費者模型的消息處理/任務調度統,消費者(worker)和生產者(client)都可以有任意個,他們通過消息系統(broker)來通信。
典型的場景為:
- 客戶端啟動一個進程(生產者),當用戶的某些操作耗時較長或者比較頻繁時,考慮接入本消息系統,發送一個task任務給broker。
- 后台啟動一個worker進程(消費者),當發現broker中保存有某個任務到了該執行的時間,他就會拿過來,根據task類型和參數執行。
實踐中的典型場景:
- 簡單的定時任務:
- 替換crontab的celery寫法:
from celery import Celery
from celery.schedules import crontab
app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")
app.conf.update(CELERYBEAT_SCHEDULE = {
"add": {
"task": "celery_demo.add",
"schedule": crontab(minute="*"),
"args": (16, 16)
},
})
@app.task
def add(x, y):
return x + y
- 運行celery的worker,讓他作為consumer運行,自動從broker上獲得任務並執行。
- `celery -A celery_demo worker`
- 運行celery的client,讓其根據schedule,自動生產出task msg,並發布到broker上。
- `celery -A celery_demo beat`
- 安裝並運行flower,方便監控task的運行狀態
- `celery flower -A celery_demo`
- 或者設置登錄密碼 `
celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
- 替換crontab的celery寫法:
- 多同步任務-鏈式任務-
- 失敗自動重試的task
- 失敗重試方法: 將task代碼函數參數增加self,同時綁定bind。
- demo代碼:
-
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self):
try:
print("doing stuff here...")
except SomeNetworkException as e:
print("maybe do some clenup here....")
self.retry(e)
-
- 自動重試后,是否將任務重新入queue后排隊,還是等待指定的時間?可以通過self.retry()參數來指定。
- 派發到不同Queue隊列的task
- 一個task自動映射到多個queue中的方法, 通過配置task和queue的routing_key命名模式。
- 比如:把queue的exchange和routing_key配置成通用模式:
- 再定義task的routing_key的名稱:
- 可用的不同exchange策略:
- direct:直接根據定義routing_key
- topic:exchange會根據通配符來將一個消息推送到多個queue。
- fanout:將消息拆分,分別推送到不同queue,通常用於超大任務,耗時任務。
- 參考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
- 一個task自動映射到多個queue中的方法, 通過配置task和queue的routing_key命名模式。
- 高級配置
- result是否保存
- 失敗郵件通知:
- 關閉rate limit:
- auto_reload方法(*nix系統):
- celery通過監控源代碼目錄的改動,自動地進行reload
- 使用方法:1.依賴inotify(Linux) 2. kqueue(OS X / BSD)
- 安裝依賴:
$ pip install pyinotify
- (可選) 指定fsNotify的依賴:
$ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
- 啟動: celery -A appname worker --autoreload
- auto-scale方法:
- 啟用auto-scale
- 臨時增加worker進程數量(增加consumer):
$ celery -A proj control add_consumer foo -d worker1.local
- 臨時減少worker進程數量(減少consumer):
- 將scheduled task的配置從app.conf變成DB的方法:
- 需要在啟動時指定custom schedule 類名,比如默認的是: celery.beat.PersistentScheduler 。
-
celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
-
- 需要在啟動時指定custom schedule 類名,比如默認的是: celery.beat.PersistentScheduler 。
- 啟動停止worker的方法:
- 啟動 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
- root用戶可以使用celeryd
- 非特權用戶:celery multi start worker1 -A appName —autoreload --pidfile="$HOME/run/celery/%n.pid" --logfile="$HOME/log/celery/%n.log"
- 或者 celery worker —detach
- 停止
-
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
- 啟動 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
- 與Flask集成的方法
- 集成后flask將充當producer來創建並發送task給broker,在celery啟動的獨立worker進程將從broker中獲得task並執行,同時將結果返回。
- flask中異步地獲得task結果的方法:add.delay(x,y),有時需要對參數進行命名后傳遞 或者 add.apply_async(args=(x,y), countdown=30)
- flask獲得
- 與flask集成后的啟動問題
- 由於celery的默認routing_key是根據生產者在代碼中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在項目頂級目錄上,否者會出現KeyError。
- 性能提升: eventlet 和 greenlet