Celery-4.1 用戶指南: Routing Tasks (路由任務)


注意:
   像主題和扇出之類的路由概念並不對所有傳輸介質都可用,請翻閱”傳輸比較表”。

基礎


自動路由


路由最簡單的方式是使用 task_create_missing_queues 設置(默認啟用)。

使用這個設置,一個還沒有在 task_queues 中定義的有名隊列將會自動被創建。這使得進行簡單的路由任務非常容易。

假如你有兩台服務器,x 和 y 處理常規任務,還有一台服務器 z,只處理feed消息源相關的任務。你可以使用這個配置:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

使用這個路由使得導入消息源任務被路由到feeds隊列,而所有其他任務都將路由到默認隊列(由於歷史原因默認隊列名為 celery)。

或者,你可以使用glob模式匹配,甚至可以用正則表達式,來匹配feed.tasks命名空間里的所有任務:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

如果匹配模式的順序很重要,你應該以項列表的格式聲明路由:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

注意:
  task_routes 設置可以是一個字典,或者一個路由對象的列表,所以在上述情況下,你需要以一個包含列表的元組的方式聲明 task_routes

安裝好路由器后,你可以啟動服務器 z 用來專門處理 feeds 消息源隊列:

user@z:/$ celery -A proj worker -Q feeds

你可以聲明你需要的多個隊列,所以你也可以讓你的服務器處理默認隊列的消息:

user@z:/$ celery -A proj worker -Q feeds,celery

修改默認隊列的名稱

你可以使用下列配置修改默認隊列的名稱:

app.conf.task_default_queue = 'default'

隊列是如何被定義的

這個特性的重點在於為只有基本需求的用戶隱藏了復雜的 AMQP 協議。但是 - 你可能對隊列是如何聲明的仍然感興趣。

一個名為 video 的隊列將使用下列配置創建:

{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'} 

AMQP 后端如 Redis 或者 SQS 不支持消息交換器,所以他們需要消息交換器與隊列同名。使用這種設計使得它可以在不會吃消息交換器的后端也能正常工作。

手動路由

假如你又兩台服務器 x 和 y 處理常規任務,另外一台服務器 z,用來專門處理消息源相關的任務,你可以使用如下配置:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
task_default_exchange = 'tasks'
task_default_exchange_type = 'topic'
task_default_routing_key = 'task.default'

task_queue 是一個隊列實例的列表。如果你沒有為一個 key 設置消息交換器或者交換器類型,這些信息將從 task_default_exchangetask_default_exchange_type 配置中獲取。

路由一個任務到 feed_tasks 隊列,你可以在 task_routes 配置種添加一個項:

task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

你可以使用Task.apply_async()方法或者 send_task() 方法的 routing_key 參數覆蓋這個路由行為:

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

使服務器 z 只從 feed_tasks 隊列獲取消息,你可以啟動工作單元時使用 -Q 選項:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

 服務器 x 和 y 必須配置成從default隊列獲取消息:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

如果你願意,你甚至可以讓的消息源處理工作單元也處理常規任務,也許在有許多常規任務的時候:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

如果你想添加在另外一個消息交換器上一個隊列,只要聲明自定義消息交換器及它的類型即可。

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

如果你對這些術語有不清楚的地方,你應該去看看 AMQP

另見:
處理下面的 AMQP Primer,還有 Rabbits and Warrens 這個講述隊列和消息交換的優秀的博客。另外,還有一個 CloudAMQP tutorial,對於 RabbitMQ 用戶來說, RabbitMQ FAQ 將是非常有用的。

特殊的路由選項


RabbitMQ 消息優先級


supported transports:
RabbitMQ

4.0版本新特性。

隊列可以通過設置 x-max-priority 參數支持優先級:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10},
]

所有隊列的優先級默認值使用 task_queue_max_priority 設置:

app.conf.task_queue_max_priority = 10

AMQP Primer


消息


消息包含消息頭和消息體。Celery 使用消息頭存儲消息的內容類型和內容編碼。內容類型通常是消息使用的序列化格式。消息體包含要執行的任務的名稱,任務的id(UUID),任務函數的參數以及一個附加的元信息 - 如重試次數或者 ETA。

下面是一個使用 python 字典表示的任務消息的示例:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

生產者,消費者,消息中間件


發送消息的客戶端通常被稱為發布者,或者生產者,而接收消息的實體被稱為消費者。

消息中間件是一個消息服務器,它將消息從生產者路由到消費者。

下面這些術語在 AMQP 相關的文檔里經常能看到。

Exchanges, 隊列, 路由鍵


  1. 消息是發送給消息交換器
  2. 消息交換器將消息路由到一個或者多個隊列。有幾種不同的消息交換器類型,他們提供不同的路由方式,或者實現不同的消息場景
  3. 消息在隊列中等待指導有人消費它
  4. 當消息被確認它將從隊列中刪除

收發消息的必要步驟包括:
1. 創建一個消息交換器
2. 創建一個隊列
3. 將隊列綁定到消息交換器

Celery 自動創建 task_queues 中定義的隊列所需要的實體(除非隊列的 auto_declare 設置為 False)。

下面是隊列配置示例包含三個隊列;Video 處理一個,images 處理一個,以及其他處理的 default 隊列:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

Exchange 類型


消息交換器類型定義了消息怎樣通過消息交換器路由。標准的消息交換器類型有 direct,topic,fanout以及headers。另外,非標准的消息交換器類型可以通過 RabbitMQ 插件的方式使用,例如 Michael Bridgen 寫的 last-value-cache plugin

Direct exchanges


直接消息交換類型通過精確的路由鍵匹配實現路由,所以一個被路由鍵 video 綁定的隊列只能收到這個帶這個路由鍵的消息。

Topic exchanges


主題消息交換類型使用 . 分隔單詞,wild-card 字符 *(匹配整個詞),字符#(匹配零個或多個詞) 的方式匹配路由鍵。

對於類似 usa.news, usa.weather, norway.news, 以及 norway.weather 的路由鍵,綁定可以是 *.news(all news),usa.# (all items in the USA),or usa.weather (all USA weather items)

相關 API 命令


  • exchange.declare(exchange_name, type, passive,
    durable, auto_delete, internal)

  使用名稱聲明一個消息交換器。

  查看 amqp:Channel.exchange_聲明。

關鍵字參數:

  passive – 被動意味着消息交換器不會被自動創建,你可以使用它來檢測消息交換器是否存在。
  durable – Durable 消息交換器是持久的 (即, 消息中間件重啟他們還存在)。
  auto_delete – 這意味着如果沒有隊列使用它,消息中間件將會自動刪除它。

  • queue.declare(queue_name, passive, durable, exclusive, auto_delete)
    使用名稱聲明一個隊列。

  查看 amqp:Channel.queue_聲明。

專用隊列只能在當前連接中被消費。專用也意味着 auto_delete

  • queue.bind(queue_name, exchange_name, routing_key)
    使用路由鍵將一個隊列綁定到一個消息交換器。

未綁定的隊列不會收到消息,所以這是必須的。

查看 amqp:Channel.queue_bind。

  • queue.delete(name, if_unused=False, if_empty=False)
    Deletes a queue and its binding.

查看 amqp:Channel.queue_delete

exchange.delete(name, if_unused=False)
Deletes an exchange.

查看 amqp:Channel.exchange_delete

注意:
聲明並不意味着“創建”。當你聲明,你只是斷言這個實體存在並且是可操作的。沒有規定說誰應該創建 exchange/queue/binding,不論是消費者或者生產者。通常誰先需要它誰就創建它。

API 動手實踐


Celery 有一個工具 celery amqp 用來從命令行訪問 AMQP API,使得可以訪問管理員的任務如創建/刪除隊列以及消息交換器,刪除隊列消息或者發送消息。對於非 AMQP 消息中間件它也可以使用,但是不同的實現可能沒有實現所有的命令。

你可以直接在 celery amqp 的命令行參數中編寫命令,或者不帶任何參數啟動進入到交互模式:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

  這里 1> 是一個提示符。數字 1 表示你當前已經執行的命令。鍵入 help 獲取可用命令的列表,它還支持自動補全,所以你可以開始鍵入命令,然后按tab鍵顯示可用的匹配。

下面創建一個隊列,你可以發送消息給它:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

這里創建了一個直接類型的消息交互器 testexchange,以及一個名為 testqueue 的隊列。這個隊列使用路由鍵 testkey 綁定到消息交換器。

從此以后,所有帶路由鍵 testkey發送到消息交換器 testexchange 的消息都將遞送到這個隊列。你可以使用 basic.publish 命令發送一個消息:

4> basic.publish 'This is a message!' testexchange testkey
ok.

現在消息已經發送,你可以獲取它。你可以使用 basic.get 命令,它將以異步的方式從隊列中獲取消息(對於維護任務這是可行的,但是對於服務,你應該使用 basic.consume)。

從隊列中取出一個消息:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP 使用確認機制來表示一個消息已經收到並且被成功處理。如果消息沒有被確認並且消費者通道關閉,那么消息將重新遞送到另一個消費者。

注意上述結構中的 delivery_tag, 在一個連接通道中,每個接收到的消息都有唯一的一個 delivery_tag,這個標記是用來確認消息的。另外,注意 delivery_tag 在不同連接通道中不是唯一的,所以在另一個客戶端,遞送標記 1 可能指向不同於這個通道的另一個消息。

你可以使用 basic.ack 確認你收到的消息:

6> basic.ack 1
ok.

清理我們測試會話的環境,你應該刪除掉你創建的實體:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.

路由任務


定義隊列


在 Celery 中,可用的隊列是通過 task_queue 設置的。

下面是隊列配置示例包含三個隊列;Video 處理一個,images 處理一個,以及其他處理的 default 隊列:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

這里,task_default_queue 將會被用來路由沒有顯示路由的任務。

默認消息交互器、消息交換類型以及路由鍵將會用作任務的默認路由值,並且作為 task_queues 中定義的隊列的默認配置值。

一個隊列多個綁定也是支持的。下面示例中兩個路由鍵都綁定到了同一個隊列:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)

聲明任務目的地


任務的目的是由下列因素決定(按順序)
1. task_routes 中定義的路由
2. Task.apply_async() 方法的路由參數
3. Task 本身定義的路由相關屬性

最佳實踐是不寫硬編碼這些設置,而是通過 Routers 將它作為配置選項;這是最靈活的方式,但是合理的默認值仍然可以設置稱任務屬性。

路由器


路由器是一個決定任務路由選項的函數。

定義一個路由器,你只需要定義簽名未 (name, args, kwargs, options, task=None, **kw) 的函數:

def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

如果你返回隊列鍵,它將使用 ·task_queue 中該隊列的設置擴展:

{'queue': 'video', 'routing_key': 'video.compress'}

擴展為 ->

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

你可以通過將路由添加到 task_routes 設置中來安裝路由類:

task_routes = (route_task,)

路由函數還可以通過名稱來添加:

task_routes = ('myapp.routers.route_task',)

對於上述這種簡單的任務名稱->路由的映射,你可以在 task_routes 設置中使用一個字典來達到同樣的效果:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

路由器將按順序被遍歷,直到遇到第一個返回真值的路由器,並使用它作為任務的最終路由。

你可以在一個序列中定義多個路由器:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
    },
]

路由器將被按順序訪問,首先返回值的將被選中。

廣播


Celery 還支持廣播路由。下列消息交換器 broadcast_task 將任務的拷貝遞送到連接它的所有工作單元:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

現在,tasks.reload_cache 任務將遞送到所有從這個隊列消費的工作單元。

下面是另一個廣播路由的示例,這次使用的是 celery beat 調度器:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

 廣播結果:


注意 Celery 結果沒有定義如果兩個任務有相同的任務 ID 將發生什么。如果相同的任務分發到多於一個工作單元,那么狀態歷史可能不會保留。

這種情況下,設置 task.ignore_result 屬性是一個不錯的注意。

 

轉自:https://blog.csdn.net/libing_thinking/article/details/78587375


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM