基礎
有多種類型的事件可以觸發信號,你可以連接到這些信號,使得在他們觸發的時候執行操作。
連接到 after_task_publish
信號的示例:
from celery.signals import after_task_publish @after_task_publish.connect def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, ))
一些信號還帶有發送者,你可以用來過濾信號。例如,after_task_publish
信號使用任務名稱作為發送者,所以通過給connect 方法提供 sender 參數,你可以使你的處理函數在每個名為 proj.tasks.add
的任務發布的時候被回調:
@after_task_publish.connect(sender='proj.tasks.add') def task_sent_handler(sender=None, headers=None, body=None, **kwargs): # information about task are located in headers for task messages # using the task protocol version 2. info = headers if 'task' in headers else body print('after_task_publish for task id {info[id]}'.format( info=info, ))
信號的實現同 django.core.dispatch
。所以默認情況下其他關鍵字參數(例如: signal)都被傳遞給所有信號處理函數。
信號處理的最佳實踐是接收任意關鍵字參數(即:**kwargs)。這樣的話當使用新的Celery 版本時只要添加新的參數而不需要更改你的代碼。
信號
任務信號
- before_task_publish
3.1版本新特性。
任務被發布之前分發的。注意它是在發送任務的進程中執行。
發送者是被發送的任務的名稱。
提供的參數:
- body 任務消息體 這是一個包含任務消息字段的映射,查看 Version2 和 Version1 獲取可以定義的可用字段的說明 - exchange 消息交換器的名稱或者對象實例 - routing_key 發送消息時使用的路由鍵 - headers 應用頭映射 (可以修改) - properties 消息屬性 (可以修改) - declare 消息發布前聲明的實體(消息交換器、隊列或者綁定,可以修改) - retry_policy 重試選項映射。可以是 `kombu.Connection.ensure()` 方法的任意參數,可以修改
- after_task_publish
任務被分發到消息中間件之后。注意這是在發送任務的進程中執行。
提供的參數:
- headers
任務消息頭,查看 Version2 和 Version1 獲取可以定義的可用字段的說明
- body
任務消息體,查看 Version2 和 Version1 獲取可以定義的可用字段的說明
- exchange 消息交換器名稱或者消息交換器對象實例 - routing_key 使用的路由鍵
- task_prerun
任務執行前
發送者是將要執行的任務
提供的參數:
- task_id 被執行的任務的ID - task 被執行的任務 - args 任務位置參數 - kwargs 任務關鍵字參數
- task_postrun
任務執行后分發
發送者是被執行的任務對象
提供的參數:
- task_id 被執行的任務的ID - task 被執行的任務 - args 任務位置參數 - kwargs 任務關鍵字參數 - retval 任務的返回值 - state 結果狀態的名稱
- task_retry
當任務將被重試時分發
發送者是任務對象
提供的參數:
- request 當前任務請求 - reason 重試的理由 (通常是一個異常實例,但總是能表示成字符串) - einfo 詳細的異常信息,包括堆棧回溯信息(一個`billiard.einfo.ExceptionInfo` 對象)
- task_success
任務執行成功時分發
發送者是被執行的任務對象
提供的參數:
- result 任務的返回值
- task_failure
任務失敗時分發
發送者是被執行的任務對象
提供的參數:
- task_id 任務的ID - 異常 拋出的異常實例 - args 任務調用時傳遞的位置參數 - kwargs 任務調用時傳遞的關鍵字參數 - traceback 堆棧回溯對象 - einfo `billiard.einfo.ExceptionInfo` 實例對象
- task_revoked
任務被工作單元取消或者中止時分發
提供的參數:
- request 這是一個 `Request` 實例,而不是 `task.request` 實例。當使用 prefork 池,這個信號由父進程分發,所以 `task.request` 是不可用的,也不應該使用。 取而代之,我們可以使用這個對象,因為他們有很多相同的字段。 - terminated 如果任務被中止,設置為真 - signum 用來中止任務的信號數值。如果它的值為 None,且設置了 terminated 為真,那么它的值將設置成 TERM。 - expired 如果任務過期,設置成真
- task_unknown
當工作單元收到未注冊的任務的消息
發送者是工作單元消費者
提供的參數:
- name 任務的名稱 - id 消息中任務的id - message 裸消息對象 - exc 發生的錯誤
- task_rejected
當工作單元從任務隊列中收到未知類型的消息
發送者是工作單元消息者
提供的參數:
- message 裸消息對象 - exc 發生的錯誤(如果有)
應用信號
- import_modules
當程序(工作單元、beat、shell)導入 Include 中的模塊,或者導入配置被導入時。
發送者是應用實例。
工作單元信號
- celery_after_setup
工作單元啟動但是在執行任務之前發送的信號。者意味着任意從celery worker -Q
聲明的隊列都已經啟用,日志環境已經設置好,等等。
它可以用來添加除 celery worker -Q
選項聲明的隊列之外的自定義隊列,這些自定義隊列應該始終被消費。下面是給每個工作單元創建一個直接隊列的示例,這些隊列可以用來路由任務給指定的工作單元:
from celery.signals import celeryd_after_setup @celeryd_after_setup.connect def setup_direct_queue(sender, instance, **kwargs): queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker instance.app.amqp.queues.select_add(queue_name)
提供的參數:
- sender
工作單元的節點名稱 - instance
這是要初始化的celery.apps.worker.Worker
實例。注意,至今為止,只設置了app
和hostname
屬性,並且__init__
函數的余下部分還沒有執行。 -
conf
當前應用實例的配置。 -
celeryd_init
這是工作單元啟動后發送的第一個信號。sender
是工作單元的主機名,所以這個信號可以用來設置工作單元的特殊配置:
from celery.signals import celeryd_init @celeryd_init.connect(sender='worker12@example.com') def configure_worker12(conf=None, **kwargs): conf.task_default_rate_limit = '10/m'
或者如果你想給多個工作單元設置配置,你連接該信號的時候可以忽略 sender
參數:
from celery.signals import celeryd_init @celeryd_init.connect def configure_workers(sender=None, conf=None, **kwargs): if sender in ('worker1@example.com', 'worker2@example.com'): conf.task_default_rate_limit = '10/m' if sender == 'worker3@example.com': conf.worker_prefetch_multiplier = 0
提供的參數:
- sender
工作單元的節點名稱
- instance
這是要初始化的 celery.apps.worker.Worker
實例。注意,至今為止,只設置了 app
和 hostname
屬性,並且 __init__
函數的余下部分還沒有執行。
- conf
當前應用實例的配置。
- options
從命令行傳遞給工作單元的選項
-
worker_init
任務開始前分發 -
worker_ready
工作單元准備接收任務時分發 -
worker_init
Celery 發送一個工作單元心跳時分發
sender
是celery.worker.heartbeat.Heart
實例 -
worker_shutting_down
工作單元准備關閉進程時分發
提供的參數:
- sig
接收到的POSIX信號
- how
關閉方法,熱關閉或者冷關閉
- exitcode
主進程退出時將使用的退出碼
- worker_process_init
在所有池進程開始時分發
注意這個信號綁定的處理函數不能阻塞多余4秒,否則進程會被認為開始失敗而被殺死
- worker_process_shutdown
在所有池進程將退出前分發
注意:不能保證這個信號一定能分發,類似於finally
塊,不能保證處理函數會在關閉時進行調用,並且如果被調用也有可能中斷。
提供的參數:
- pid
將要關閉的子進程的進程ID
- exitcode
子進程關閉時將使用的退出碼
- worker_shutdown
工作單元將要關閉前分發
Beat 信號
- beat_init
celery beat
啟動時分發 ( standalone 或者 embedded)
Sender
是 celery.beat.Service instance
- beat_embedded_init
當celery beat 作為一個嵌入式進程啟動時除發送beat_init
信號外還將發送的信號
Sender
是 celery.beat.Service instance
Eventlet 信號
- eventlet_pool_started
當eventlet pool
啟動時分發
Sender
是 celery.concurrency.eventlet.TaskPool
實例
- eventlet_pool_preshutdown
當工作單元關閉,eventlet
池等待剩余工作進程時發送
Sender
是 celery.concurrency.eventlet.TaskPool
實例
- eventlet_pool_postshutdown
當池已經被join
,並且工作單元將關閉時分發
Sender
是 celery.concurrency.eventlet.TaskPool
實例
- eventlet_pool_apply
當任務應用到池時分發
Sender
是 celery.concurrency.eventlet.TaskPool
實例
提供的參數:
- target 目標函數 - args 位置參數 - kwargs 關鍵字參數
日志信號
- setup_logging
如果這個信號被連接,celery不會配置日志器,所以你可以使用你自己的日志配置完全覆蓋原來配置。
如果你想修改celery設置的配置,你可以使用 after_setup_logger
和 after_setup_task_logger signals
信號
提供的參數:
- loglevel 日志對象的級別 - logfile 日志文件的名稱 - format 日志格式字符串 - colorize 聲明日志消息是否標顏色
- after_setup_logger
每個全局日志器設置后分發(不是任務日志器)。用來修改日志配置。
提供的參數:
- logger 日志器對象 - loglevel 日志對象的級別 - logfile 日志文件名稱 - format 日志格式字符串 - colorize 聲明日志是否標顏色
- after_setup_task_logger
每個任務日志器設置后分發。用來修改日志配置
提供的參數:
- logger 日志器對象 - loglevel 日志對象的級別 - logfile 日志文件名稱 - format 日志格式字符串 - colorize 聲明日志是否標顏色
命令信號
- user_preload_options
Celery 命令行程序完成解析預處理選項時該信號將被分發。
它可以用來給 celery 命令添加附加的命令行參數:
from celery import Celery from celery import signals from celery.bin.base import Option app = Celery() app.user_options['preload'].add(Option( '--monitoring', action='store_true', help='Enable our external monitoring utility, blahblah', )) @signals.user_preload_options.connect def handle_preload_options(options, **kwargs): if options['monitoring']: enable_monitoring()
Sender
是Command
實例,並且值依賴於調用的程序(例如: 對於總命令,他將是一個 CeleryCommand
對象)
提供的參數:
- app 應用實例 - options 被解析的預加載選項的映射(以及默認值)
廢棄的信號
- task_sent
這個信號已經被廢棄,請使用after_task_publish
轉自:https://blog.csdn.net/libing_thinking/article/details/78606458