參考資料:
Celery 官網:http://www.celeryproject.org/
Celery 官方文檔英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文檔中文版:http://docs.jinkan.org/docs/celery/
Celery簡介
除Celery是一個異步任務的調度工具。 Celery 是 Distributed Task Queue,分布式任務隊列,分布式決定了可以有多個 worker 的存在,隊列表示其是異步操作,即存在一個產生任務提出需求的工頭,和一群等着被分配工作的碼農。
Broker
在 Python 中定義 Celery 的時候,我們要引入 Broker(消息中間件),中文翻譯過來就是“中間人”的意思,在這里 Broker 起到一個中間人的角色。在工頭提出任務的時候,把所有的任務放到 Broker 里面,在 Broker 的另外一頭,一群碼農等着取出一個個任務准備着手做。
Backend
這種模式注定了整個系統會是個開環系統,工頭對於碼農們把任務做的怎樣是不知情的。所以我們要引入 Backend 來保存每次任務的結果。這個 Backend 有點像我們的 Broker,也是存儲任務的信息用的,只不過這里存的是那些任務的返回結果。我們可以選擇只讓錯誤執行的任務返回結果到 Backend,這樣我們取回結果,便可以知道有多少任務執行失敗了。
Celery應用場景
1.你想對100台機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿着這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
2.你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福
Celery的特點
1.簡單:一單熟悉了celery的工作流程后,配置和使用還是比較簡單的
2.高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
3.快速:一個單進程的celery每分鍾可處理上百萬個任務
3.靈活: 幾乎celery的各個組件都可以被擴展及自定制
Celery工作基本流程
我們的項目
項目目錄:
proj/celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker = 'amqp://', backend = 'amqp://', include = ['proj.tasks']) app.conf.update( result_expires = 3600 ) if __name__ == '__main__': app.start()
在這個模塊中創建了Celery
實例(通常稱為app
)
要在項目中使用Celery
只需要通過import
導入該實例就行了
broker
參數指定要使用的中間件的URLbackend
參數指定使用的result backend
用來跟蹤任務狀態和結果,雖然默認狀態下結果不可用。以上例子中使用
RPC result backend
。當然,不同的result backend
都有自己的好處和壞處,根據自己實際情況進行選擇,如果不需要最好禁用。通過設置@task(ignore_result=True)
選項來禁用耽擱任務)include
參數是當worker
啟動時導入的模塊列表需要在這里添加自己的任務莫誇這樣worker
就可以找到任務
proj/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
啟動worker
Celery
程序可以用來啟動worker
:
celery -A proj worker -l info
-------------- celery@centos6 v4.1.0 (latentcall) ---- **** ----- --- * *** * -- Linux-2.6.32-696.el6.x86_64-x86_64-with-centos-6.9-Final 2018-03-26 12:27:49 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: task:0x7fe5cfbd20d0 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] [2018-03-26 12:27:49,921: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// [2018-03-26 12:27:49,926: INFO/MainProcess] mingle: searching for neighbors [2018-03-26 12:27:49,499: INFO/MainProcess] mingle: sync with 1 nodes [2018-03-26 12:27:50,950: INFO/MainProcess] mingle: sync complete [2018-03-26 12:27:50,957: INFO/MainProcess] celery@centos6 ready.
broker
是在celery
模塊中指定的中間件參數的url,也可以在命令行中通過-b
選項指定不同的中間件Concurrent
是用於並行處理的任務的預創建worker
進程數量,當所有的任務都在忙於工作時,新的任務必須等待之前的執行完成才能處理
默認的並發數是機器上CPU的數量,可以通過
celery worker -c
選項指定自定義數量。沒有推薦值,最佳數量取決於很多因素,但是如果你的任務主要是I/O相關的,就可以增加這個數量。實驗表明,增加超過兩倍CPU數量效果很差,而且可能會降低性能
除了prefork pool
,Celery
還支持Eventlet
、Gevent
並且還能在單線程上運行
Event
是一個可選項,當啟用的時候,Celery
會發送監控(消息)來反映worker
的操作,也可以被用來監視像celery
、events
和Flower
(實時Celery
監控)這樣的程序。Queues
是worker
將使用的任務的隊列的集合,worker
可以一次接受幾個隊列,它用來將消息路由到特定的工作者以作為服務質量、關注點分離、和優化的一種方式
可以通過命令行獲取完整的列表————celery worker --help
停止worker
ctrl-c
后台
生產環境中一般將worker
放到后台,后台腳本使用celery multi
命令后台啟動一個或多個worker
celery multi start w1 -A proj -l info
控制台打印
celery multi v4.1.0 (latentcall) > Starting nodes... > w1@centos6: OK Stale pidfile exists - Removing it.
也可以重啟:
celery multi restart w1 -A proj -l info
celery multi v4.1.0 (latentcall) > Stopping nodes... > w1@centos6: TERM -> 23620 > Waiting for 1 node -> 23620..... > w1@centos6: OK > Restarting node w1@centos6: OK > Waiting for 1 node -> None...
停止:
celery multi stop w1 -A proj -l info
stop
命令是異步的所以它不會等待worker
關閉,可以使用stopwait
命令來確保當前執行都任務在退出前都已執行完畢
celery multi stopwait w1 -A proj -l info
celery multi
不會存儲關於worker
的信息,所以重啟的時候需要使用同樣的命令行參數。在停止時,必須使用相同的pidfile
和logfile
參數
默認情況下,程序將在當期目錄創建pid
和log
文件,為了防止多個worker
運行出錯,推薦將這些文件放在專門的目錄:
mkdir -p /var/run/celery mkdir -p /var/log/celery celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log
使用multi
指令可以啟動多個worker
,並且有一個強大的命令行語法來為不同的worker
指定參數:
celery multi start 10 -A proj -l info -Q:1-3 images, video -Q:4, 5 data -Q default -L:4,5 debug
--app
參數
--app
參數指定使用的Celery
應用實例,必須以module.path:attribute
的形式出現
但也支持快捷方式,只要包名指定了,就會嘗試在應用實例中搜索
使用--app=proj
:
- 名為
proj.app
的屬性 - 名為
proj.app
的屬性 - 模塊
proj
中的任何屬性都是一個Celery
應用程序,如果都沒有發現,它就會嘗試一個名為proj.celery
的子模塊 - 名為
proj.celery.app
的屬性 - 名為
proj.celery.celery
的屬性 - 模塊
proj.celery
中的任何屬性都是一個Celery
應用程序
任務調用
- 可以通過使用
delay()
方法來調用一個任務
add.delay(3, 3)
這個方法實際上是另一種叫做apply_async()
方法的快捷方式
add.applay_async((3, 3))
后者(applay_async()
)能夠指定執行選項,比如運行時間(倒計時)、應該發送的隊列等等:
add.apply_async((2, 2), queue='lopri', countdown=10)
上述案例中,任務會被發送給一個名為lopri
的隊列,該任務會在信息發送后十秒執行
直接應用該任務會在當前進程中執行任務,不會發送消息
add(3, 3)
result:6
三種方法delay()
、apply_async()
和應用__call__
,代表了Celery
調用API,也同樣用於簽名
-
每一個任務調用都有一個唯一的標識符(UUID),這個就是任務的id
-
delay()
和apply_async
方法會返回一個AsyncResult
實例,可以被用來跟蹤任務執行狀態,但是需要開啟result backend
這樣狀態才能被存儲在某處 -
Results
默認是禁用的,因為實際上沒有一個result backend
適用於每個應用程序,所以要考慮到每個獨立backend
的缺點來選擇一個使用。對於許多保持返回值的任務來說都不是很有用,所以這個默認的禁用是很明智的。還需要注意的是,result backend
並不用來監控任務和worker
,對於Celery
有專門的事件消息
如果配置了result backend
就可以接收到任務的返回值
result = add.delay(2, 2) res.get(timeout=1)
retult:4
- 可以通過查看
id
屬性找到任務的id
res.id
result:073c568d-ca88-4198-b735-0f98f861218b
-
如果任務拋出異常也可以檢查到異常,默認
result.get()
可以傳播任何錯誤 -
如果不希望錯誤傳播,可以通過
propagete
屬性禁用
res.get(propagate=False)
在這種情況下,它會返回所提出的異常實例,以便檢查任務是否成功或失敗,您將不得不在結果實例上使用相應的方法
res.failed() res.successful()
也可以通過state
找到任務的狀態
res.state
result:FAILUTE
- 一個任務只能有一個狀態,但是可以在幾個狀態中發展,典型任務階段可能是這樣
PENDING -> STARTED -> SUCCESS
STARTED
狀態是一個特殊的狀態,只有在task_track_started
設置啟用或者@task(track_started=True)
選項設置的時候才會被記錄下來
PENDING
狀態實際上不是記錄狀態,而是未知任務id的默認狀態
from proj.celery import app res = app.AsyncResult('this-id-does-not-exist') res.state
result:PENDING
- 如果重新嘗試這個任務可能會變得更復雜,對於一個嘗試過兩遍的任務來說階段可能是這樣:
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Canvas:設計任務流
前面學習了通過delay
方法調用任務,通常這樣就夠了,但是有時可能需要將任務調用的簽名傳遞給另一個進程或者另一個函數的參數,對Celery
來說叫做signatures
簽名以某種方式包裝了單一任務調用的參數和執行選項,以便將其傳遞給函數,甚至序列化后發送。
可以使用參數(2, 2)
和十秒的計時器來為add
任務創建一個簽名
add.signature((2, 2), countdown=10)
也可以簡寫:
add.s(2, 2)
調用API
簽名的實例也支持調用API,意味着也可以有delay
和apply_async
方法
但是有一個區別,那就是簽名可能已經指定了一個參數簽名,add
任務接受兩個參數,所以一個制定了兩個參數的簽名將會形成一個完整的簽名
s1 = add.s(2, 2) res = s1.delay() res.get()
也可以使用不完成的簽名,叫做partials
:
s1 = add.s(2)
s2
現在是部分簽名,需要另一個參數才完整,則可以在調用signature
的時候處理
# resolves the partial: add(8, 2) res = s2.delay(8) res.get()
在這里,添加了參數8
,對已存在的參數2
組成了一個完整的簽名add(8, 2)
關鍵字參數也可以延遲添加,會和已存在的關鍵字參數合並,新參數優先(新參數覆蓋舊參數)
s3 = add.s(2, 2, debug=True) s3.delay(debug=False)
已聲明的簽名支持調用API:
- sig.apply_async(arg=(), kwargs={}, **options
使用可選部分參數和部分關鍵字參數調用簽名,也支持部分可執行選項 - sig.delay(*args, **kwargs)
apply_async
的星參版本,任何參數都會被預先記錄在簽名的參數你,關鍵字參數會和現有的keys
合並
基本體
- group
- chain
- chord
- map
- starmap
- chunks
這些基本體本身就是簽名對象,因此,它們可以以任何多種方式組合起來組成復雜的工作流
Group
一個group
同時調用任務列表,返回一個特殊結果實例,這樣可以以組的形式檢查結果,並按順序檢索返回值
from celery import group from proj.tasks import add group(add.s(i, i) for in in range(10))().get()
result:[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- Partial group
g = group(add.s(i, i) for i in range(10)) g(10).get()
result:[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
任務可以被相互連接起來,這樣在一個任務返回后另一個任務被調用
from celery import chain form proj.tasks import add, mul // 用法1 chian(add.s(4, 4) | mul.s(8))().get() // 用法2 g = chain(add.s(4) | mul.s(8)) g(4).get() // 用法3 (add.s(4, 4) | mul.s(8))().get()
Chords
chord
是一個有返回值的group
from celery import chord from proj.tasks import add, xsum // 用法1 (group(add.s(i, i) for i in range(10)) | xsum.s())().get() // 用法2 upload_document.s(file) | group(apply_filter.s() for filter in filters)
路由
Celery
支持AMQP提供的所有路由設施,但是它也支持簡單路由,將消息發送到指定的隊列
task_routes
設置可以是用戶按名稱對任務進行路由,並將一切集中在一個位置
app.conf.update{
task_routes = {
'proj.tasks.add': {'queue': 'hipri'}, } }
可以在運行時通過queue
參數指定隊列到apply_async
:
from proj.tasks import add add.apply_async((2,2), queue='hipri')
然后可以通過指定celery worker -Q
選項使worker
從隊列中消費
celery -A proj worker -Q hipri
也可以通過使用逗號分隔符(,
)來指定多個隊列
celery -A proj worker -Q hipri, celery
默認隊列因為歷史原因命名為:
celery
隊列的順序無關緊要,因為worker
會給隊列相同的權重
遠程控制
如果使用RabbitMQ(AMQP)
、Redis
或者Qpid
作為中間件就可以在運行時監視worker
- 查看
worker
當前執行的任務
celery -A proj inspect active
這是通過使用廣播消息實現的,因此,急群眾的每一個工作人員都能接收到所有遠程控制命令
- 也可以指令一個或多個
worker
使用--destination
選項請求行動,這是一個逗號分隔的worker
主機名列表
celery -A proj inspect active --destination=celery@example.com
如果沒有提供目標,那么每個worker
都會對請求做出反應並回復
celery inspect
命令包含的命令不會改變worker
的任何東西,它只會回復關於worker
內部發生的事情的信息和統計信息,可以執行命令檢查列表:
celery -A proj inspect --help
celery control
命令,包含在運行時實際改變worker
操作的命令
celery -A proj control --help
- 強制
worker
啟用事件消息(用於監視任務和工作人員)
celery -A proj control enable_events
當事件激活,可以啟動event dumper
查看worker
正在做什么
celery -A proj events --dump
或者
celery -A proj events
當完成監控可以再次禁用events
celery -A proj control disable_events
celery status
命令還能使用遠程控制命令,並顯示集群中的在線worker
列表
celery -A proj status
時區
所有的時間和日期、內部和消息多使用UTC時間區域
當worker
收到消息,例如使用倒計時設置,它將UTC時間轉換為本地時間。如果希望使用與系統時區不同的地區,那么必須要使用時區設置來配置該時區:
app.conf.timezone = 'Asia/Shanghai'
最優化
默認的配置並沒有針對吞吐量進行優化,它試圖在許多短任務和更少的長任務之間走中間路線,這是吞吐量和公平調度之間的折中