啟動工作單元
你可以通過執行以下命令在前台啟動工作單元:
$ celery -A proj worker -l info
查看啟動工作單元的可用命令行選項,可以執行:
$ celery worker --help
你可以在同一台機器上啟動多個工作單元,只要確保給每個獨立的工作單元使用 --hostname
參數聲明一個節點名稱。
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h
hostname
參數可以使用以下變量擴展:
- %h: 主機名,包含域名
- %n: 主機名
- %d: 域名
如果單前主機名是 george.example.com
,那么會擴展成:
Variable | Template | Result |
---|---|---|
%h | worker1@%h | worker1@george.example.com |
%n | worker1@%n | worker1@george |
%d | worker1@%d | worker1@example.com |
注意對於supervisor
用戶:
% 符號需要被轉義: %%h
停止工作單元
停止可以通過使用 TERM
信號來實現。
當停止過程啟動后,工作單元會在實際停止前完成當前所有的任務。如果任務很重要,你應該在做一些極端操作,例如發送KILL
信號,之前等待任務的完成。
如果工作單元在預期的時間里沒有停止,進入了無限的循環或者類似的情景,那么你可以發送 KILL
信號強制關閉工作單元:但是你應該知道這樣做會使當前正在執行的任務丟失(即,除非你的任務設置了 acks_late
選項)。
另外,進程不會覆蓋 KILL
信號,所以工作單元進程不會干掉他的子進程;手動確保都所有進程都終止了。以下命令通常能達到這個效果:
$ pkill -9 -f 'celery worker'
如果在你系統里沒有 pkill
命令,你可以使用另外一個長些的命令版本:
$ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
重啟工作單元
為了重啟工作單元,你應該發送 TERM
信號,並且啟動一個新的實例。在開發環境管理工作單元最簡單的方式是使用 celery multi
:
$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid $ celery multi restart 1 --pidfile=/var/run/celery/%n.pid
在生產環境中,你應該使用 init-script
或者一個進程管理系統(查看 Daemonization
這一節)。
除了停止工作單元,然后啟動一個新的工作單元的方式重啟外,你還可以使用 HUP
信號重啟工作單元。注意工作單元將負責重啟自己,所以容易產生問題,在生產環境還是不建議使用這種方式。
$ kill -HUP $pid
注意:
只有在工作單元在后台作為守護進程運行時發送 HUP
信號重啟才有用(它沒有控制終端)。由於 macOS 平台的限制,HUP
信號在 macOS 中被禁用。
處理信號
工作單元主進程重寫了如下信號處理方式:
TERM | Warm shutdown, wait for tasks to complete. |
---|---|
QUIT | Cold shutdown, terminate ASAP |
USR1 | Dump traceback for all active threads. |
USR2 | Remote debug, see celery.contrib.rdb. |
文件路徑中的變量
文件路徑參數 --logfile, --pidfile, 以及 --statedb
可以包含工作單元可以擴展的變量:
節點名稱替換
- %p: 全節點名稱
- %h: 主機名,包含域名
- %n: 主機名
- %d: 域名
- %i: Prefork
進程池的進程索引,如果是主進程索引為0
- %I: Prefork
帶連接符的進程池索引
例如,如果當前的主機名是 george@goo.example.com
,那么將會擴展成:
- --logfile-%p.log -> george@foo.example.com.log
- --logfile=%h.log -> foo.example.com.log
- --logfile=%n.log -> george.log
- --logfile=%d -> example.com.log
Prefork 進程池索引
Prefork
進程池聲明符將根據最終需要打開的文件的進程擴展成不同的文件名稱。
這可以用來給每個子進程聲明一個日志文件。
注意數字將保持進程限制,即使進程已經退出或者使用了autoscale/maxtasksperchild/time
限制。也就是說,數字是進程索引而不是進程計數或者進程ID。
-
%i - 進程池中進程索引,如果是主進程則為0
-n worker1@example.com -c2 -f %n-%i.log
命令將生成三個日志文件:- worker1-0.log (main process)
- worker1-1.log (pool process 1)
- worker1-2.log (pool process 2)
-
%I - 帶連接符號的進程池索引
-n worker1@example.com -c2 -f %n%I.log
命令將生成三個日志文件:- worker1.log (main process)
- worker1-1.log (pool process 1)
- worker1-2.log (pool process 2)
並行
默認情況下,多進程被用來處理並發執行的任務,但是你可以使用 Eventlet
。工作單元的進程/線程數可以使用 --concurrency
參數修改,默認是機器上可用的 CPU 核數。
進程的數量(multiprocessing/prefork pool):
進程數越多通常效果會更好,但是也存在一個分隔點,超過這個數目添加更多的進程反而會影響性能。有些證據證明同時啟動多個工作單元實例,可能比只啟動一個工作單元實例性能更佳。例如:3個工作單元,每個工作單元10個進程。你需要通過實驗找到對你最佳的進程數,因為影響的因素很多:應用、工作負載、任務執行時間以及其他因素。
遠程控制
2.0 版本新特性。
Celery 命令
celery
程序用來從命令行執行遠程控制命令。它支持所有下面列出的命令。查看Manageemnt Command-line Utilities(inspect/control)
獲取更多的信息。
池: prefork, eventlet, gevent
支持: blocking: solo(查看注意)
消息中間件: amqp, redis
支持:
工作單元可以使用一個高優先級的廣播消息隊列來執行遠程控制。控制命令將會發送給所有工作單元,或者聲明的一個工作單元的列表。
命令也有回復。客戶端可以等待並收集這些回復。因為沒有一個中央集權機構知道在集群中有多少個工作單元,也沒法估計有多少個工作單元會發送回復,所以客戶端可以配置一個超時時間 - 回復到達的最后時間期限。這個超時時間默認是1秒鍾。如果工作單元沒有在超時時間之前回復,並不意味着工作單元沒有回復,或者更糟糕的是終止了,而是可能由於網絡延遲或者工作單元處理命令比較慢,所以對於超時事件要做相應的處理。
除了超時時間,客戶端還可以聲明一個等待的最大回復數。如果目標節點已經聲明,則這個限制就設置為目標節點的數量。
注意:
sole
池支持遠程控制命令,但是任何任務執行都會阻塞遠程控制命令,所以如果工作單元非常忙它在使用上會受到限制。在這種情況下,你必須增加等待命令回復的超時時間。
broadcast()
函數
這是一個客戶端發送命令給工作單元的函數。一些遠程控制命令還有高級別的接口,他們在后台使用 broadcast()
函數,就像 rate_limit()
和 ping()
。
發送 rate_limit
命令還相應的參數:
>>> app.control.broadcast('rate_limit', ... arguments={'task_name': 'myapp.mytask', ... 'rate_limit': '200/m'})
這將異步發送命令,而不會等待回復。如果需要回復,你可以使用 reply
參數:
>>> app.control.broadcast('rate_limit', { ... 'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True) [{'worker1.example.com': 'New rate limit set successfully'}, {'worker2.example.com': 'New rate limit set successfully'}, {'worker3.example.com': 'New rate limit set successfully'}]
通過使用destination
參數,你可以聲明接收命令的工作單元的列表:
>>> app.control.broadcast('rate_limit', { ... 'task_name': 'myapp.mytask', ... 'rate_limit': '200/m'}, reply=True, ... destination=['worker1@example.com']) [{'worker1.example.com': 'New rate limit set successfully'}]
當然,使用高級別的接口來設置速率限制要更加便利,但是有些命令只能使用 broadcast()
發送請求。
命令
revoke: Revoking tasks
pool support: all, terminate only supported by prefork
broker support: amqp, redis
command: celery -A proj control revoke
所有的工作單元都保存了被取消的任務iD,在內存中或者持久化在硬盤里(查看 Persistent revoke 這一節)
當一個工作單元接收到一個任務取消請求,他將會放棄執行這個任務,但是它不會終止正在執行的任務,除非設置了 terminate
選項。
注意:
terminate
選項是當任務動不了時對管理員最后的求助。它不是用於終止任務,而是用來終止執行任務的進程,並且當信號發送時這個進程可能已經開始處理另外一個任務,所以你永遠不應該在程序中使用它。
如果設置了 terminate
選項,執行任務的工作單元子進程將被終止。默認發送的信號是 TERM
,但是你可以通過 signal
參數顯示聲明要發送的信號。信號可以是任意在python標准庫 signal
模塊中定義的信號,信號名稱字符為大寫。
終止一個任務也會取消它。
示例:
>>> result.revoke() >>> AsyncResult(id).revoke() >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed') >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', ... terminate=True) >>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', ... terminate=True, signal='SIGKILL')
取消多個任務
3.1版本新特性。
任務取消方法還接收一個列表參數,使得可以同時取消多個任務。
示例:
>>> app.control.revoke([ ... '7993b0aa-1f0b-4780-9af0-c47c0858b3f2', ... 'f565793e-b041-4b2b-9ca4-dca22762a55d', ... 'd9d35e03-2997-42d0-a13e-64a66b88a618', ])
從3.1版本開始,GroupResult.revoke
方法開始采用這一特性。
任務取消持久化
取消任務將發送一個廣播消息給所有的工作單元,工作單元將在內存中記錄一個被取消的任務的列表。當一個工作單元啟動,它將與集群中其他工作單元同步被取消的任務。
被取消的任務保存在內存中,所以如果所有的工作單元都重啟,那么被取消的任務的列表也將會消失。如果你想在重啟后保留這個列表,你需要通過 --statedb
參數給工作單元聲明一個文件來保存這些。
$ celery -A proj worker -l info --statedb=/var/run/celery/worker.state
或者你可以使用 celery multi
命令為每一個工作單元實例創建一個文件,使用 %n
格式來擴展當前節點名稱。
celery multi start 2 -l info --statedb=/var/run/celery/%n.state
查看路徑中的變量這一節。
注意遠程控制命令必須能執行任務取消。遠程命令當前只被 RabbitMQ(amqp) 和 Redis 支持。
時間限制
2.0 版本新特性。
軟限制,或者硬限制?
時間限制通過兩個值設置,軟限制和硬限制。軟限制允許任務在被殺死之前捕獲一個異常來清理環境,硬限制超時時間是不可捕獲的,它將強制終止任務。
pool support: prefork/gevent
一個任務可能永遠運行,如果你又很多任務等待不可能發生的事件,你將阻塞工作單元處理其他的任務。不讓這種情況發生的最佳方法就是設置時間限制。
時間限制(--time-limit
)是一個任務終止前可以運行的最大時間秒數。你可以設置一個軟時間限制(–soft-time-limit),它將在硬時間限制到達強制殺死它之前拋出一個異常給任務,使得任務可以捕獲到並清理任務環境:
from myapp import app from celery.exceptions import SoftTimeLimitExceeded @app.task def mytask(): try: do_work() except SoftTimeLimitExceeded: clean_up_in_a_hurry()
時間限制還可以通過 task_time_limit/task_soft_time_limit
配置進行設置。
注意:
時間限制目前在不支持 SIGUSR1
信號的平台上不可用。
在運行時修改時間限制
2.3版本新特性。
broker support: amqp, redis
有一個遠程控制命令可以修改一個任務的軟限制和硬限制 - time_limit
。
下面示例修改任務 tasks.crawl_the_web
的軟限制為 1 分鍾,硬限制為 2 分鍾:
>>> app.control.time_limit('tasks.crawl_the_web', soft=60, hard=120, reply=True) [{'worker1.example.com': {'ok': 'time limits set successfully'}}]
時間限制修改之后開始執行的任務才會被影響到。
速率限制
以下示例修改 myapp.mytask
任務的速率限制為每分鍾最多執行 200 個該類型的任務。
>>> app.control.rate_limit('myapp.mytask', '200/m')
以上示例沒有聲明目標節點名稱,所以這個修改請求將會影響集群中所有的工作單元實例。如果你只想影響指定的工作單元,你可以包含 destination
參數:
>>> app.control.rate_limit('myapp.mytask', '200/m', ... destination=['celery@worker1.example.com'])
告警:
這個命令不會影響到使能了 worker_disable_rate_limits
的工作單元。
每個孩子的最大任務數
2.0版本新特性。
pool support: prefork
使用這個選項你可以配置工作單元子進程在被一個新進程取代之前可以執行的最打任務數量。
如果你任務中有無法控制的內存泄露,例如使用了已經不再維護的C擴展,這將是很有用的一個特性。
這個選項可以通過工作單元的 --max-tasks-per-child
參數或者 worker_max_tasks_per_child
配置進行設置。
每個孩子的最大內存
4.0版本新特性。
pool support: prefork
使用這個選項,你可以設置工作單元子進程被替換之前可以使用的最大內存。
如果你任務中有無法控制的內存泄露,例如使用了已經不再維護的C擴展,這將是很有用的一個特性。
這個選項可以通過工作單元的 --max-memory-per-child
參數或者 worker_max_memory_per_child
配置進行設置。
自動擴展
2.2版本新特性。
pool support: prefork, gevent
自動擴展組件用來基於負載動態調整池的大小:
- 當負載高時增加池中的進程數
- 當負載低時去除多余的進程
它可以通過 --autoscale
選項啟用,需要兩個數值:池的進程的最大數量和最小數量:
--autoscale=AUTOSCALE Enable autoscaling by providing max_concurrency,min_concurrency. Example: --autoscale=10,3 (always keep 3 processes, but grow to 10 if necessary).
你可以通過繼承 Autoscaler
類來定義自己的擴展策略。一些依據的指標包括任務負載或者可用內存等。你可以通過 worker_autoscaler
設置聲明一個自定義的自動擴展器。
隊列
一個工作單元可以從任意數量的隊列中消費任務消息。默認情況下,它會從所有定義在 task_queues
配置種的隊列中消費消息(如果沒有設置,它將從默認隊列 celery
中消費消息)。
你可以在工作單元啟動時聲明從哪些隊列中消費消息,通過 -Q
選項可以聲明一個隊列的列表:
$ celery -A proj worker -l info -Q foo,bar,baz
如果隊列的名稱已經在 task_queues
中聲明,它將使用這個配置,但是如果沒有在隊列列表中聲明,那么Celery 將自動為你產生一個新的隊列(依賴於 task_create_missing_queues
選項)。
你還可以通過遠程控制命令 add_consumer
以及 cancel_consumer
讓工作單元在運行時開始或者停止從一個隊列中消費消息。
Queues: Adding consumers
add_consumer
遠程控制命令通知一個或多個工作單元從一個隊列中消費消息。這個操作是冪等的。
讓集群中的所有工作單元開始從隊列foo
中消費消息,你可以如下操作:
$ celery -A proj control add_consumer foo -> worker1.local: OK started consuming from u'foo'
如果你想聲明一個指定的工作單元節點,可以使用 --destination
參數:
$ celery -A proj control add_consumer foo -d celery@worker1.local
同樣的效果可以通過 app.control.add_consumer()
方法動態實現:
>>> app.control.add_consumer('foo', reply=True) [{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}] >>> app.control.add_consumer('foo', reply=True, ... destination=['worker1@example.com']) [{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]
但現在為止,我們只列舉了使用自動隊列的示例,如果你想更多的控制,你可以聲明 exchange
、routing_key
甚至更多的選項:
>>> app.control.add_consumer( ... queue='baz', ... exchange='ex', ... exchange_type='topic', ... routing_key='media.*', ... options={ ... 'queue_durable': False, ... 'exchange_durable': False, ... }, ... reply=True, ... destination=['w1@example.com', 'w2@example.com'])
Queues: Canceling consumers
你可以通過 cancel_consumer
命令終止從一個隊列中消費消息。
強制集群中所有的工作單元停止從一個隊列中消費消息,你可以使用 celery control
程序:
$ celery -A proj control cancel_consumer foo
如果你想聲明一個指定的工作單元節點,可以使用 --destination
參數:
$ celery -A proj control cancel_consumer foo -d celery@worker1.local
同樣的效果可以通過 app.control.cancel_consumer()
方法動態實現:
>>> app.control.cancel_consumer('foo', reply=True) [{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]
Queues: List of active queues
你可以使用 active_queues
控制命令獲取工作單元消費的隊列的列表:
$ celery -A proj inspect active_queues [...]
就像所有其他遠程控制命令一樣,它也支持 --destination
參數,用來聲明應該回復請求的工作單元節點。
$ celery -A proj inspect active_queues -d celery@worker1.local [...]
這也可以通過 app.control.inspect.active_queues()
方法動態實現:
>>> app.control.inspect().active_queues() [...] >>> app.control.inspect(['worker1.local']).active_queues() [...]
探查工作單元
app.control.inspect
可以用來探查正在運行的工作單元。在內部它使用遠程控制命令來實現。
你也可以使用celery
命令來探查工作單元,並且它支持與 app.control
接口相同的命令。
>>> # Inspect all nodes. >>> i = app.control.inspect() >>> # Specify multiple nodes to inspect. >>> i = app.control.inspect(['worker1.example.com', 'worker2.example.com']) >>> # Specify a single node to inspect. >>> i = app.control.inspect('worker1.example.com')
Dump of registered tasks
你可以使用 registered()
方法獲取在工作單元中注冊的任務:
>>> i.registered() [{'worker1.example.com': ['tasks.add', 'tasks.sleeptask']}]
Dump of currently executing tasks
你可以通過 active()
方法獲取激活任務的列表:
>>> i.active() [{'worker1.example.com': [{'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 'args': '(8,)', 'kwargs': '{}'}]}]
Dump of scheduled(ETA) tasks
你可以通過 scheduled()
方法獲取等待被調度的任務列表:
>>> i.scheduled() [{'worker1.example.com': [{'eta': '2010-06-07 09:07:52', 'priority': 0, 'request': { 'name': 'tasks.sleeptask', 'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d', 'args': '[1]', 'kwargs': '{}'}}, {'eta': '2010-06-07 09:07:53', 'priority': 0, 'request': { 'name': 'tasks.sleeptask', 'id': '49661b9a-aa22-4120-94b7-9ee8031d219d', 'args': '[2]', 'kwargs': '{}'}}]}]
注意:
這些是帶有 ETA/countdown
參數的任務,不是周期任務。
Dump of reserved tasks
保留任務是已經被工作單元接收,但是還在等待被執行的任務。
你可以通過 reserved()
方法獲取保留任務的列表:
>>> i.reserved() [{'worker1.example.com': [{'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf', 'args': '(8,)', 'kwargs': '{}'}]}]
Statistics
遠程控制命令 inspect stats(或者 stats())
將提供給你一個關於工作單元的有用的統計信息列表(或者可能對你無用):
$ celery -A proj inspect stats
輸出將包含下列字段:
- broker
消息中間件相關的信息。
- connect_timeout
以秒為單位的建立一個新連接的超時時間
- heartbeat 當前心跳值(由客戶端設置) - hostname 遠程消息中間件的節點名稱 - insist 不再使用 - login_method 連接消息中間件的登錄方法 - port 遠程消息中間件的端口 - ssl 啟用/禁用 SSL - transport 使用的傳輸層 - transport_options 傳輸層選項 - uri_prefix 一些傳輸層需要hostname是URL的形式。 ``` redis+socket:///tmp/redis.sock ``` 這個例子中 URI-prefix 是 redis。 - userid 連接消息中間的用戶 ID - virtual_host 使用的虛擬主機
-
clock
工作單元的邏輯時鍾值。這是一個正整數,每次你收到統計信息它的值會增加。 -
pid
工作單元實例的進程ID -
pool
池相關的配置。-
max-concurrency
最大的進程/線程/green線程數量 -
max-tasks-per-child
一個工作單元子線程/進程被回收前可以執行的最大任務數量 -
processes
進程/線程id的列表 -
put-guarded-by-semaphore
內部使用 -
timeouts
時間限制的默認值 -
writes
prefork
池的特殊配置,它顯示當使用異步 I/O 時池中每個進程寫操作的分布。
-
-
prefetch_count
任務消費者的當前prefetch
計數。 -
rusage
系統使用統計信息。你系統平台的相關字段可能不同。
From getrusage(2):
- stime
進程的內核態時間
- utime 進程的用戶態時間 - maxrss 進程使用的最大內存值(kilobytes計數) - idrss 數據使用的非共享內存總數(執行的kilobytes次ticks計數) - isrss 棧空間的非共享內存總數(執行的kilobytes次ticks計數) - ixrss 與其他進程共享的內存總數(執行的kilobytes次ticks計數) - inblock 文件系統為進程讀硬盤的次數 - oublock 文件系統為進程寫硬盤的次數 - majflt 進行 I/O 操作時出現的頁錯誤計數 - minflt 沒進程 I/O 操作時出現的頁錯誤計數 - msgrcv 接收到的 IPC 消息 - msgsnd 發送的 IPC 消息 - nvcsw 進程主動進行上下文切換的次數 - nivcsw 非進程主動進行的上下文切換的次數 - nsignals 收到的信號數 - nswap 進程被交換除內存的次數
- total
自從工作單元開始,任務名稱與接收的該類型的任務數量的映射。
附加命令
Remote shutdown
以下命令將遠程優雅地關閉工作單元:
>>> app.control.broadcast('shutdown') # shutdown all workers >>> app.control.broadcast('shutdown', destination='worker1@example.com')
Ping
這個命令像或者的工作單元發送一個 Ping 請求。工作單元將回復一個 Pong
,而不做其他事情。如果你沒有聲明一個自定義的超時時間,它就使用默認的1秒超時時間:
>>> app.control.ping(timeout=0.5) [{'worker1.example.com': 'pong'}, {'worker2.example.com': 'pong'}, {'worker3.example.com': 'pong'}]
ping()
方法還支持 destination
參數,所以你可以聲明想要 ping
的工作單元:
>>> ping(['worker2.example.com', 'worker3.example.com']) [{'worker2.example.com': 'pong'}, {'worker3.example.com': 'pong'}]
Enable/disable events
你可以使用 enable_events, disable_events
命令啟用/禁用事件。這對於臨時監控一個使用 celery events/celerymon
的工作單元非常有用。
>>> app.control.enable_events() >>> app.control.disable_events()
編寫自己的遠程控制命令
有兩種類型的遠程控制命令:
-
Inspect command
沒有副作用,將只是返回工作單元中找到的值,如已注冊的任務的列表、激活的任務的列表,等等。 -
Control command
有副作用,如給工作單元添加一個消費隊列。
遠程控制命令在控制面板中注冊,並且他們有一個參數:當前的 ControlDispatch
實例。在這里,如果你需要,你可以訪問激活的 Consumer
。
下面是一個控制命令的例子,它增加任務的 prefetch
計數:
from celery.worker.control import control_command @control_command( args=[('n', int)], signature='[N=1]', # <- used for help on the command-line. ) def increase_prefetch_count(state, n=1): state.consumer.qos.increment_eventually(n) return {'ok': 'prefetch count incremented'}
確保你將這段代碼添加到一個模塊中,並且該模塊被工作單元導入:這可以在你定義 app 實例的模塊定義,或者你也可以使用 imports 設置從其他模塊導入。
重啟工作單元使控制命令注冊其中,現在你可以使用 celery control
工具執行你的命令:
$ celery -A proj control increase_prefetch_count 3
你還可以給 celery inspect
程序添加操作,例如讀取當前的 prefetch
計數:
from celery.worker.control import inspect_command @inspect_command def current_prefetch_count(state): return {'prefetch_count': state.consumer.qos.value}
重啟工作單元之后你可以通過 celery inspect
程序詢問這個值:
$ celery -A proj inspect current_prefetch_count
轉自:https://blog.csdn.net/libing_thinking/article/details/78579160