celery是什么?
我的理解比較簡單,它是一個「任務隊列」,我主要拿他來做兩件事情:
1.處理異步任務
2.處理定時任務
一個簡單任務
安裝相應的pip包
pip install celery[redis]
准備項目文件
項目文件結構如下:
.
├── caller.py
├── tasks.py
tasks.py
中存放任務函數:
import time
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
time.sleep(30)
print(f'add end, res:{x+y}')
return x + y
caller.py
中存放調用函數:
from tasks import add
for i in range(0, 20):
add.delay(4, 4)
運行celery
在當前目錄下運行一個worker:
celery -A tasks worker --loglevel=info
如何查看任務狀態
一些原理
此處使用redis做celery的消息隊列(broker),函數觸發一次任務調用時,發送消息給redis,woker從redis中獲取任務進行處理,如果任務過多,worker處理不過來,worker不會領取任務,任務會先存放在redis中。
默認運行的worker啟動的進程數等於cpu核數(引),例如我這里是4。也就是說此處的celery一次只能處理4個任務,如果一次有多個任務發送過來,有的任務就需要排隊。此處將add
函數的處理時間模擬為30s。
celery有一個「prefetch」的動作。例如上面的例子,雖然這個worker一次只能處理4個任務,但是這個worker除了接收它能處理的4個任務立即去執行,還要再接收一些任務准備運行,它打算再接收多少任務預備着呢?這取決於一個配置參數:worker_prefetch_multiplier
,這個參數默認是4(引),那么他會多接收的數量為:worker_prefetch_multiplier * 並發進程數
,放在此處就是:4 * 4 = 16
。也就是說,如果redis中有很多等待處理的任務,其實worker運行起來會一次拿走4 + 16 = 20
個任務。
測試
查看有多少任務在消息隊列中
celery在redis中的存放任務的隊列key默認名稱是celery
(引),這個key只有當redis中有積壓任務時才會存在,如果它不存在就代表當前消息隊列中無消息。
運行caller.py
,然后登陸redis查詢:
127.0.0.1:6379> llen celery
(integer) 0
可以看到,盡管celery一次只能處理4個任務,但它把20個任務全領走了。由於我們的任務要30s才能處理完成,立即再運行一次caller.py
,然后redis查詢:
127.0.0.1:6379> llen celery
(integer) 20
可以看到,有20個任務還在消息隊列中等待處理。
查看有多少任務正在運行
celery -A tasks inspect active
查看有多少任務接收了但還未運行
這種任務在celery中叫做reserved task
。
celery -A tasks inspect reserved
# 統計個數(數量為下面的結果-1)
celery -A tasks inspect reserved | wc -l
查看worker狀態
運行:
$ celery -A tasks status
celery@itscs-MacBook-Pro.local: OK
1 node online.
可以看到,提示有一個worker(node)是在線的(online)。
使用flower在線查看
flower可以實時監控celery的狀態,並且還能修改一些配置(生產環境慎用)。(引)
pip install flower
celery -A tasks flower
瀏覽器打開http://localhost:5555
即可看到一個現實celery狀態的網頁。