Celery-4.1 用戶指南: Signals (信號)


基礎


有多種類型的事件可以觸發信號,你可以連接到這些信號,使得在他們觸發的時候執行操作。

連接到 after_task_publish 信號的示例:

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

一些信號還帶有發送者,你可以用來過濾信號。例如,after_task_publish信號使用任務名稱作為發送者,所以通過給connect 方法提供 sender 參數,你可以使你的處理函數在每個名為 proj.tasks.add的任務發布的時候被回調:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))

信號的實現同 django.core.dispatch。所以默認情況下其他關鍵字參數(例如: signal)都被傳遞給所有信號處理函數。

信號處理的最佳實踐是接收任意關鍵字參數(即:**kwargs)。這樣的話當使用新的Celery 版本時只要添加新的參數而不需要更改你的代碼。

信號


任務信號


  • before_task_publish
    3.1版本新特性。

任務被發布之前分發的。注意它是在發送任務的進程中執行。
發送者是被發送的任務的名稱。

提供的參數:

- body

任務消息體

這是一個包含任務消息字段的映射,查看 Version2 和 Version1 獲取可以定義的可用字段的說明

- exchange

消息交換器的名稱或者對象實例

- routing_key

發送消息時使用的路由鍵

- headers

應用頭映射 (可以修改)

- properties

消息屬性 (可以修改)

- declare

消息發布前聲明的實體(消息交換器、隊列或者綁定,可以修改)

- retry_policy

重試選項映射。可以是 `kombu.Connection.ensure()` 方法的任意參數,可以修改
  • after_task_publish
    任務被分發到消息中間件之后。注意這是在發送任務的進程中執行。

提供的參數:

- headers

任務消息頭,查看 Version2 和 Version1 獲取可以定義的可用字段的說明

- body

任務消息體,查看 Version2 和 Version1 獲取可以定義的可用字段的說明

- exchange

消息交換器名稱或者消息交換器對象實例

- routing_key

使用的路由鍵
  • task_prerun

任務執行前

發送者是將要執行的任務

提供的參數:

- task_id

被執行的任務的ID

- task

被執行的任務

- args

任務位置參數

- kwargs

任務關鍵字參數
  • task_postrun

任務執行后分發

發送者是被執行的任務對象

提供的參數:

- task_id

被執行的任務的ID

- task

被執行的任務

- args

任務位置參數

- kwargs

任務關鍵字參數

- retval

任務的返回值

- state

結果狀態的名稱
  • task_retry

當任務將被重試時分發

發送者是任務對象

提供的參數:

- request

當前任務請求

- reason

重試的理由 (通常是一個異常實例,但總是能表示成字符串)

- einfo

詳細的異常信息,包括堆棧回溯信息(一個`billiard.einfo.ExceptionInfo` 對象)
  • task_success

任務執行成功時分發

發送者是被執行的任務對象

提供的參數:

- result
任務的返回值
  • task_failure

任務失敗時分發

發送者是被執行的任務對象

提供的參數:

- task_id

任務的ID

- 異常

拋出的異常實例

- args

任務調用時傳遞的位置參數

- kwargs

任務調用時傳遞的關鍵字參數

- traceback

堆棧回溯對象

- einfo

`billiard.einfo.ExceptionInfo` 實例對象
  • task_revoked

任務被工作單元取消或者中止時分發

提供的參數:

- request

這是一個 `Request` 實例,而不是 `task.request` 實例。當使用 prefork 池,這個信號由父進程分發,所以 `task.request` 是不可用的,也不應該使用。
取而代之,我們可以使用這個對象,因為他們有很多相同的字段。

- terminated

如果任務被中止,設置為真

- signum

用來中止任務的信號數值。如果它的值為 None,且設置了 terminated 為真,那么它的值將設置成 TERM。

- expired

如果任務過期,設置成真
  • task_unknown

當工作單元收到未注冊的任務的消息

發送者是工作單元消費者

提供的參數:

- name

任務的名稱

- id

消息中任務的id

- message

裸消息對象

- exc

發生的錯誤
  • task_rejected

當工作單元從任務隊列中收到未知類型的消息

發送者是工作單元消息者

提供的參數:

- message

裸消息對象

- exc

發生的錯誤(如果有)

應用信號


  • import_modules

當程序(工作單元、beat、shell)導入 Include 中的模塊,或者導入配置被導入時。

發送者是應用實例。

工作單元信號


  • celery_after_setup
    工作單元啟動但是在執行任務之前發送的信號。者意味着任意從 celery worker -Q 聲明的隊列都已經啟用,日志環境已經設置好,等等。

它可以用來添加除 celery worker -Q 選項聲明的隊列之外的自定義隊列,這些自定義隊列應該始終被消費。下面是給每個工作單元創建一個直接隊列的示例,這些隊列可以用來路由任務給指定的工作單元:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)

提供的參數:

  • sender
    工作單元的節點名稱
  • instance
    這是要初始化的 celery.apps.worker.Worker 實例。注意,至今為止,只設置了 apphostname 屬性,並且 __init__ 函數的余下部分還沒有執行。
  • conf
    當前應用實例的配置。

  • celeryd_init

這是工作單元啟動后發送的第一個信號。sender是工作單元的主機名,所以這個信號可以用來設置工作單元的特殊配置:

from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'

或者如果你想給多個工作單元設置配置,你連接該信號的時候可以忽略 sender 參數:

from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0

提供的參數:
  - sender
    工作單元的節點名稱
  - instance
    這是要初始化的 celery.apps.worker.Worker 實例。注意,至今為止,只設置了 apphostname 屬性,並且 __init__ 函數的余下部分還沒有執行。
  - conf
    當前應用實例的配置。
  - options
    從命令行傳遞給工作單元的選項

  • worker_init
    任務開始前分發

  • worker_ready
    工作單元准備接收任務時分發

  • worker_init
    Celery 發送一個工作單元心跳時分發
    sendercelery.worker.heartbeat.Heart 實例

  • worker_shutting_down
    工作單元准備關閉進程時分發

提供的參數:
  - sig
    接收到的POSIX信號
  - how
    關閉方法,熱關閉或者冷關閉
  - exitcode
    主進程退出時將使用的退出碼

  • worker_process_init
    在所有池進程開始時分發

  注意這個信號綁定的處理函數不能阻塞多余4秒,否則進程會被認為開始失敗而被殺死

  • worker_process_shutdown
    在所有池進程將退出前分發

  注意:不能保證這個信號一定能分發,類似於finally 塊,不能保證處理函數會在關閉時進行調用,並且如果被調用也有可能中斷。

提供的參數:
  - pid
    將要關閉的子進程的進程ID
  - exitcode
    子進程關閉時將使用的退出碼

  • worker_shutdown
    工作單元將要關閉前分發

Beat 信號


  • beat_init
    celery beat啟動時分發 ( standalone 或者 embedded)

  Sendercelery.beat.Service instance

  • beat_embedded_init
    當celery beat 作為一個嵌入式進程啟動時除發送 beat_init信號外還將發送的信號

  Sendercelery.beat.Service instance

Eventlet 信號


  • eventlet_pool_started
    eventlet pool 啟動時分發

  Sendercelery.concurrency.eventlet.TaskPool實例

  • eventlet_pool_preshutdown
    當工作單元關閉,eventlet池等待剩余工作進程時發送

  Sendercelery.concurrency.eventlet.TaskPool實例

  • eventlet_pool_postshutdown
    當池已經被join,並且工作單元將關閉時分發

  Sendercelery.concurrency.eventlet.TaskPool 實例

  • eventlet_pool_apply
    當任務應用到池時分發

  Sendercelery.concurrency.eventlet.TaskPool 實例

提供的參數:

- target

    目標函數

- args

    位置參數

- kwargs

    關鍵字參數

日志信號


  • setup_logging
    如果這個信號被連接,celery不會配置日志器,所以你可以使用你自己的日志配置完全覆蓋原來配置。

如果你想修改celery設置的配置,你可以使用 after_setup_loggerafter_setup_task_logger signals 信號

提供的參數:

- loglevel

    日志對象的級別

- logfile
    日志文件的名稱

- format

    日志格式字符串

- colorize

    聲明日志消息是否標顏色
  • after_setup_logger
    每個全局日志器設置后分發(不是任務日志器)。用來修改日志配置。

提供的參數:

- logger

    日志器對象

- loglevel

    日志對象的級別

- logfile

    日志文件名稱

- format

    日志格式字符串

- colorize

    聲明日志是否標顏色
  • after_setup_task_logger
    每個任務日志器設置后分發。用來修改日志配置

提供的參數:

- logger

    日志器對象

- loglevel

    日志對象的級別

- logfile

    日志文件名稱

- format

    日志格式字符串

- colorize

    聲明日志是否標顏色

命令信號


  • user_preload_options
    Celery 命令行程序完成解析預處理選項時該信號將被分發。

它可以用來給 celery 命令添加附加的命令行參數:

from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()

SenderCommand實例,並且值依賴於調用的程序(例如: 對於總命令,他將是一個 CeleryCommand 對象)

提供的參數:

- app
    應用實例
- options
    被解析的預加載選項的映射(以及默認值)

廢棄的信號


    • task_sent
      這個信號已經被廢棄,請使用 after_task_publish

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78606458


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM