Celery-4.1 用戶指南: Workers Guide (Workers 指南)


啟動工作單元


你可以通過執行以下命令在前台啟動工作單元:

$ celery -A proj worker -l info

查看啟動工作單元的可用命令行選項,可以執行:

$ celery worker --help

你可以在同一台機器上啟動多個工作單元,只要確保給每個獨立的工作單元使用 --hostname 參數聲明一個節點名稱。

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

hostname 參數可以使用以下變量擴展:
   - %h: 主機名,包含域名
   - %n: 主機名
   - %d: 域名

如果單前主機名是 george.example.com,那么會擴展成:

Variable Template Result
%h worker1@%h worker1@george.example.com
%n worker1@%n worker1@george
%d worker1@%d worker1@example.com

注意對於supervisor用戶:
   % 符號需要被轉義: %%h

停止工作單元


停止可以通過使用 TERM 信號來實現。

當停止過程啟動后,工作單元會在實際停止前完成當前所有的任務。如果任務很重要,你應該在做一些極端操作,例如發送KILL信號,之前等待任務的完成。

如果工作單元在預期的時間里沒有停止,進入了無限的循環或者類似的情景,那么你可以發送 KILL 信號強制關閉工作單元:但是你應該知道這樣做會使當前正在執行的任務丟失(即,除非你的任務設置了 acks_late 選項)。

另外,進程不會覆蓋 KILL 信號,所以工作單元進程不會干掉他的子進程;手動確保都所有進程都終止了。以下命令通常能達到這個效果:

$ pkill -9 -f 'celery worker'

如果在你系統里沒有 pkill 命令,你可以使用另外一個長些的命令版本:

$ ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9

重啟工作單元


為了重啟工作單元,你應該發送 TERM 信號,並且啟動一個新的實例。在開發環境管理工作單元最簡單的方式是使用 celery multi:

$ celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

在生產環境中,你應該使用 init-script 或者一個進程管理系統(查看 Daemonization 這一節)。

除了停止工作單元,然后啟動一個新的工作單元的方式重啟外,你還可以使用 HUP 信號重啟工作單元。注意工作單元將負責重啟自己,所以容易產生問題,在生產環境還是不建議使用這種方式。

$ kill -HUP $pid

注意:
  只有在工作單元在后台作為守護進程運行時發送 HUP 信號重啟才有用(它沒有控制終端)。由於 macOS 平台的限制,HUP 信號在 macOS 中被禁用。

處理信號


工作單元主進程重寫了如下信號處理方式:

TERM Warm shutdown, wait for tasks to complete.
QUIT Cold shutdown, terminate ASAP
USR1 Dump traceback for all active threads.
USR2 Remote debug, see celery.contrib.rdb.

文件路徑中的變量


文件路徑參數 --logfile, --pidfile, 以及 --statedb 可以包含工作單元可以擴展的變量:

節點名稱替換
  - %p: 全節點名稱
   - %h: 主機名,包含域名
  - %n: 主機名
  - %d: 域名
  - %i: Prefork 進程池的進程索引,如果是主進程索引為0
  - %I: Prefork 帶連接符的進程池索引

例如,如果當前的主機名是 george@goo.example.com,那么將會擴展成:
   - --logfile-%p.log -> george@foo.example.com.log
   - --logfile=%h.log -> foo.example.com.log
   - --logfile=%n.log -> george.log
   - --logfile=%d -> example.com.log

Prefork 進程池索引


Prefork 進程池聲明符將根據最終需要打開的文件的進程擴展成不同的文件名稱。

這可以用來給每個子進程聲明一個日志文件。

注意數字將保持進程限制,即使進程已經退出或者使用了autoscale/maxtasksperchild/time 限制。也就是說,數字是進程索引而不是進程計數或者進程ID。

  • %i - 進程池中進程索引,如果是主進程則為0
    -n worker1@example.com -c2 -f %n-%i.log 命令將生成三個日志文件:

    • worker1-0.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)
  • %I - 帶連接符號的進程池索引
    -n worker1@example.com -c2 -f %n%I.log 命令將生成三個日志文件:

    • worker1.log (main process)
    • worker1-1.log (pool process 1)
    • worker1-2.log (pool process 2)

並行


默認情況下,多進程被用來處理並發執行的任務,但是你可以使用 Eventlet。工作單元的進程/線程數可以使用 --concurrency 參數修改,默認是機器上可用的 CPU 核數。

進程的數量(multiprocessing/prefork pool):
進程數越多通常效果會更好,但是也存在一個分隔點,超過這個數目添加更多的進程反而會影響性能。有些證據證明同時啟動多個工作單元實例,可能比只啟動一個工作單元實例性能更佳。例如:3個工作單元,每個工作單元10個進程。你需要通過實驗找到對你最佳的進程數,因為影響的因素很多:應用、工作負載、任務執行時間以及其他因素。

遠程控制


2.0 版本新特性。

Celery 命令
celery 程序用來從命令行執行遠程控制命令。它支持所有下面列出的命令。查看Manageemnt Command-line Utilities(inspect/control)獲取更多的信息。

  池: prefork, eventlet, gevent
  支持: blocking: solo(查看注意)
  消息中間件: amqp, redis
  支持:

  工作單元可以使用一個高優先級的廣播消息隊列來執行遠程控制。控制命令將會發送給所有工作單元,或者聲明的一個工作單元的列表。

  命令也有回復。客戶端可以等待並收集這些回復。因為沒有一個中央集權機構知道在集群中有多少個工作單元,也沒法估計有多少個工作單元會發送回復,所以客戶端可以配置一個超時時間 - 回復到達的最后時間期限。這個超時時間默認是1秒鍾。如果工作單元沒有在超時時間之前回復,並不意味着工作單元沒有回復,或者更糟糕的是終止了,而是可能由於網絡延遲或者工作單元處理命令比較慢,所以對於超時事件要做相應的處理。

  除了超時時間,客戶端還可以聲明一個等待的最大回復數。如果目標節點已經聲明,則這個限制就設置為目標節點的數量。

注意:
  sole 池支持遠程控制命令,但是任何任務執行都會阻塞遠程控制命令,所以如果工作單元非常忙它在使用上會受到限制。在這種情況下,你必須增加等待命令回復的超時時間。

broadcast() 函數


這是一個客戶端發送命令給工作單元的函數。一些遠程控制命令還有高級別的接口,他們在后台使用 broadcast() 函數,就像 rate_limit()ping()

發送 rate_limit 命令還相應的參數:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

這將異步發送命令,而不會等待回復。如果需要回復,你可以使用 reply 參數:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

通過使用destination 參數,你可以聲明接收命令的工作單元的列表:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

當然,使用高級別的接口來設置速率限制要更加便利,但是有些命令只能使用 broadcast() 發送請求。

命令


revoke: Revoking tasks
  pool support: all, terminate only supported by prefork
  broker support: amqp, redis
  command: celery -A proj control revoke

所有的工作單元都保存了被取消的任務iD,在內存中或者持久化在硬盤里(查看 Persistent revoke 這一節)

當一個工作單元接收到一個任務取消請求,他將會放棄執行這個任務,但是它不會終止正在執行的任務,除非設置了 terminate 選項。

注意:
  terminate 選項是當任務動不了時對管理員最后的求助。它不是用於終止任務,而是用來終止執行任務的進程,並且當信號發送時這個進程可能已經開始處理另外一個任務,所以你永遠不應該在程序中使用它。

  如果設置了 terminate 選項,執行任務的工作單元子進程將被終止。默認發送的信號是 TERM,但是你可以通過 signal 參數顯示聲明要發送的信號。信號可以是任意在python標准庫 signal 模塊中定義的信號,信號名稱字符為大寫。

終止一個任務也會取消它。

示例:

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

取消多個任務


3.1版本新特性。

任務取消方法還接收一個列表參數,使得可以同時取消多個任務。

示例:

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

從3.1版本開始,GroupResult.revoke 方法開始采用這一特性。

任務取消持久化


  取消任務將發送一個廣播消息給所有的工作單元,工作單元將在內存中記錄一個被取消的任務的列表。當一個工作單元啟動,它將與集群中其他工作單元同步被取消的任務。

  被取消的任務保存在內存中,所以如果所有的工作單元都重啟,那么被取消的任務的列表也將會消失。如果你想在重啟后保留這個列表,你需要通過 --statedb 參數給工作單元聲明一個文件來保存這些。

$ celery -A proj worker -l info --statedb=/var/run/celery/worker.state

或者你可以使用 celery multi 命令為每一個工作單元實例創建一個文件,使用 %n 格式來擴展當前節點名稱。

celery multi start 2 -l info --statedb=/var/run/celery/%n.state

查看路徑中的變量這一節。

注意遠程控制命令必須能執行任務取消。遠程命令當前只被 RabbitMQ(amqp) 和 Redis 支持。

時間限制


2.0 版本新特性。

軟限制,或者硬限制?
時間限制通過兩個值設置,軟限制和硬限制。軟限制允許任務在被殺死之前捕獲一個異常來清理環境,硬限制超時時間是不可捕獲的,它將強制終止任務。

  pool support: prefork/gevent

一個任務可能永遠運行,如果你又很多任務等待不可能發生的事件,你將阻塞工作單元處理其他的任務。不讓這種情況發生的最佳方法就是設置時間限制。

時間限制(--time-limit)是一個任務終止前可以運行的最大時間秒數。你可以設置一個軟時間限制(–soft-time-limit),它將在硬時間限制到達強制殺死它之前拋出一個異常給任務,使得任務可以捕獲到並清理任務環境:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

時間限制還可以通過 task_time_limit/task_soft_time_limit 配置進行設置。

注意:
   時間限制目前在不支持 SIGUSR1 信號的平台上不可用。

在運行時修改時間限制


2.3版本新特性。

broker support: amqp, redis

有一個遠程控制命令可以修改一個任務的軟限制和硬限制 - time_limit

下面示例修改任務 tasks.crawl_the_web 的軟限制為 1 分鍾,硬限制為 2 分鍾:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

時間限制修改之后開始執行的任務才會被影響到。

速率限制


以下示例修改 myapp.mytask 任務的速率限制為每分鍾最多執行 200 個該類型的任務。

>>> app.control.rate_limit('myapp.mytask', '200/m')

以上示例沒有聲明目標節點名稱,所以這個修改請求將會影響集群中所有的工作單元實例。如果你只想影響指定的工作單元,你可以包含 destination 參數:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

告警:
   這個命令不會影響到使能了 worker_disable_rate_limits 的工作單元。

每個孩子的最大任務數


2.0版本新特性。

pool support: prefork

使用這個選項你可以配置工作單元子進程在被一個新進程取代之前可以執行的最打任務數量。

如果你任務中有無法控制的內存泄露,例如使用了已經不再維護的C擴展,這將是很有用的一個特性。

這個選項可以通過工作單元的 --max-tasks-per-child 參數或者 worker_max_tasks_per_child 配置進行設置。

每個孩子的最大內存


4.0版本新特性。

pool support: prefork

使用這個選項,你可以設置工作單元子進程被替換之前可以使用的最大內存。

如果你任務中有無法控制的內存泄露,例如使用了已經不再維護的C擴展,這將是很有用的一個特性。

這個選項可以通過工作單元的 --max-memory-per-child 參數或者 worker_max_memory_per_child 配置進行設置。

自動擴展


2.2版本新特性。

pool support: prefork, gevent

自動擴展組件用來基於負載動態調整池的大小:
  - 當負載高時增加池中的進程數
   - 當負載低時去除多余的進程

它可以通過 --autoscale 選項啟用,需要兩個數值:池的進程的最大數量和最小數量:

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

你可以通過繼承 Autoscaler 類來定義自己的擴展策略。一些依據的指標包括任務負載或者可用內存等。你可以通過 worker_autoscaler 設置聲明一個自定義的自動擴展器。

隊列


一個工作單元可以從任意數量的隊列中消費任務消息。默認情況下,它會從所有定義在 task_queues 配置種的隊列中消費消息(如果沒有設置,它將從默認隊列 celery 中消費消息)。

你可以在工作單元啟動時聲明從哪些隊列中消費消息,通過 -Q 選項可以聲明一個隊列的列表:

$ celery -A proj worker -l info -Q foo,bar,baz

如果隊列的名稱已經在 task_queues 中聲明,它將使用這個配置,但是如果沒有在隊列列表中聲明,那么Celery 將自動為你產生一個新的隊列(依賴於 task_create_missing_queues 選項)。

你還可以通過遠程控制命令 add_consumer 以及 cancel_consumer 讓工作單元在運行時開始或者停止從一個隊列中消費消息。

Queues: Adding consumers


add_consumer 遠程控制命令通知一個或多個工作單元從一個隊列中消費消息。這個操作是冪等的。

讓集群中的所有工作單元開始從隊列foo中消費消息,你可以如下操作:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

如果你想聲明一個指定的工作單元節點,可以使用 --destination 參數:

$ celery -A proj control add_consumer foo -d celery@worker1.local

同樣的效果可以通過 app.control.add_consumer() 方法動態實現:

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

但現在為止,我們只列舉了使用自動隊列的示例,如果你想更多的控制,你可以聲明 exchangerouting_key 甚至更多的選項:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

Queues: Canceling consumers


你可以通過 cancel_consumer 命令終止從一個隊列中消費消息。

強制集群中所有的工作單元停止從一個隊列中消費消息,你可以使用 celery control 程序:

$ celery -A proj control cancel_consumer foo

如果你想聲明一個指定的工作單元節點,可以使用 --destination 參數:

$ celery -A proj control cancel_consumer foo -d celery@worker1.local

同樣的效果可以通過 app.control.cancel_consumer() 方法動態實現:

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]

Queues: List of active queues


你可以使用 active_queues 控制命令獲取工作單元消費的隊列的列表:

$ celery -A proj inspect active_queues
[...]

就像所有其他遠程控制命令一樣,它也支持 --destination 參數,用來聲明應該回復請求的工作單元節點。

$ celery -A proj inspect active_queues -d celery@worker1.local
[...]

這也可以通過 app.control.inspect.active_queues() 方法動態實現:

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]

探查工作單元


app.control.inspect 可以用來探查正在運行的工作單元。在內部它使用遠程控制命令來實現。

你也可以使用celery命令來探查工作單元,並且它支持與 app.control 接口相同的命令。

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

Dump of registered tasks


你可以使用 registered() 方法獲取在工作單元中注冊的任務:

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]

Dump of currently executing tasks


你可以通過 active() 方法獲取激活任務的列表:

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Dump of scheduled(ETA) tasks


你可以通過 scheduled() 方法獲取等待被調度的任務列表:

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

注意:
  這些是帶有 ETA/countdown 參數的任務,不是周期任務。

Dump of reserved tasks


保留任務是已經被工作單元接收,但是還在等待被執行的任務。

你可以通過 reserved() 方法獲取保留任務的列表:

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Statistics


遠程控制命令 inspect stats(或者 stats()) 將提供給你一個關於工作單元的有用的統計信息列表(或者可能對你無用):

$ celery -A proj inspect stats

輸出將包含下列字段:
   - broker
消息中間件相關的信息。
   - connect_timeout
以秒為單位的建立一個新連接的超時時間

- heartbeat
當前心跳值(由客戶端設置)

- hostname
遠程消息中間件的節點名稱

- insist
不再使用

- login_method
連接消息中間件的登錄方法

- port
遠程消息中間件的端口

- ssl
啟用/禁用 SSL

- transport
使用的傳輸層

- transport_options
傳輸層選項

- uri_prefix
一些傳輸層需要hostname是URL的形式。
```
redis+socket:///tmp/redis.sock
```
這個例子中 URI-prefix 是 redis。

- userid
連接消息中間的用戶 ID

- virtual_host
使用的虛擬主機
  • clock
    工作單元的邏輯時鍾值。這是一個正整數,每次你收到統計信息它的值會增加。

  • pid
    工作單元實例的進程ID

  • pool
    池相關的配置。

    • max-concurrency
      最大的進程/線程/green線程數量

    • max-tasks-per-child
      一個工作單元子線程/進程被回收前可以執行的最大任務數量

    • processes
      進程/線程id的列表

    • put-guarded-by-semaphore
      內部使用

    • timeouts
      時間限制的默認值

    • writes
      prefork 池的特殊配置,它顯示當使用異步 I/O 時池中每個進程寫操作的分布。

  • prefetch_count
    任務消費者的當前 prefetch 計數。

  • rusage
    系統使用統計信息。你系統平台的相關字段可能不同。

From getrusage(2):
- stime
  進程的內核態時間

- utime
進程的用戶態時間

- maxrss
進程使用的最大內存值(kilobytes計數)

- idrss
數據使用的非共享內存總數(執行的kilobytes次ticks計數)

- isrss
棧空間的非共享內存總數(執行的kilobytes次ticks計數)

- ixrss
與其他進程共享的內存總數(執行的kilobytes次ticks計數)

- inblock
文件系統為進程讀硬盤的次數

- oublock
文件系統為進程寫硬盤的次數

- majflt
進行 I/O 操作時出現的頁錯誤計數

- minflt
沒進程 I/O 操作時出現的頁錯誤計數

- msgrcv
接收到的 IPC 消息

- msgsnd
發送的 IPC 消息

- nvcsw
進程主動進行上下文切換的次數

- nivcsw
非進程主動進行的上下文切換的次數

- nsignals
收到的信號數

- nswap
進程被交換除內存的次數
  • total
    自從工作單元開始,任務名稱與接收的該類型的任務數量的映射。

附加命令


Remote shutdown


以下命令將遠程優雅地關閉工作單元:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')

Ping


這個命令像或者的工作單元發送一個 Ping 請求。工作單元將回復一個 Pong,而不做其他事情。如果你沒有聲明一個自定義的超時時間,它就使用默認的1秒超時時間:

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping() 方法還支持 destination 參數,所以你可以聲明想要 ping 的工作單元:

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

Enable/disable events


你可以使用 enable_events, disable_events 命令啟用/禁用事件。這對於臨時監控一個使用 celery events/celerymon 的工作單元非常有用。

>>> app.control.enable_events()
>>> app.control.disable_events()

編寫自己的遠程控制命令


有兩種類型的遠程控制命令:

  • Inspect command
    沒有副作用,將只是返回工作單元中找到的值,如已注冊的任務的列表、激活的任務的列表,等等。

  • Control command
    有副作用,如給工作單元添加一個消費隊列。

遠程控制命令在控制面板中注冊,並且他們有一個參數:當前的 ControlDispatch 實例。在這里,如果你需要,你可以訪問激活的 Consumer

下面是一個控制命令的例子,它增加任務的 prefetch 計數:

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

確保你將這段代碼添加到一個模塊中,並且該模塊被工作單元導入:這可以在你定義 app 實例的模塊定義,或者你也可以使用 imports 設置從其他模塊導入。

重啟工作單元使控制命令注冊其中,現在你可以使用 celery control 工具執行你的命令:

$ celery -A proj control increase_prefetch_count 3

你還可以給 celery inspect 程序添加操作,例如讀取當前的 prefetch 計數:

from celery.worker.control import inspect_command

@inspect_command
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

重啟工作單元之后你可以通過 celery inspect 程序詢問這個值:

$ celery -A proj inspect current_prefetch_count

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78579160

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM