[Source Code Analysis] Celery, a Distributed Task Queue: Starting the Consumer

0x00 Abstract

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. In this article we look at how Celery's Worker starts its Consumer component.

We first give an overview of the startup process, then pick three representative subcomponents of the Consumer (configuring the network connection, starting the tasks, and starting to consume tasks) and walk through how each of them starts. This should give you a much deeper understanding of how the Consumer fits together.

0x01 Overview

A Celery Worker is the consumer that executes tasks; in practice you usually run multiple workers across several machines to increase throughput. Inside a Worker, something has to fetch messages from the broker: that is the job of a consumer.

We already know that Kombu implements the Producer and Consumer concepts, so we can infer that Celery's implementation must build on Kombu's Producer and Consumer.

1.1 kombu.consumer

Let's first recall the roles of kombu.Consumer and its related classes:

  • Exchange: the MQ router. Message producers send messages to an Exchange, which is responsible for distributing them to queues.
  • Queue: the queue abstraction. It stores messages about to be consumed by the application; the Exchange distributes messages into Queues, and consumers receive messages from Queues.
  • Consumer: the abstraction that receives messages. A consumer declares a queue, binds it to a given exchange, and then receives messages from that queue. In other words, from the user's point of view, knowing an exchange is enough to read messages "from" it, while the messages are actually read from the underlying queue.

In the concrete Consumer implementation, the Consumer ties the queue to a channel: the queue holds a channel used to access Redis, and it also holds an Exchange, which knows which concrete Redis key to read (the key corresponding to the queue).

A Consumer consumes messages through its Queue, and the Queue in turn delegates the work to the Channel.

In other words, once the exchange and the queue are configured, the channel knows which Redis key corresponds to which consumer. When a message arrives on the connection, the consumer's callback is invoked.
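To make this concrete, here is a minimal kombu sketch (assuming a local Redis broker at redis://localhost:6379//, not anything from Celery itself) showing how an Exchange, a Queue, and a Consumer callback fit together:

from kombu import Connection, Consumer, Exchange, Queue

# One exchange, one queue bound to it, and a consumer whose callback
# fires when a message lands in the queue's underlying Redis key.
task_exchange = Exchange('celery', type='direct')
task_queue = Queue('celery', task_exchange, routing_key='celery')

def on_message(body, message):
    print('received:', body)
    message.ack()

with Connection('redis://localhost:6379//') as conn:
    with Consumer(conn, queues=[task_queue], callbacks=[on_message]):
        conn.drain_events(timeout=5)  # blocks; raises socket.timeout if idle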

1.2 Celery Consumer

Note that Celery's Consumer component is not Kombu's Consumer; rather, it uses a Kombu Consumer to fetch messages from the broker.

The Celery Consumer component is a much bigger concept than Kombu's Consumer: beyond fetching messages from the broker, it also covers message consumption, dispatching, monitoring, heartbeats, and a range of other functions.

You could say that apart from the message loop engine (carried by the hub), multiprocessing (carried by Pool and Autoscaler), and scheduled tasks (carried by timer and beat), every other major function is carried by the Consumer.

0x02 start in worker

Let's pick up from the previous article: once the Worker finishes initializing, it goes on to call start.

worker.start()

Its code is as follows:

def start(self):
    try:
        self.blueprint.start(self)
    ......    

This lands in blueprint.start (the blueprint decides the execution order of the submodules). Since the Consumer is one of the worker's components, this in turn calls the Consumer's start.

The call stack is as follows:

start, consumer.py:300
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374

0x03 start in consumer

Now we arrive at the Consumer itself.

The code lives in celery/worker/consumer/consumer.py.

As we know, the Consumer also has its own internal Steps:

class Consumer:
    """Consumer blueprint."""

    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]

So when the Worker starts the Consumer, the Consumer's start method is invoked.

Specifically:

def start(self):
    blueprint = self.blueprint
    while blueprint.state not in STOP_CONDITIONS:
        maybe_shutdown()
        if self.restart_count:
            try:
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        
        try:
            blueprint.start(self) # this is the key call
        except self.connection_errors as exc:

The following line is the key call into blueprint.start:

blueprint.start(self)

3.1 start consumer.blueprint

The code is located at celery-master/celery/bootsteps.py:

def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self.started = i + 1
        step.start(parent)

So the blueprint iterates over every step and starts each one.
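To see that pattern in isolation, here is a toy model of a blueprint driving its steps (hypothetical step names, not Celery's real bootsteps):

class ToyStep:
    def __init__(self, name):
        self.name = name

    def start(self, parent):
        print('starting %s on %s' % (self.name, parent))

class ToyBlueprint:
    def __init__(self, steps):
        self.steps = steps
        self.started = 0

    def start(self, parent):
        # same shape as bootsteps.Blueprint.start above
        for i, step in enumerate(s for s in self.steps if s is not None):
            self.started = i + 1
            step.start(parent)

ToyBlueprint([ToyStep('Connection'), ToyStep('Tasks'), ToyStep('Evloop')]).start('consumer')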

Let's now give a rough description of what these steps do.

  • 【1】Connection: manages the Connection to the broker
  • 【3】Events: sends monitoring events
  • 【2】Agent: cell actor
  • 【2】Mingle: synchronizes state between different workers
  • 【1】Tasks: starts the message Consumer
  • 【3】Gossip: consumes events from other workers
  • 【1】Heart: sends heartbeat events (the consumer's heartbeat)
  • 【3】Control: the remote-command management service

Reference article 1, "Overview of the Worker Startup Process," puts it like this:

Here I have numbered every Bootstep. The number indicates how important that service is for reading the code: 1 is the most important, 3 the least. For the Consumer:

1 marks the basic functions, which together form a simple, non-robust message queue framework;

2 marks the moderately important ones, which implement somewhat more advanced features;

3 marks the add-on features, which are also the more distributed-system-flavored ones.

Each step's component is actually quite complex, so we will cover them in detail in later articles. Here we only walk through the most important steps, essentially those making up the message loop: the connection component needed to read from the broker, and the task component needed to process messages.

3.2 The Connection step subcomponent

This subcomponent mainly handles the network interaction.

Curiously, Connection has no logic of its own here; it hands all the work to the Consumer class.

The parameter c of start is the consumer, so the start method calls the consumer's connect method and stores the result in the consumer's connection member variable.

This establishes the connection: ultimately a celery.app.amqp.Connection instance is created, which is actually a kombu Connection to the broker. Once established, the Connection is registered with the event loop of kombu's Transport.

With that, the Consumer is wired up to the broker.

class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect()

3.2.1 connect in consumer

The code is in celery/worker/consumer/consumer.py.

You can see that it does the following:

  • Creates a celery.app.amqp.Connection instance (i.e. a kombu Connection) with the heartbeat as a parameter, establishing the connection if it is not already up.
  • Registers the resulting Connection with the event loop for asynchronous use.
  • Returns the Connection.

The code is as follows:

def connect(self):
    """Establish the broker connection used for consuming tasks.
    """
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)      # heartbeat
    if self.hub:
        conn.transport.register_with_event_loop(conn.connection, self.hub)# register for async use
    return conn   # return the connection

def connection_for_read(self, heartbeat=None):
    return self.ensure_connected(
        self.app.connection_for_read(heartbeat=heartbeat))# make sure we're connected

3.2.2 connect in celery

The goal of this subsection is to obtain the Connection.

The app above is the Celery application, so we now move into the Celery application itself.

The code is in celery/app/base.py:

def connection_for_read(self, url=None, **kwargs):
    """Establish connection used for consuming.
    """
    return self._connection(url or self.conf.broker_read_url, **kwargs)

which leads to:

def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    conf = self.conf
    return self.amqp.Connection(
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
        transport=transport or conf.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat,
        login_method=login_method or conf.broker_login_method,
        failover_strategy=(
            failover_strategy or conf.broker_failover_strategy
        ),
        transport_options=dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )

As you can see, whether it is the Connection used by the Celery application or the one seen by the Consumer, it is ultimately amqp.Connection, which in the end is kombu.connection.Connection.

The self.amqp variable looks as follows; note that everything here is kombu-related:

self.amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7ffd556db7f0>
 BrokerConnection = {type} <class 'kombu.connection.Connection'>
 Connection = {type} <class 'kombu.connection.Connection'>
 Consumer = {type} <class 'kombu.messaging.Consumer'>
 Producer = {type} <class 'kombu.messaging.Producer'>
 app = {Celery} <Celery tasks at 0x7ffd557f3da0>
 argsrepr_maxsize = {int} 1024
 autoexchange = {NoneType} None
 default_exchange = {Exchange} Exchange celery(direct)
 default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
 kwargsrepr_maxsize = {int} 1024
 producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
 publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
 queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
 queues_cls = {type} <class 'celery.app.amqp.Queues'>
 router = {Router} <celery.app.routes.Router object at 0x7ffd56799898>
 routes = {tuple: 0} ()
 task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0>>, 2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0>>}
 utc = {bool} True

We end up with a kombu.connection.Connection:

<Connection: redis://localhost:6379// at 0x7ffd567827b8>

Then the connection is established:

def ensure_connected(self, conn):
    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        next_step = next_step.format(
            when=humanize_seconds(interval, 'in', ' '),
            retries=int(interval / 2),
            max_retries=self.app.conf.broker_connection_max_retries)
        error(CONNECTION_ERROR, conn.as_uri(), exc, next_step)

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:
        # retry disabled, just call connect directly.
        conn.connect()
        return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )
    return conn
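The retry behavior itself comes from kombu. A minimal sketch of the same calls (assuming a local Redis broker; kombu connections are lazy and open no socket until used):

from kombu import Connection

conn = Connection('redis://localhost:6379//')   # lazy: no socket yet

def errback(exc, interval):
    print('connect failed: %r, retrying in %ss' % (exc, interval))

# Retries the connect, invoking errback between failed attempts.
conn = conn.ensure_connection(errback, max_retries=3)
print(conn.connected)   # True once established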

The call stack is as follows:

ensure_connected, consumer.py:414
connection_for_read, consumer.py:405
connect, consumer.py:398
start, connection.py:21
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
<module>, myTest.py:26

In the end we have a valid connection.

                                                                                  For Read
                +------------------------------------------+       +------------------------------------------------------------------------+
+--------+      |                 [Consumer]   Connection+-------> |Connection: redis://localhost:6379// class 'kombu.connection.Connection'|
| Gossip +<-----+                                          |       +------------------------------------------------------------------------+
+--------+      |                                          |          +----------+
                |                                          +--------> |          |                  ^
                |                                          |          |  Events  |                  |
+-------+       |                                          |          |          |                  |
| Tasks | <-----+         Timer  Strategies  Blueprint     |          +----------+                  |
+-------+       |                                          |                                        |
                |                                          |                                        |
+-------+       | pool        hub          loop       app  |           +-------+                    |
| Heart | <-----+  +           +            +          +   +-------->  | Agent |                    |
+-------+       |  |           |            |          |   |           +-------+                    |
                |  |           |            |          |   |           +---------+                  |
                +------------------------------------------+-------->  | Mingle  |                  |
                   |           |            |          |               +---------+                  |
                   |           |            |          |                                            |
                   v           v            v          v                                            |
                                                                                                    |
   +-----------------+   +-----+ +----------------+ +---------------+           +--------------+    |
   | prefork.TaskPool|   | Hub | | loops.asynloop | | Celery        |           |    AMQP      |    |
   +-----------------+   +-----+ +----------------+ |               |           |              |    |
                                                    |        amqp +-----------> |  Connection+------+
                                                    +---------------+           +--------------+


3.2.3 Registering with the event loop

The code below uses the kombu Connection to the queue; once the connection is established, it is registered with the event loop of kombu's Transport:

if self.hub:
    conn.transport.register_with_event_loop(conn.connection, self.hub)  # register for async use
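Stripped of the Celery context, the registration looks roughly like this (a sketch using kombu's own Hub from recent versions; the Redis transport then adds its sockets to the hub's poller):

from kombu import Connection
from kombu.asynchronous import Hub

hub = Hub()
conn = Connection('redis://localhost:6379//')
conn.connect()
# The transport adds its file descriptors to the hub, so the hub's
# poller wakes up whenever the broker socket becomes readable.
conn.transport.register_with_event_loop(conn.connection, hub)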

So the end result is as follows:

                                                                                  For Read
                +------------------------------------------+       +------------------------------------------------------------------------+
+--------+      |                 [Consumer]   Connection+-------> |Connection: redis://localhost:6379// class 'kombu.connection.Connection'|
| Gossip +<-----+                                          |       +------------------------------------------------------------------------+
+--------+      |                                          |          +----------+
                |                                          +--------> |          |                  ^                ^
                |                                          |          |  Events  |                  |                |
+-------+       |                                          |          |          |                  |                |
| Tasks | <-----+         Timer  Strategies  Blueprint     |          +----------+                  |                |
+-------+       |                                          |                                        |                |
                |                                          |                                        |                |
+-------+       | pool        hub          loop       app  |           +-------+                    |                |
| Heart | <-----+  +           +            +          +   +-------->  | Agent |                    |                |
+-------+       |  |           |            |          |   |           +-------+                    |                |
                |  |           |            |          |   |           +---------+                  |                |
                +------------------------------------------+-------->  | Mingle  |                  |                |
                   |           |            |          |               +---------+                  |                |
                   |           |            |          |                                            |                |
                   v           v            v          v                                            |                |
                                                                                                    |                |
   +-----------------+ +-------+ +----------------+ +---------------+           +--------------+    |                |
   | prefork.TaskPool| |       | | loops.asynloop | | Celery        |           |    AMQP      |    |                |
   +-----------------+ |  Hub  | +----------------+ |               |           |              |    |                |
                       |       |                    |        amqp +-----------> |  Connection+------+                |
                       |       |                    +---------------+           +--------------+                     |
                       +---+---+                                                                                     |
                           |                                                                                         |
                           +---------------------------------------------------------------------------------------->+


3.3 The Tasks step subcomponent

Now that the network connection is configured, this subcomponent brings in the various tasks (the event loop that actually drives consumption is started later, by the Evloop step).

Here c is <Consumer: celery (running)>, so the operations below act on the Consumer.

We first need to introduce Celery's approach to tasks.

When Celery starts, it looks for classes and functions decorated with @task and registers a callback for each of them in a global set, _on_app_finalizers; this set collects all the task classes.

At this point Celery knows which tasks exist and has collected them into _on_app_finalizers, but it does not yet know what they mean. Put differently, Celery only knows the classes; it has no instances of them yet, and that link still has to be established.

So Celery runs the callbacks collected in _on_app_finalizers to obtain task instances, and adds them to Celery's task registry, tasks.

This tasks registry is what is used later when consuming messages: the task name supplied by the client is looked up to find the concrete task instance, which then handles the message.
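A minimal sketch of that collection process (assuming a module named myTest and a local Redis broker, matching the dumps in this article):

from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')

@app.task                       # registers a pending callback in _on_app_finalizers
def add(x, y):
    return x + y

app.finalize()                  # runs the callbacks; instances land in app.tasks
print(app.tasks['myTest.add'])  # <@task: myTest.add of myTest at 0x...>

After finalization, the registry looks like this: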

self._tasks = {TaskRegistry: 10} 
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
 __len__ = {int} 10

So next let's look at how the Tasks component starts.

3.3.1 The start method

Tasks starts up as follows:

  • update the strategies for the known tasks;
  • obtain the kombu.Consumer, stored as c.task_consumer;
  • start consuming.

Specifically:

class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()    # update the strategies for known tasks

        # - RabbitMQ 3.3 completely redefines how basic_qos works..
        # This will detect if the new qos semantics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )   # apply the prefetch count on the default channel

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )   # create the kombu consumer that will consume task messages

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)      # manage the prefetch count via QoS
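The QoS helper used here is kombu's (kombu.common.QoS in current releases; treat the exact import path as an assumption to verify against your version). It buffers prefetch changes and only applies them through the callback when update() is called:

from kombu.common import QoS

def set_prefetch_count(prefetch_count):
    # stand-in for c.task_consumer.qos(prefetch_count=...)
    print('basic_qos ->', prefetch_count)
    return prefetch_count

qos = QoS(set_prefetch_count, 4)   # initial prefetch count of 4
qos.increment_eventually()         # recorded, but not applied yet
qos.update()                       # applied here: prints 'basic_qos -> 5'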

3.3.2 Strategies

Running tasks actually involves scheduling strategies, which can be seen as a form of load balancing. The strategies are:

SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}

Celery configures a callback strategy and callback method for each task, for example: 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>.

3.3.3 The default strategy

Let's take the basic default strategy as an example and see what it does. It builds a Request from the task instance, which ties together the broker message, the consumer, and the worker processes. Concretely, Request.execute_using_pool is where the hand-off to multiprocessing happens, for example linking up with the consumer's pool of worker processes.

The code is:

def default(task, app, consumer,
        info=logger.info, error=logger.error, task_reserved=task_reserved,
        to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
        proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    hostname = consumer.hostname                                      # consumer-related info
    connection_errors = consumer.connection_errors                    # connection error types
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher                                             
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # build the Request class

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)  # parse the received message

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )                                                                  # instantiate the request
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )                                           # emit a task-received event if enabled

        if req.eta:                                     # ETA/time handling
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError) as exc:
                req.reject(requeue=False)
            else:
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            if rate_limits_enabled:                             # rate limiting
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)                                 
            if callbacks:
                [callback(req) for callback in callbacks] 
            handle(req)                                          # hand the request off for processing

    return task_message_handler

The handler used here is w.process_task, which was passed in when the consumer was initialized:

def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    try:
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        try:
            self._quick_release()   # Issue 877
        except AttributeError:
            pass

As you can see, Request.execute_using_pool is exactly where the request meets multiprocessing: the request is executed on the consumer's pool of worker processes.
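As a toy model of that hand-off (hypothetical names, not Celery's real Request class): the request wraps the task name and arguments, and execute_using_pool simply schedules the traced call on the pool.

from multiprocessing.pool import Pool

def trace_task(name, args):
    # stand-in for Celery's traced task execution
    print('executing %s%r' % (name, args))
    return sum(args)

class ToyRequest:
    def __init__(self, name, args):
        self.name, self.args = name, args

    def execute_using_pool(self, pool):
        return pool.apply_async(trace_task, (self.name, self.args))

if __name__ == '__main__':
    with Pool(2) as pool:
        result = ToyRequest('myTest.add', (2, 3)).execute_using_pool(pool)
        print(result.get(timeout=10))   # -> 5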

3.3.4 Updating the known task strategies

At startup, update_strategies is called to refresh the strategies for the known tasks:

class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()

Its code is as follows:

def update_strategies(self):
    loader = self.app.loader
    for name, task in self.app.tasks.items():
        self.strategies[name] = task.start_strategy(self.app, self)
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)

self.app.tasks holds the tasks collected when the application started; at this point the strategies for each of them are refreshed.

The variables look like this:

self.app.tasks = {TaskRegistry: 10} 
 NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
 'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7ffe3ff08198>
 'myTest.add' = {add} <@task: myTest.add of myTest at 0x7ffe3ff08198>
 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7ffe3ff08198>
 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7ffe3ff08198>
 'celery.group' = {group} <@task: celery.group of myTest at 0x7ffe3ff08198>
 'celery.map' = {xmap} <@task: celery.map of myTest at 0x7ffe3ff08198>
 'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7ffe3ff08198>
 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7ffe3ff08198>
 'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7ffe3ff08198>
 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7ffe3ff08198>
 __len__ = {int} 10
self = {Consumer} <Consumer: celery (running)>

Now let's look at task.start_strategy:

def start_strategy(self, app, consumer, **kwargs):
    return instantiate(self.Strategy, self, app, consumer, **kwargs)    # instantiate this task's Strategy

After this runs, strategies looks as follows. Each entry is a task's callback, and each one is currently task_message_handler (a task's Strategy attribute defaults to 'celery.worker.strategy:default'). This is the function that builds a Request from the task instance, tying together the broker message, the consumer, and the worker processes.

strategies = {dict: 10} 
 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
 'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
 'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
 'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
 'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
 'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
 'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
 'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
 'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
 'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
 __len__ = {int} 10

The logic is as follows (the Consumer has many member variables; some are omitted from the diagram for clarity):

                                      +-----------------------+                      +---------------------------+
                                      | Celery                |                      | Consumer                  |
                                      |                       |                      |                           |
                                      |            consumer +--------------------->  |                           |            +---------------+
                                      |                       |                      |        task_consumer +---------------> | amqp.Consumer |
                                      |             _tasks    |                      |                           |            +---------------+
                                      |                +      |                      |                           |
                                      |                |      |                      |        strategies +----------------+
                                      +-----------------------+                      |                           |        |
                                                       |                             |                           |        |
                                                       |                             +---------------------------+        |
                                                       |                                                                  v
                                                       v
+------------------------------------------------------+-------------------------------------+  +-----------------------------------------------------------------------------+
|                                                                                            |  | strategies = {dict: 10}                                                     |
|                          TaskRegistry                                                      |  |  'celery.chunks' = function default.<locals>.task_message_handler           |
|                                                                                            |  |  'celery.backend_cleanup' = function default.<locals>.task_message_handler  |
|   NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>                         |  |  'celery.chord_unlock' = function default.<locals>.task_message_handler     |
|   'celery.chunks' = {chunks} <@task: celery.chunks of myTest>                              |  |  'celery.group' = function default.<locals>.task_message_handler            |
|   'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest >  |  |  'celery.map' = function default.<locals>.task_message_handler              |
|   'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest>            |  |  'celery.chain' = function default.<locals>.task_message_handler            |
|   'celery.group' = {group} <@task: celery.group of myTest>                                 |  |  'celery.starmap' = function default.<locals>.task_message_handler          |
|   'celery.map' = {xmap} <@task: celery.map of myTest>                                      |  |  'celery.chord' = function default.<locals>.task_message_handler            |
|   'celery.chain' = {chain} <@task: celery.chain of myTest>                                 |  |  'myTest.add' = function default.<locals>.task_message_handler              |
|   'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest>                          |  |  'celery.accumulate' = function default.<locals>.task_message_handler       |
|   'celery.chord' = {chord} <@task: celery.chord of myTest>                                 |  |                                                                             |
|   'myTest.add' = {add} <@task: myTest.add of myTest>                                       |  +-----------------------------------------------------------------------------+
|   'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest>                  |
|                                                                                            |
+--------------------------------------------------------------------------------------------+

The key pieces here are the tasks registry and its corresponding strategies mapping.

3.3.5 Consuming

The following code ties the tasks to the corresponding kombu consumer:

c.task_consumer = c.app.amqp.TaskConsumer(
    c.connection, on_decode_error=c.on_decode_error,
)

c.app.amqp.TaskConsumer returns a Consumer. The result becomes c.task_consumer, i.e. a kombu.Consumer, with c.connection configured into it:

  • <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
  • <Connection: redis://localhost:6379>

Thus Celery's Consumer component is linked to kombu's consumer: celery.consumer.task_consumer is a kombu Consumer, and by this point that kombu Consumer is already tied to a channel, so whenever a message arrives on the connection, the kombu Consumer's callback fires. We will see below how this is used.

The code is as follows:

from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    def TaskConsumer(self, channel, queues=None, accept=None, **kw):
        if accept is None:
            accept = self.app.conf.accept_content
        return self.Consumer(
            channel, accept=accept,
            queues=queues or list(self.queues.consume_from.values()),
            **kw
        )
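A quick usage sketch (assuming the myTest app and a local Redis broker from earlier): TaskConsumer hands back a kombu Consumer already bound to the configured task queues.

from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')

with app.connection_for_read() as conn:
    consumer = app.amqp.TaskConsumer(conn)
    print(consumer)
    # e.g. <Consumer: [<Queue celery -> <Exchange celery(direct)> -> celery>]>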

The logic is as follows (some Consumer member variables are omitted from the diagram for clarity):

 +--------------------------------+
 |  [Consumer]                    |                        For Read
 |                                |       +----------------------------------------+
 |                    Connection+-------> | <Connection: redis://localhost:6379//> |
 |                                |       +----------------------------------------+
 |                                |
 |                                |       +--------+
 |                                +-----> | Tasks  |
 |                                |       +--------+
 |                                |
 |   app           task_consumer+--------------------------------------------------------------------------------------+
 |    +                           |                                                                                    |
 |    |                           |                                                                                    |
 |    |                           |                                                                                    |
 +--------------------------------+                                                                                    |
      |                                  +-------------------------------------+                                       |
      |                                  |                celery.app.amqp.AMQP |                                       |
      |                                  |                                     |                                       |
+-----+----------------------+           |                                     |                                       |
| Celery                     |           |                   BrokerConnection +------->  kombu.connection.Connection   |
|                            |           |                                     |                                       |
|                      amqp+------------>+                   Connection       +------->  kombu.connection.Connection   |
|                            |           |                                     |                                       |
|                            |           |                   Consumer         +------->  kombu.messaging.Consumer <----+
+----------------------------+           |                                     |
                                         |                   Producer         +------->  kombu.messaging.Producer
                                         |                                     |
                                         |                   producer_pool    +------->  kombu.pools.ProducerPool
                                         |                                     |
                                         |                   queues           +------->  celery.app.amqp.Queues
                                         |                                     |
                                         |                   router           +------->  celery.app.routes.Router
                                         +-------------------------------------+


3.4 The event loop subcomponent

The previous subcomponent configured the various tasks; this one starts consuming them. The code is in celery/worker/consumer/consumer.py.

It corresponds to the step 'celery.worker.consumer.consumer:Evloop':

[<step: Connection>, <step: Events>, <step: Mingle>, <step: Gossip>, <step: Tasks>, <step: Control>, <step: Heart>, <step: event loop>]

class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()

As you can see, this simply calls the consumer's loop function to run the loop.

3.4.1 loop in consumer

The code lives in celery/worker/loops.py: the consumer's loop function is the asynloop function defined there.

__init__ picks the loop function as follows:

if not hasattr(self, 'loop'):
    self.loop = loops.asynloop if hub else loops.synloop
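For contrast, the synchronous variant is essentially a blocking drain loop. A simplified sketch loosely modelled on synloop (not the verbatim Celery code):

import socket

def toy_synloop(connection, consumer, on_task_received, is_running):
    # No hub: just register the callback, start consuming, and block
    # on drain_events with a timeout.
    consumer.on_message = on_task_received
    consumer.consume()
    while is_running():
        try:
            connection.drain_events(timeout=2.0)
        except socket.timeout:
            pass   # nothing arrived in this window; loop again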

At this point the picture is as follows (some Consumer member variables are omitted from the diagram for clarity):

+--------------------------------+       +--------+
| [Consumer]                     +-----> | Evloop |
|                                |       +--------+
|                                |          +--------------------------+
|                                |          |                on_tick +-------->  Transport.register_with_event_loop
|                                +------->  | Hub                      |
|                                |          |                poller +--------->  kombu.utils.eventio._poll
|                                |          |                          |
|                                |          |                readers   |
|                                |          |                          |
|  create_task_handler     loop +----------------------->  create_loop+------->  loops.asynloop
|                                |          |                          |
|                                |          +--------------------------+
|   app                          |
|    +                           |
|    |            task_consumer +-------------------------------------------------------------------------->+
|    |                           |                                                                          |
+--------------------------------+                                                                          |
     |                                                                                                      |
     |                                                                                                      |
     |                                                                                                      |
+----+---------+                                                                                            |
| Celery       |       +-------------------------------------+                                              |
|              |       | celery.app.amqp.AMQP                |                                              |
|              |       |                                     |                                              |
|              |       |                                     |                                              |
|              |       |                   BrokerConnection +------->  kombu.connection.Connection          |
|              |       |                                     |                                              |
|        amqp+-------->+                   Connection       +------->  kombu.connection.Connection          |
|              |       |                                     |                                              |
+--------------+       |                   Consumer         +------->  kombu.messaging.Consumer  <----------+
                       |                                     |
                       |                   Producer         +------->  kombu.messaging.Producer
                       |                                     |
                       |                   producer_pool    +------->  kombu.pools.ProducerPool
                       |                                     |
                       |                   queues           +------->  celery.app.amqp.Queues
                       |                                     |
                       |                   router           +------->  celery.app.routes.Router
                       +-------------------------------------+


3.4.2 Configuring kombu.consumer

Earlier we saw this call:

c.loop(*c.loop_args())

Note that self.task_consumer used here is the kombu.Consumer, with c.connection configured into it.

def loop_args(self):
    return (self, self.connection, self.task_consumer,
            self.blueprint, self.hub, self.qos, self.amqheartbeat,
            self.app.clock, self.amqheartbeat_rate)

At this point the logic is as follows (some Consumer member variables are omitted from the diagram for clarity):

+--------------------------------+          +--------+
| [Consumer]                     +--------> | Evloop |
|                                |          +--------+
|                                |          +--------------------------+
|                                |          |                on_tick +-----+-->  Transport.register_with_event_loop
|                                +------->  | Hub                      |   |
|                                |          |                          |   +-->  AsynPool._create_write_handlers.<locals>.on_poll_start
|                                |          |                          |                                                       +
|                                |          |                          |                                                       |
|                                |          |                          |                                                       v
|  create_task_handler           |          |                          |                                      iterate_file_descriptors_safely
|                                |          |                poller +--------->  kombu.utils.eventio._poll                     ^
|                                |          |                          |                       +                               |
|   app                    loop +----------------------->  create_loop+-------> loops.asynloop |                               |
|    +                           |          |                          |                       |             +-----+           |
|    |       task_consumer       |          +--------------------------+                       +---------->  | fd  |  +--------+
|    |            +              |                                                                           +-----+
|    |            |              |
|    |            |              |          +--------------------------------------+
|    |            | Connection +----------> | <Connection: redis://localhost:6379> |
|    |            |              |          +--------------------------------------+
+--------------------------------+
     |            |                                    ^
     |            |                                    |
     |            v                                    |
+----+----+  +----+-------------------------+          |
| Celery  |  | kombu . Consumer             |          |
|         |  |                              |          |
|         |  |                   channel+--------------+
+---------+  |                              |
             +------------------------------+


3.4.3 Starting consumption

In asynloop, the following happens:

  • the message-handling function (which parses and executes messages) is set up; that is, the real message-handling logic lives in this create_task_handler;

  • the kombu consumer's message callback is set; in other words, on_task_received is the function that ultimately receives messages;

  • hub.create_loop() is called to obtain the execution engine;

  • next(loop) is called to drive the engine.

def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler() # build the message handler (parse and execute)

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    loop = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            state.maybe_shutdown()

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
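
The loop returned by hub.create_loop() is a generator: each next(loop) performs one poll-and-dispatch pass and then yields control back to asynloop. A toy model of that shape (not kombu's actual Hub):

def create_loop(poll, dispatch):
    # Toy model: poll() returns whatever events are ready, dispatch()
    # handles one event; yield hands control back to the caller.
    while True:
        for event in poll():
            dispatch(event)
        yield

events = [['task-1'], [], ['task-2']]
loop = create_loop(lambda: events.pop(0) if events else [], print)
next(loop)   # dispatches 'task-1', then yields
next(loop)   # nothing ready on this tick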

At this point the asynchronous loop is up and running, and the worker starts waiting for and handling events. In the next article we will analyze the task concept and its implementation.

0xFF References

Celery Source Code Analysis (1): Overview of the Worker Startup Process

Celery Source Code Analysis (2): The Worker's Execution Engine

Celery Source Code Analysis (3): The Implementation of the Task Object

Celery Source Code Analysis (4): The Implementation of Periodic Tasks

Celery Source Code Analysis (5): Remote Control Management

Celery Source Code Analysis (6): The Implementation of Events

Celery Source Code Analysis (7): Interaction Between Workers

Celery Source Code Analysis (8): State and Result

