Celery-4.1 用戶指南: Configuration and defaults (配置和默認值)


這篇文檔描述了可用的配置選項。

如果你使用默認的加載器,你必須創建 celeryconfig.py 模塊並且保證它在python路徑中。

配置文件示例

以下是配置示例,你可以從這個開始。它包括運行一個基本Celery應用的所有基礎設置。

## Broker settings.
broker_url = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when the Celery worker starts.
imports = ('myapp.tasks',)

## Using the database to store task state and results.
result_backend = 'db+sqlite:///results.db'

task_annotations = {'tasks.add': {'rate_limit': '10/s'}}

新的小寫設置

4.0 版本引入了新的小寫設置名稱和機構環境。

與以前版本的不同,除了設置項名稱變為小寫字母外,還有一個前綴的重命名,例如 celerybeat_ 變為 beat_,celeryd_ 變為 worker,以及很多頂級 celery_ 設置重命名成了 task_ 前綴。

Celery 仍然能讀取老的配置文件,所以並不倉促遷移到新的設置格式。

Setting name Replace with
CELERY_ACCEPT_CONTENT accept_content
CELERY_ENABLE_UTC enable_utc
CELERY_IMPORTS imports
CELERY_INCLUDE include
CELERY_TIMEZONE timezone
CELERYBEAT_MAX_LOOP_INTERVAL beat_max_loop_interval
CELERYBEAT_SCHEDULE beat_schedule
CELERYBEAT_SCHEDULER beat_scheduler
CELERYBEAT_SCHEDULE_FILENAME beat_schedule_filename
CELERYBEAT_SYNC_EVERY beat_sync_every
BROKER_URL broker_url
BROKER_TRANSPORT broker_transport
BROKER_TRANSPORT_OPTIONS broker_transport_options
BROKER_CONNECTION_TIMEOUT broker_connection_timeout
BROKER_CONNECTION_RETRY broker_connection_retry
BROKER_CONNECTION_MAX_RETRIES broker_connection_max_retries
BROKER_FAILOVER_STRATEGY broker_failover_strategy
BROKER_HEARTBEAT broker_heartbeat
BROKER_LOGIN_METHOD broker_login_method
BROKER_POOL_LIMIT broker_pool_limit
BROKER_USE_SSL broker_use_ssl
CELERY_CACHE_BACKEND cache_backend
CELERY_CACHE_BACKEND_OPTIONS cache_backend_options
CASSANDRA_COLUMN_FAMILY cassandra_table
CASSANDRA_ENTRY_TTL cassandra_entry_ttl
CASSANDRA_KEYSPACE cassandra_keyspace
CASSANDRA_PORT cassandra_port
CASSANDRA_READ_CONSISTENCY cassandra_read_consistency
CASSANDRA_SERVERS cassandra_servers
CASSANDRA_WRITE_CONSISTENCY cassandra_write_consistency
CELERY_COUCHBASE_BACKEND_SETTINGS couchbase_backend_settings
CELERY_MONGODB_BACKEND_SETTINGS mongodb_backend_settings
CELERY_EVENT_QUEUE_EXPIRES event_queue_expires
CELERY_EVENT_QUEUE_TTL event_queue_ttl
CELERY_EVENT_QUEUE_PREFIX event_queue_prefix
CELERY_EVENT_SERIALIZER event_serializer
CELERY_REDIS_DB redis_db
CELERY_REDIS_HOST redis_host
CELERY_REDIS_MAX_CONNECTIONS redis_max_connections
CELERY_REDIS_PASSWORD redis_password
CELERY_REDIS_PORT redis_port
CELERY_RESULT_BACKEND result_backend
CELERY_MAX_CACHED_RESULTS result_cache_max
CELERY_MESSAGE_COMPRESSION result_compression
CELERY_RESULT_EXCHANGE result_exchange
CELERY_RESULT_EXCHANGE_TYPE result_exchange_type
CELERY_TASK_RESULT_EXPIRES result_expires
CELERY_RESULT_PERSISTENT result_persistent
CELERY_RESULT_SERIALIZER result_serializer
CELERY_RESULT_DBURI Use result_backend instead.
CELERY_RESULT_ENGINE_OPTIONS database_engine_options
[…]_DB_SHORT_LIVED_SESSIONS database_short_lived_sessions
CELERY_RESULT_DB_TABLE_NAMES database_db_names
CELERY_SECURITY_CERTIFICATE security_certificate
CELERY_SECURITY_CERT_STORE security_cert_store
CELERY_SECURITY_KEY security_key
CELERY_TASK_ACKS_LATE task_acks_late
CELERY_TASK_ALWAYS_EAGER task_always_eager
CELERY_TASK_ANNOTATIONS task_annotations
CELERY_TASK_COMPRESSION task_compression
CELERY_TASK_CREATE_MISSING_QUEUES task_create_missing_queues
CELERY_TASK_DEFAULT_DELIVERY_MODE task_default_delivery_mode
CELERY_TASK_DEFAULT_EXCHANGE task_default_exchange
CELERY_TASK_DEFAULT_EXCHANGE_TYPE task_default_exchange_type
CELERY_TASK_DEFAULT_QUEUE task_default_queue
CELERY_TASK_DEFAULT_RATE_LIMIT task_default_rate_limit
CELERY_TASK_DEFAULT_ROUTING_KEY task_default_routing_key
CELERY_TASK_EAGER_PROPAGATES task_eager_propagates
CELERY_TASK_IGNORE_RESULT task_ignore_result
CELERY_TASK_PUBLISH_RETRY task_publish_retry
CELERY_TASK_PUBLISH_RETRY_POLICY task_publish_retry_policy
CELERY_TASK_QUEUES task_queues
CELERY_TASK_ROUTES task_routes
CELERY_TASK_SEND_SENT_EVENT task_send_sent_event
CELERY_TASK_SERIALIZER task_serializer
CELERYD_TASK_SOFT_TIME_LIMIT task_soft_time_limit
CELERYD_TASK_TIME_LIMIT task_time_limit
CELERY_TRACK_STARTED task_track_started
CELERYD_AGENT worker_agent
CELERYD_AUTOSCALER worker_autoscaler
CELERYD_CONCURRENCY worker_concurrency
CELERYD_CONSUMER worker_consumer
CELERY_WORKER_DIRECT worker_direct
CELERY_DISABLE_RATE_LIMITS worker_disable_rate_limits
CELERY_ENABLE_REMOTE_CONTROL worker_enable_remote_control
CELERYD_HIJACK_ROOT_LOGGER worker_hijack_root_logger
CELERYD_LOG_COLOR worker_log_color
CELERYD_LOG_FORMAT worker_log_format
CELERYD_WORKER_LOST_WAIT worker_lost_wait
CELERYD_MAX_TASKS_PER_CHILD worker_max_tasks_per_child
CELERYD_POOL worker_pool
CELERYD_POOL_PUTLOCKS worker_pool_putlocks
CELERYD_POOL_RESTARTS worker_pool_restarts
CELERYD_PREFETCH_MULTIPLIER worker_prefetch_multiplier
CELERYD_REDIRECT_STDOUTS worker_redirect_stdouts
CELERYD_REDIRECT_STDOUTS_LEVEL worker_redirect_stdouts_level
CELERYD_SEND_EVENTS worker_send_task_events
CELERYD_STATE_DB worker_state_db
CELERYD_TASK_LOG_FORMAT worker_task_log_format
CELERYD_TIMER worker_timer
CELERYD_TIMER_PRECISION worker_timer_precision

配置指示

通用設置

  • accept_content
    默認值: {‘json’} (set, list, or tuple).
    允許的內容類型/序列化器的白名單

如果接收到一個消息,其內容類型不再上述列表中,它將會被丟棄並拋出一個錯誤。

默認情況下,任意內容類型都是啟用的,包括pickle以及yaml,所以確保不受信任的第三方不能訪問你的消息中間件。查看安全這一節獲取更多信息。

示例:

# using serializer name
accept_content = ['json']

# or the actual content-type (MIME)
accept_content = ['application/json']

時間與日期設置

  • enable_utc
    2.5 版本新特性。
    默認值:從 3.0 版本開始默認啟用

  一旦啟用,消息中的日期和時間將會轉化成 UTC 時區。

  注意2.5版本以下的工作單元將會認為所有消息都使用的本地時區,所以只有在所有的工作單元都升級了的情況下再啟用這個特性。

  • timezone
    2.5版本新特性
    默認值: “UTC”

  設置Celery使用一個自定義的時區。這個時區值可以是pytz庫支持的任意時區。

  如果沒有設置,UTC時區將被使用。為了向后兼容,還提供了一個 enable_utc設置,如果他設置成假,將使用系統本地時區。

任務設置

  • task_annotations
    這個設置可以用來在配置文件中重寫任意任務屬性。這個設置可以是一個字典,獲取一個annotation對象的列表,這個列表對任務進行過濾,對匹配的任務名稱起作用,並返回待更改屬性的一個映射。

以下將更改 tasks.add 任務的 rate_limit 屬性:

task_annotations = {‘tasks.add’: {‘rate_limit’: ‘10/s’}} 

或者對所有的任務更改:

task_annotations = {‘*’: {‘rate_limit’: ‘10/s’}} 


你還可以更改方法,例如 on_failure 處理函數:

def my_on_failure(self, exc, task_id, args, kwargs, einfo):
  print(‘Oh no! Task failed: {0!r}’.format(exc))

task_annotations = {‘*’: {‘on_failure’: my_on_failure}} 

如果你需要更靈活的控制,那么你可以使用對象而不是字典來選擇任務來進行注解:

class MyAnnotate(object):

    def annotate(self, task):
        if task.name.startswith('tasks.'):
            return {'rate_limit': '10/s'}

task_annotations = (MyAnnotate(), {other,})
  • task_compression
    默認值: None
    任務消息的默認壓縮算法。可以是 gzip、bzip2(如果可用),或者任意在 Kombu 壓縮模式注冊表中注冊的自定義壓縮算法。

   默認發送未壓縮的消息。

  • task_protocol
    默認值:2(從4.0版本開始)
    設置默認的任務消息協議版本。支持的協議:1 和 2

   協議 2 在 3.1.24 以及 4.x+ 被支持

  • task_serializer
    默認值:“json”(從4.0版本開始,更早:pickle)
    一個表示使用的默認序列化方法的字符串。可以是 json(默認)、pickle、 yaml、msgpack,或者任意在 kombu.serialization.registry 中注冊過的自定義序列化方法。

  另見:
    Serializers

  • task_publish_retry
    2.2版本新特性
    默認值:啟用

   決定當連接丟失或者其他連接錯誤時任務消息的發布是否會重試,查看 task_publish_retry_policy。

  • task_publish_retry_policy
    2.2版本新特性
    默認值:查看 Message Sending Retry。

   定義當連接丟失或者其他連接錯誤時任務消息的發布重試策略。

任務執行設置

  • task_always_eager
    默認值:禁用
    如果設置成 True,所有的任務都將在本地執行知道任務返回。apply_async() 以及Task.delay()將返回一個 EagerResult 實例,模擬AsyncResult實例的API和行為,除了這個結果是已經計算過的之外。

   也就是說,任務將會在本地執行而不是發送到隊列。

  • task_eager_propagates
    默認值:禁用
    如果設置成 True,本地執行的任務(使用 task.apply(),或者 task_always_eager 被啟用)將傳遞異常。

   這與使用 apply()throw=True 參數有同樣的效果。

  • task_remote_tracebacks
    默認值:禁用
    如果啟用了,當重新拋出任務錯誤時,任務結果將會包括工作單元的堆棧信息。

   它需要 tblib 庫,可以通過 pip 安裝:

    $ pip install celery[tblib] 


    查看 Bundles 獲取關於組合多個擴展需求的信息。

  • task_ignore_result
    默認值:禁用
    是否存儲任務返回值(tombstones)。如果你只是想在發生錯誤的時候記錄返回值,可以設置:task_store_errors_even_if_ignored

  • task_store_errors_even_if_ignored
    默認值:禁用
    如果設置了,即使 Task.ignore_result 啟用了,工作單元也會愛結果后端中存儲所有的任務錯誤。

  • task_track_started
    默認值:禁用
    如果設置成真,當任務被工作單元執行時,任務將報告它的狀態為started。默認值是假,因為通常行為是不做這種粒度級別的匯報。任務會處於 pending、finished 或者 waiting to be retried。當有長時間任務,並且需要知道當前正在運行什么任務時,有一個 started狀態將會很有用。

  • task_time_limit
    默認值:沒有時間限制
    任務的硬時間限制,以秒為單位。如果這個時間限制被超過,處理任務的工作單元進程將會被殺死並使用一個新的替代。

  • task_soft_time_limit
    默認值:沒有時間限制
    任務的軟時間限制,以秒為單位

  當這個時間限制超過后,SoftTimeLimitExceeded異常將會被拋出。例如,任務可以捕獲這個異常在硬時間限制到達之前對環境進行清理:

from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        return do_work()
    except SoftTimeLimitExceeded:
        cleanup_in_a_hurry()
  • task_acks_late
    默認值:禁用
    延遲確認意味着任務消息將在任務執行完成之后再進行確認,而不是剛開始時(默認行為)。

  另見:
    FAQ: Shoud I use retry or acks_late

  • task_reject_on_worker_lost
    默認值:禁用
    即使 task_acks_late 被啟用,當處理任務的工作單元異常退出或者收到信號而退出時工作單元將會確認任務消息。

   將這個設置成真可以讓消息重新入隊,所以任務將會被再執行,在同一個工作單元或者另外一個工作單元。

   告警:
    啟用這個可能導致消息循環;確保你知道你在做什么

  • task_default_rate_limit
    默認值:沒有速率限制
    任務的全局默認速率限制

   當任務沒有一個自定義的速率限制時,這個值將被使用

   另見:
     worker_disable_rate_limits 設置可以禁用所有的速率限制

任務結果后端設置

  • result_backend
    默認值:默認不啟用結果后端
    用來存儲結果的后端。可以是下列之一:

    1. rpc
      以 AMQP 消息形式發送結果。查看 RPC 后端設置

    2. database
      使用一個 SQLAlchemy 支持的結構化數據庫。查看數據庫后端設置

    3. redis
      使用 Redis 存儲結果。查看 Redis 后端設置

    4. cache
      使用 Memcached 存儲結果。查看 Cache 后端設置

    5. cassandra
      使用 Cassandra 存儲結果。查看 Cassandra 后端設置

    6. elasticsearch
      使用 Elasticsearch 存儲結果。查看 Elasticsearch 后端設置

    7. ironcache
      使用 IronCache 存儲結果。查看 IronCache 后端設置

    8. couchbase
      使用 Couchbase 存儲結果。查看 Couchbase 后端設置

    9. couchdb
      使用 CouchDB 存儲結果。查看 CouchDB 后端設置

    10. filesystem
      使用共享文件夾存儲結果。查看 File-system 后端設置

    11. consul
      使用 Consul K/V 存儲結果。查看 Consul K/V 后端設置

  • result_serializer
    默認值:從4.0版本開始使用 json(更早:pickle)
    查看 Serializers 獲取支持的序列化格式的信息。

  • result_compression
    默認值:無壓縮
    結果值得可選壓縮方法。支持 task_seralizer 設置相同的選項。

  • result_expires
    默認值:1天后過期
    存儲的結果被刪除的時間(秒數,或者一個 timedelta 對象)

   (有一個內建的周期性任務將刪除過期的任務結果(celery.backend_cleanup),前提是 celery beat 已經被啟用。這個任務每天上午4點運行。

   值 None 或者 0 意思是結果永不刪除(取決於后端聲明))

   注意:
    當前這個特性只支持 AMQP, database, cache, Redis 這些存儲后端。當使用 database 存儲后端,celery beat必須執行使得過期結果被刪除。

  • result_cache_max
    默認值:默認禁用
    啟用結果的客戶端緩存。

   對於老的 amqp 后端,存儲結果一旦被消費它將不再可用,此時這個特性將起到作用。

   這是老的結果被刪除之前總的結果緩存的數量。值 0 或者 None 意味着沒有限制,並且值 -1 將禁用緩存。

Database 后端設置

Database URL 示例

使用一個數據庫存儲后端,你必須配置 result_backend 設置為一個連接的URL,並且帶 db+ 前綴:

result_backend = 'db+scheme://user:password@host:port/dbname'

示例:

# sqlite (filename)
result_backend = 'db+sqlite:///results.sqlite'

# mysql
result_backend = 'db+mysql://scott:tiger@localhost/foo'

# postgresql
result_backend = 'db+postgresql://scott:tiger@localhost/mydatabase'

# oracle
result_backend = 'db+oracle://scott:tiger@127.0.0.1:1521/sidname'

查看 Supported Databases 獲取支持的數據庫的一個表,查看 Connection String 獲取相關的連接字符串(這是 db+ 前綴后帶的URI的一部分)

  • database_engine_options
    默認值:{} (空映射)
    你可以使用 sqlalchmey_engine_options 設置聲明額外的 SQLAchemy 數據庫引擎選項:
# echo enables verbose logging from SQLAlchemy.
app.conf.database_engine_options = {'echo': True}
  • database_short_lived_sessions
    默認值:默認禁用
    默認禁用短會話。如果啟用了,他們會急劇的降低性能,特別是對於處理很多任務的系統。當工作單元的流量很低,緩存的數據庫連接會由於空閑而變為無用,進而會導致工作單元出錯,這種情況下這個選項是有用的。例如:間歇性的錯誤如(OperationalError)(2006, ‘MySQL server has gone away’)通過啟用短會話能解決。這個選項只影響數據庫后端。

  • database_table_names
    默認值:{} (空映射)
    當 SQLAlchemy 設置成結果后端, Celery 自動創建兩個表來存儲任務的元數據。這個設置允許你自定義表名稱:

# use custom table names for the database result backend.
database_table_names = {
    'task': 'myapp_taskmeta',
    'group': 'myapp_groupmeta',
}

RPC 后端設置

  • result_persistent
    默認值:默認被禁用(瞬態消息)
    如果設置成 True,結果消息將被持久化。這意味着消息中間件重啟后消息不會丟失。

  配置示例:

result_backend = 'rpc://'
result_persistent = False

Cache 后端設置

注意:
  緩存后端支持 pylibmc 和 python-memcached 庫。后者只有在 pylibmc 沒有安裝時才會被使用。

使用一個 Memcached 服務器:

result_backend = 'cache+memcached://127.0.0.1:11211/'

使用多個 Memcached 服務器:

result_backend = """
    cache+memcached://172.19.26.240:11211;172.19.26.242:11211/
""".strip()

“memory” 后端只在內存中存儲緩存:

result_backend = 'cache'
cache_backend = 'memory'
  • cache_backend_options
    默認值:{} (空映射)
    你可以使用 cache_backend_options 設置 pylibmc 選項:
cache_backend_options = {
    'binary': True,
    'behaviors': {'tcp_nodelay': True},
}
  • cache_backend
    這個設置不再使用了,因為現在可以直接在 result_backend 中設置后端存儲。

Redis 后端設置

配置后端 URL

注意:
  Redis 后端需要 Redis 庫。

可以使用 pip 安裝這個包:

$ pip install celery[redis]

查看 Bundles 獲取組合多個擴展需求的信息

后端需要 result_backend 設置成一個 Redis URL:

result_backend = 'redis://:password@host:port/db'

例如:

result_backend = 'redis://localhost/0'

等同於:

result_backend = 'redis://'

URL 的字段如下定義:
1. password
  連接數據庫的密碼
2. host
  Redis 服務器的主機名或者IP地址(例如:localhost)
3. port
  Redis 服務器的端口。默認是 6379
4. db
  使用的數據庫編號。默認是0。db 可以包含一個可選的斜杠

  • redis_backend_us_ssl
    默認值:禁用
    Redis后端支持 SSL。這個選項的合法值與 broker_use_ssl 相同

  • redis_max_connections
    默認值:無顯示
    Redis 連接池的最大可用連接數,這些連接用來發送和接收結果

  • redis_socket_connect_timeout
    5.0.1版本新特性
    默認值:None

從存儲后端連接到Redis服務器的連接的Socket超時時間(以秒為單位,int/float)

  • redis_socket_timeout
    默認值:120秒
    對 Redis 服務器的讀寫操作的 Socket 超時時間(以秒為單位,int/float),由存儲后端使用

Cassandra 后端設置

注意:
  Cassandra 后端驅動 cassandra-driver。

使用 pip 安裝:

$ pip install celery[cassandra]

查看 Bundles 獲取關於組合擴展需求的信息。

后端需要配置下列配置指令

  • cassandra_servers
    默認值: [] (空列表)
    Cassandra 服務器列表。例如:
cassandra_servers = ['localhost']
  • cassandra_port
    默認值:9042.
    連接到Cassandra服務器的端口

  • cassandra_keyspace
    默認值: None.
    存儲結果的 key-space。例如:

cassandra_keyspace = 'tasks_keyspace'
  • cassandra_table
    默認值: None.
    存儲結果的表(列族)。例如:
cassandra_table = 'tasks'
  • cassandra_read_consistency
    默認值: None.
    使用的讀一致性。值可以是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_write_consistency
    默認值: None.
    使用的寫一致性。值可以是 ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE

  • cassandra_entry_ttl
    默認值: None.
    狀態項的 Time-to-live。添加過后一段時間他們將會過期並且被刪除。值 None (默認) 意味着他們永不過期

  • cassandra_auth_provider
    默認值: None.
    使用的 cassandra.auth 模塊中的 AuthProvider。 值可以是 PlainTextAuthProvider 或者 SaslAuthProvider

  • cassandra_auth_kwargs
    默認值: {} (空映射)
    傳遞給 authentication provider 的命名參數。例如:

cassandra_auth_kwargs = {
    username: 'cassandra',
    password: 'cassandra'
}

配置示例:

cassandra_servers = ['localhost']
cassandra_keyspace = 'celery'
cassandra_table = 'tasks'
cassandra_read_consistency = 'ONE'
cassandra_write_consistency = 'ONE'
cassandra_entry_ttl = 86400

Elasticsearch 后端設置

使用 Elasticsearch 作為結果后端,你只需要將result_backend設置成正確的 URL。

配置示例:

result_backend = 'elasticsearch://example.com:9200/index_name/doc_type'
  • elasticsearch_retry_on_timeout
    默認值: False
    超時后是否應該觸發在另一個節點重試?

  • elasticsearch_max_retries
    默認值: 3
    異常被傳遞前的最大重試次數

  • elasticsearch_timeout
    默認值: 10.0 秒
    elasticsearch 使用的全局超時時間

Riak 后端設置

注意:
Riak 后端需要 riak 庫

使用 pip 進行安裝:

$ pip install celery[riak]

查看 Bundles 獲取組合多擴展需求的信息。

后端需要result_backend設置成一個 Riak URL:

result_backend = 'riak://host:port/bucket'

例如:

result_backend = 'riak://localhost/celery

等同於:

result_backend = 'riak://'

URL 的字段定義如下:
1. host
  Riak 服務器的主機名或者IP地址(例如 localhost)
2. port
  使用 protobuf 協議的Riak 服務器端口,默認是 8087
3. bucket
  使用的Bucket名稱。默認是 celery。bucket 名稱需要是一個只包含ASCII字符的字符串。

另外,這個后端可以使用如下配置指令進行配置:

  • riak_backend_settings
    默認值: {} (空映射)
    這是一個支持如下鍵的映射:

    1. host
      Riak 服務器的主機名或者IP地址(例如 localhost)

    2. port
      Riak 服務器端口。默認是 8087

    3. bucket
      使用的Bucket名稱。默認是 celery。bucket 名稱需要是一個只包含ASCII字符的字符串。

    4. protocol
      連接到 Riak 服務器使用的協議。這不可以通過 result_backend 配置

AWS DynamoDB 后端設置

注意:
  Dynamodb 后端需要 boto3 庫

使用 pip 進行安裝:

$ pip install celery[dynamodb]

查看 Bundles 獲取組合多擴展需求的信息。

后端需要 result_backend 設置成一個 DynamoDB URL:

result_backend = 'dynamodb://aws_access_key_id:aws_secret_access_key@region:port/table?read=n&write=m'

例如,聲明 AWS 區域以及表名稱:

result_backend = 'dynamodb://@us-east-1/celery_results

或者從環境中獲取 AWS 配置參數,使用默認表名稱(celery)以及聲明讀寫吞吐量:

result_backend = 'dynamodb://@/?read=5&write=5'

或者在本地使用 DynamoDB 的可下載版本:

result_backend = 'dynamodb://@localhost:8000

URL 中的字段如下定義:

  1. aws_access_key_id & aws_secret_access_key
    訪問 AWS API 資源的認證信息。這可以通過 boto3 從不同的源獲取到

  2. region
    AWS 區域,例如: us-east-1 或者本地版本的 localhost。查看 boto3 庫文檔獲取更多的信息。

  3. port
    如果你使用的本地版本,這是本地DynamoDB示例監聽的端口。如果你沒有把區域設置成 localhost,這個設置選項將無效

  4. table
    使用的表名。默認是 celery。查看 DynamoDB 命名規則獲取允許的字符以及表名長度的信息。

  5. read & write
    所創建的 DynamoBD 表的讀寫能力單元。默認的讀寫值都是 1。更多的細節可以從 Provisioned Throughput documentation 中獲取到。

IronCache 后端設置

注意:
IronCache 后端需要 iron_celery 庫:

使用 pip 進行安裝:

$ pip install iron_celery

IronCache 通過在 result_backend 中配置的 URL 進行聲明,例如:

result_backend = 'ironcache://project_id:token@'

或者更改緩存名稱:

ironcache:://project_id:token@/awesomecache

更多的信息,查看 https://github.com/iron-io/iron_celery

Couchbase 后端設置

注意:
Couchbase 后端需要 couchbase 庫

使用 pip 進行安裝:

$ pip install celery[couchbase]

查看 Bundle 獲取組合多擴展需求的步驟。

后端可以通過 result_backend 設置成一個 Couchbase URL:

result_backend = 'couchbase://username:password@host:port/bucket'
  • couchbase_backend_settings

   默認值:{} (空映射)

  這是一個支持如下鍵的映射:

  1. host
    Couchbase 服務器的主機名。默認是 localhost
  2. port
    Couchbase 服務器監聽的端口。默認是 8091
  3. bucket
    Couchbase 服務器默認寫入的桶。默認是default
  4. username
    Couchbase 服務器認證的用戶名(可選)
  5. password
    Couchbase 服務器認證的密碼(可選)

CouchDB 后端設置

注意:
CouchDB 后端需要 pycouchdb 庫:
使用 pip 安裝這個包:

$ pip install celery[couchdb]

查看 Bundles 獲取更多關於組合多擴展需求的信息

后端可以通過 result_backend 配置成一個 CouchDB URL:

result_backend = 'couchdb://username:password@host:port/container'

URL 由以下部分組成:

  1. username
    Couchbase 服務器認證的用戶名(可選)
  2. password
    Couchbase 服務器認證的密碼(可選)
  3. host
    Couchbase 服務器的主機名。默認是 localhost
  4. port
    Couchbase 服務器監聽的端口。默認是 8091
  5. container
    CouchDB 服務器寫入的默認容器。默認是 default

File-system 后端設置

后端可以通過一個文件 URL 配置,例如:

CELERY_RESULT_BACKEND = 'file:///var/celery/results'

配置的目錄需要被共享,並且所有使用該后端的服務器都可寫。

如果你在單獨的一個系統上使用 Celery,你不需要任何進一步的配置就可以簡單的使用這個后端。對於大型的集群,你可以使用 NFS、GlusterFS、CIFS、HDFS(使用FUSE),或者其他文件系統。

Consul K/V 存儲后端設置

Consul 后端可以通過 URL 配置:

CELERY_RESULT_BACKEND = ‘consul://localhost:8500/’

后端將在 Consul K/V 存儲中作為單獨鍵存儲結果

后端使用Consul 中的 TTLs 支持結果的自動過期

消息路由

  • task_queues
    默認值: None (默認隊列的配置)
    多數用戶不願聲明這個配置,而是使用 automatic routing facilites。

如果你真的需要配置高級路由,這個設置應該是一個 kombu.Queue 對象的列表,工作單元可以從中消費。

注意工作單元可以通過 -Q 選項覆蓋這個設置,或者這個列表中的單獨隊列可以通過 -X 選項進行排除。

查看 Basics 獲取更多的信息。

默認值是 celery 隊列的一個隊列/消息交換器/綁定的鍵,消息交互類型是direct。

查看 task_routes

  • task_routes
    默認值: None
    一個路由器的列表,或者單個路路由,用來路由任務到相應的隊列。當決定一個任務的最終目的,路由器將按聲明順序進行輪詢。

一個路由器可以通過如下方式聲明:

  1. 函數,簽名格式為 (name, args, kwargs, options, task=None, **kwargs)
  2. 字符串,提供到路由函數的路徑
  3. 字典,包含路由聲明,它將會轉化成一個 celery.routes.MapRoute 實例
  4. 一個 (pattern, route) 元組的列表,它將會轉化成一個 celery.routes.MapRoute 實例
    示例:
task_routes = {
    'celery.ping': 'default',
    'mytasks.add': 'cpu-bound',
    'feed.tasks.*': 'feeds',                           # <-- glob pattern
    re.compile(r'(image|video)\.tasks\..*'): 'media',  # <-- regex
    'video.encode': {
        'queue': 'video',
        'exchange': 'media'
        'routing_key': 'media.video.encode',
    },
}

task_routes = ('myapp.tasks.route_task', {'celery.ping': 'default})
其中,myapp.tasks.route_task 可以是:

def route_task(self, name, args, kwargs, options, task=None, **kw):
        if task == 'celery.ping':
            return {'queue': 'default'}

route_task 可以返回一個字符串或者一個字典。一個字符串表示 task_queues 中的一個隊列名,而字典表示一個自定義的路由。

當發送消息,路由被按順序詢問。第一個返回非 None 值得路由將被使用。消息選項此時將與找到的路由設置合並,其中路由器的設置要優先。

例如: apply_async() 有這些參數:

Task.apply_async(immediate=False, exchange='video',
                 routing_key='video.compress')

並且有一個路由器返回:

{'immediate': True, 'exchange': 'urgent'}

那么最終的消息選項將是:

immediate=True, exchange='urgent', routing_key='video.compress'

(以及Task類中定義的任意默認消息選項)

當進行合並時,task_routes 中定義的值會優先於 task_queues 中定義的值。

對於如下設置:

task_queues = {
    'cpubound': {
        'exchange': 'cpubound',
        'routing_key': 'cpubound',
    },
}

task_routes = {
    'tasks.add': {
        'queue': 'cpubound',
        'routing_key': 'tasks.add',
        'serializer': 'json',
    },
}

tasks.add 的最終路由選項將變為:

{'exchange': 'cpubound',
 'routing_key': 'tasks.add',
 'serializer': 'json'}

查看路由器獲取更多的示例。

  • task_queue_ha_policy
    消息中間件: RabbitMQ
    默認值:None
    這將設置一個隊列的HA策略,並且值可以是一個字符串(通常是 all)
task_queue_ha_policy = 'all'

使用 all 將復制隊列到所有的當前節點,或者你指定一個節點的列表:

task_queue_ha_policy = ['rabbit@host1', 'rabbit@host2']

使用一個列表將隱示設置 x-ha-policy為‘nodes,x-ha-policy-params` 為給定的節點列表

查看 http://www.rabbitmq.com/ha.html 獲取更多的信息

  • task_queue_max_priority
    消息中間件: RabbitMQ
    默認值: None
    查看 RabbitMQ Message Priorities

  • worker_direct
    默認值: 禁用

這個選項使得每個工作單元又一個專門的隊列,所以任務可以路由到指定的工作單元。

每個工作單元的隊列名稱是基於工作單元主機名和一個 .dq后綴自動產生的,使用 C.dq 消息交互器。

例如:節點名稱為 w1@example.com 的工作單元的隊列名稱為:

w1@example.com.dq

此時,你可以通過指定主機名為路由鍵並且使用 C.dq 消息交互器來將任務路由到指定的節點。

task_routes = {
    'tasks.add': {'exchange': 'C.dq', 'routing_key': 'w1@example.com'}
}
  • task_create_missing_queues
    默認值:啟用
    如果啟用(默認),任何聲明的未在 task_queues 中未定義的隊列都將自動被創建。查看 Automaci routing。

  • task_default_queue
    默認值: celery
    如果消息沒有聲明路由或者自定義的隊列,apply_async 默認使用的隊列名稱。

這個隊列必須在 task_queues 中。如果 task_queues 沒有聲明,那么他將自動創建一個隊列項,而這個設置值就作為隊列的名稱。

另見:
修改默認隊列的名稱

  • task_default_exchange
    默認值:”celery”
    當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器,那么這個默認的消息交互器將被使用。

  • task_default_exchange_type
    默認值:”direct”
    當 task_queues 設置中指定鍵沒有聲明自定義的消息交互器類型,那么這個默認的消息交互器類型將被使用。

  • task_default_routing_key
    默認值:”celery”
    當 task_queues 設置中指定鍵沒有聲明自定義的路由鍵,那么這個默認的路由鍵將被使用。

  • task_default_delivery_mode
    默認值:”presistent”

  可以是瞬態的(消息不寫硬盤),或者持久的(寫硬盤)

消息中間件設置

  • broker_url
    默認值:”amqp://”
    默認的消息中間件URL。這必須是一個如下形式的URL:
transport://userid:password@hostname:port/virtual_host

其中只有模式部分是必須的,其余部分都是可選的,默認會設置為對應傳輸中間件的默認值。

傳輸部分是使用的消息中間件的實現,默認是 amqp,(如果安裝了librabbitmq會使用這個庫,否則使用pyamqp)。還有其他可用的選擇,包括 redis://、 sqs://、 qpid://。

模式部分可以是你自己的傳輸中間件實現的全限定路徑:

broker_url = 'proj.transports.MyTransport://localhost'

可以配置多個消息中間件,使用相同的傳輸協議也行。消息中間件可以通過當個字符串聲明,不同的消息中間件URL之間用冒號分隔:

broker_url = 'transport://userid:password@hostname:port//;transport://userid:password@hostname:port//'

或者作為一個列表:

broker_url = [
    'transport://userid:password@localhost:port//',
    'transport://userid:password@hostname:port//'
]

這些消息中間件將被用於broker_failover_strategy

查看Kombu 文檔中的 URLs 章節獲取更多的信息。

  • broker_read_url / broker_write_url
    默認值:broker_url的設置值
    這些設置可以配置而不用 broker_url 的設置,可以為消息中間件聲明不同的連接參數,用來消費和生成消息。

示例:

broker_read_url = 'amqp://user:pass@broker.example.com:56721'
broker_write_url = 'amqp://user:pass@broker.example.com:56722'

所有選項都可以聲明成一個列表,作為故障恢復的可選值,查看 broker_url 獲取更多的信息

  • broker_failover_strategy
    默認值:“round-robin”
    消息中間件連接對象的默認故障恢復策略。如果提供了,將映射到 kombu.connection.failover_strategies 中的一個鍵,或者引用任何方法,從給定的列表中產生一個項。

示例:

# Random failover strategy
def random_failover_strategy(servers):
    it = list(servers)  # don't modify callers list
    shuffle = random.shuffle
    for _ in repeat(None):
        shuffle(it)
        yield it[0]

broker_failover_strategy = random_failover_strategy
  • broker_heartbeat
    支持的傳輸層協議:pyamqp
    默認值:120.0(與服務器協商)

  注意:這個值只被工作單元使用,客戶端此時不使用心跳。

  因為單純使用 TCP/IP 並不總是及時探測到連接丟失,所以 AMQP 定義了心跳,客戶端和消息中間件用來檢測連接是否關閉。

  心跳會被監控,如果心跳值是 10 秒,那么檢測心跳的時間間隔是 10 除以broker_heartbeat_checkrate (默認情況下,這個值是心跳值的兩倍,所以對於10秒心跳,心跳每隔5秒檢測一次)

  • broker_heartbeat_checkrate
    支持的傳輸層協議:pyamqp
    默認值:2.0

工作單元會間隔監控消息中間件沒有丟失過多的心跳。這個檢測的速率是用 broker_heartbeat 值除以這個設置值得到的,所以如果心跳是 10.0 並且這個設置值是默認的2.0,那么這個監控將每隔5秒鍾執行一次(心跳發送速率的兩倍)

  • broker_use_ssl
    支持的傳輸層協議: pyamqp, redis
    默認值: 禁用

在消息中間件連接上使用SSL

這個選項的合法值依據使用的傳輸協議的不同而不同

  • pyamqp
    如果設置成True,連接將依據默認的SSL設置啟用SSL。如果設置成一個字典,將依據給定的策略配置SSL連接。使用的格式是 python 的 ssl.wrap_socket() 選項。

注意SSL套接字一般會在消息中間件的一個單獨的端口上服務。

以下示例提供了客戶端證書,並且使用一個自定義的認證授權來驗證服務器證書:

import ssl

broker_use_ssl = {
  'keyfile': '/var/ssl/private/worker-key.pem',
  'certfile': '/var/ssl/amqp-server-cert.pem',
  'ca_certs': '/var/ssl/myca.pem',
  'cert_reqs': ssl.CERT_REQUIRED
}

告警:
  使用 broker_use_ssl=True 時請小心。可能你的默認配置根本不會驗證服務器證書。請閱讀python的 ssl module security considerations。

  • redis
    設置必須是一個字典,包括如下鍵:
ssl_cert_reqs (required): one of the SSLContext.verify_mode values: 
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
ssl_ca_certs (optional): path to the CA certificate
ssl_certfile (optional): path to the client certificate
ssl_keyfile (optional): path to the client key
  • broker_pool_limit
    2.3版本新特性

    默認值:10

    連接池中可以打開最大連接數。

    從2.5版本開始連接池被默認啟用,默認限制是10個連接。這個數值可以依據使用一個連接的 threads/green-threads (eventlet/gevent) 數量進行更改。例如:運行 eventlet 啟動 1000 個 greenlets,他們使用一個連接到消息中間件,如果發生競態條件,那么你應該開始增加這個限制。

    如果設置成None或者0,連接池將會被禁用,並且每次使用連接都會重新建立連接並關閉。

  • broker_connection_timeout
    默認值:4.0
    放棄與AMQP服務器建立連接之前默認等待的超時時間。當使用 gevent 時該設置被禁用。

  • broker_connection_retry
    默認值:啟用
    如果與 AMQP 消息中間件的連接斷開,將自動重新建立連接

   每次重試中間等待的時間會遞增,並且在 broker_connection_max_retries 未達到之前會一只重試

  • broker_connection_max_retries
    默認值:100
    放棄與 AMQP 服務器重新建立連接之前的最大重試次數

   如果設置成 0 或者 None,將一直重試

  • broker_login_method
    默認值:AMQPLAIN
    設置自定義的 amqp 登陸方法

  • broker_transport_options
    2.2 版本新特性
    默認值:{} (空映射)

   傳遞給底層傳輸中間件的一個附加選項的字典

   設置可見超時時間的示例如下(Redis 與 SQS 傳輸中間件支持):

  broker_transport_options = {‘visibility_timeout’: 18000} # 5 hours 


工作單元

  • imports
    默認值:[] (空列表)
    當工作單元啟動時導入的一系列模塊

   這用來聲明要導入的模塊,但是它還可用來導入信號處理函數和附加的遠程控制命令,等等。

   這些模塊將會以原來聲明的順序導入

  • include
    默認值:[] (空列表)
    語義上與 imports 相同,但是可以作為將不同導入分類的一種手段

這個設置中的模塊是在 imports 設置中的模塊導入之后才導入

  • worker_concurrency
    默認值:CPU核數
    執行任務的並發工作單元 process/threads/green 數量

   如果你大部分操作是I/O操作,你可以設置更多的進程(線程),但是大部分情況下都是以CPU數作為定界,嘗試讓這個值接近你機器的CPU核數。如果沒有設置,當前機器的 CPU核數將會被使用

  • worker_prefetch_multiplier
    默認值:4
    工作單元一次預獲取多少個消息是這個設置值乘以並發進程的數量。默認值是 4(每個進程4條消息)。但是,默認設置通常是好的選擇 - 如果你有長時間任務等待在隊列中,並且你必須啟動工作單元,注意第一個工作單元初始時將收到4倍的消息量。因此任務可能在工作單元間不會平均分布

   禁用這個選項,只要將 worker_prefetch_multiplier 設置成 1設置成 0 將允許工作單元持續消費它想要的盡可能多的消息

   更詳細的信息,請閱讀 Prefetch Limits

   注意:
    帶 ETA/countdown 的任務不會受 prefetch 限制的影響

  • worker_lost_wait
    默認值:10.0 秒
    有些情況下,工作單元可能在沒有適當清理的情況下就被殺死,並且工作單元可能在終止前已經發布了一個結果。這個值聲明了在拋出 WorkerLostError 異常之前我們會在丟失的結果值上等待多久

  • worker_max_tasks_per_child
    一個工作單元進程在被一個新的進程替代之前可以執行的最大任務數

  • worker_max_memory_per_child

    默認值:沒有限制。類型:int(kilobytes)

    一個工作單元進程在被一個新的進程替代之前可以消耗的最大預留內存(單位KB)。如果單獨一個任務就導致工作單元超過這個限制,當前的任務會執行完成,並且之后這個進程將會被更新替代。

    示例:

  worker_max_memory_per_child = 12000  # 12MB
  • worker_disable_rate_limits
    默認值:禁用(啟用速率限制)
    即使任務顯示設置了速率,仍然禁用所有速率限制

  • worker_state_db
    默認值:None
    存儲工作單元狀態的文件名稱(如取消的任務)。可以是相對或者絕對路徑,但是注意后綴.db 可能會被添加到文件名后(依賴於python 的版本)

   也可以通過celery worker –statedb 參數設置

  • worker_timer_precision
    默認值:1.0秒
    設置重新檢測調度器之前ETA調度器可以休息的最大秒數

   設置成1意味着調度器精度將為1秒。如果你需要毫秒精度,你可以設置成 0.1

  • worker_enable_remote_control
    默認值:默認啟用
    聲明工作單元的遠程控制是否啟用

事件

  • worker_send_task_events
    默認值:默認禁用
    發送任務相關的事件,使得任務可以使用類似flower 的工作監控到。為工作單元的 -E 參數設置默認值

  • task_send_sent_event
    2.2 版本新特性
    默認值:默認禁用

   如果啟用,對於每個任務都將有一個 task-sent 事件被發送,因此任務在被消費前就能被追蹤。

  • event_queue_ttl
    支持的傳輸中間件: amqp
    默認值:5.0 秒
    發送到一個監控客戶端事件隊列的消息的過期時間(x-message-ttl),以秒為單位(int/float)。

   例如:如果這個值設置為10,被遞送到這個隊列的消息將會在10秒后被刪除

  • event_queue_expires
    支持的傳輸中間件: amqp
    默認值:60.0 秒
    一個監控客戶端事件隊列被刪除前的過期時間(x-expires)。

  • event_queue_prefix
    默認值: “celeryev”.
    事件接收隊列名稱的前綴

  • event_serializer
    默認值: “json”.
    當發送事件消息時使用的消息序列化格式

遠程控制命令

  • control_queue_ttl
    默認值: 300.0

  • control_queue_expires
    默認值: 10.0

日志

  • worker_hijack_root_logger
    2.2 版本新特性
    默認值: 默認啟用 (hijack root logger).

   默認情況下,任意前面配置的根日志器的處理函數都將被移除。如果你想自定義日志處理函數,那么你可以通過設置 worker_hijack_root_logger = False 來禁用這個行為。

   注意:
     日志可以通過連接到 celery.signals.setup_logging 進行定制化

  • worker_log_color
    默認值: 如果應用實例日志輸出到一個終端,這個將啟用
    啟用/禁用Celery 應用日志輸出的顏色

  • worker_log_format
    默認值:

    [%(asctime)s: %(levelname)s/%(processName)s] %(message)s  日志信息的格式
    

  查看python 日志模塊獲取更多關於日志的信息

  • worker_task_log_format
    默認值:
[%(asctime)s: %(levelname)s/%(processName)s]
    [%(task_name)s(%(task_id)s)] %(message)s

任務中記錄日志使用的格式。查看python 日志模塊獲取更多關於日志的信息

  • worker_redirect_stdouts
    默認值: 默認啟用
    如果啟用來,標准輸出和標准錯誤輸出將重定向到當前日志器

工作單元和 beat 將使用到

  • worker_redirect_stdouts_level
    默認值:WARNING
    標准輸出和標准錯誤輸出的日志級別。可以是DEBUG, INFO, WARNING, ERROR, or CRITICAL

安全

  • security_key
    默認值: None.
    2.5 版本新特性

包含私鑰的文件的相對或者絕對路徑,私鑰用來在使用消息簽名時對消息進行簽名。

  • security_certificate
    默認值:None.
    2.5 版本新特性

  包含X.509認證的文件的相對或者絕對路徑,認證用來在使用消息簽名時對消息進行簽名。

  • security_cert_store
    默認值:None.
    2.5 版本新特性

  包含用來進行消息簽名的X.509認證的目錄。可以使用文件名模式匹配(例如:/etc/certs/*.pem)

自定義組件類 (高級)

  • worker_pool
    默認值:”prefork” (celery.concurrency.prefork:TaskPool).
    工作單元使用的池類的名稱

  • Eventlet/Gevent
    永遠不要使用這個選項來選擇用eventlet 還是 gevent。你必須對工作單元使用-P選項,確保應急補丁不會應用過遲,導致出現奇怪的現象。

  • worker_pool_restarts
    默認值:默認禁用

   如果啟用,工作單元池可以使用 pool_restart 遠程控制命令進行重啟

  • worker_autoscaler
    2.2 版本新特性
    默認值: “celery.worker.autoscale:Autoscaler”.

使用的自動擴展類的名稱

  • worker_consumer
    默認值:”celery.worker.consumer:Consumer”.
    工作單元使用的消費類的名稱

  • worker_timer
    默認值:”kombu.async.hub.timer:Timer”.
    工作單元使用的 ETA 調度器類的名稱。默認值是被池具體實現設置。

Beat 設置 (celery beat)

  • beat_schedule
    默認值: {} (空映射)
    beat調度的周期性任務。查看Entries

  • beat_scheduler
    默認值:”celery.beat:PersistentScheduler”.
    默認的調度器類。如果同時使用django-celery-beat擴展,可以設置成 “django_celery_beat.schedulers:DatabaseScheduler”

也可以通過celery beat 的 -S 參數進行設置

  • beat_schedule_filename
    默認值: “celerybeat-schedule”.
    存儲周期性任務最后運行時間的文件的名稱,這個文件被PersistentScheduler使用。可以是相對或者絕對路徑,但是注意后綴.db可能添加到文件名后(依賴於python版本)

   也可以通過 celery beat 的 –schedule 參數進行設置

  beat_sync_every
  默認值:0.
  另一個數據庫同步發起前可以執行的周期性任務的數量。值0(默認)表示基於時間同步 - 默認是3分鍾,由scheduler.sync_every確定。如果設置成1,beat將在每個任務消息發送后發起同步。

  beat_max_loop_interval
   默認值: 0.

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78812472


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM