celery源碼解讀


 

Celery啟動的入口:

文件:Celery/bin/celery.py

 

看下main函數做了什么事

可以看到主要做了幾個事根據-P參數判斷是否需要打patch,如果是gevent或者eventlet則要打對應的補丁。

然后執行命令行邏輯

 

 

可以看到,這邊取出系統參數

然后執行基類Command的execute_from_commandline

文件:celery/bin/base.py

 

setup_app_from_commandline是核心函數,作用是獲得我們的app對象和獲得我們的配置參數

文件:Celery/bin/celery.py

 

這邊主要獲取啟動類別及啟動參數,我們的類別是worker所以:

 

這邊是開始准備啟動對應類別的對象,worker、beat等。

self.commands是支持的命令:

 

上面我們知道,我們的類型是worker,即celery.bin.worker.worker,初始化該類,然后執行run_from_argv函數

文件:celery/bin/worker.py

 

最后一行會執行到父類的__call__函數,

文件:celery/bin/base.py

 

這邊主要執行的是run函數

 

這個函數主要是啟動worker

 

終於進入worker了,現在這里涉及一些比較關鍵的東西了,

文件:celery/worker/__init__.py  

WorkController類里,是worker的基類

 

 

這是worker的藍圖,這邊會形成一個依賴圖,是啟動的必要組件,分別負責worker的一部分任務,比較重要的幾個:

Timer用於執行定時任務的 Timer,和 Consumer 那里的 timer 不同

HubEvent loop 的封裝對象

Pool構造各種執行池(線程/進程/協程)的

Beat創建Beat進程,不過是以子進程的形式運行(不同於命令行中以beat參數運行)

 

文件:celery/apps/worker.py

 

文件:celery/apps/trace.py

 

文件:celery/app/base.py

 

init_before開始,這邊是最主要的,即綁定所有的task到我們的app,注冊task在下面

 

 

 

 

每個task都有delayapply_async函數,這個可以用來幫我們啟動任務。

 

文件:celery/worker/__init__.py

 

這邊是設置關注及不關注的隊列,可以看到,celery支持ampq協議。

 

調用setup_includes安裝一些通過CELERY_INCLUDE配置的模塊,保證所有的任務模塊都導入了

最后初始化藍圖,並進行apply完成藍圖各個step的依賴關系圖的構建,並進行各個組件的初始化,依賴在component中已經標出

 

這個requires就是依賴,說明hub依賴timer,上面藍圖聲明的組件都有互相依賴關系。

回到文件:celery/worker/__init__.py執行start

 

執行的是藍圖的start

 

分別執行各個步驟的start,在apply時,會判斷step是否需要start,不start但是仍要create

 

通過啟動日志看,worker啟動的stepPool,和Consumer

如果換成prefork方式起,worker會多起hubautoscaler兩個step

 

Hub依賴Timer,我們用gevent,所以include_iffalse,這個不需要start

Hub創建時候引用的kombuHub組件,Connection會注冊到HubConnection是各種類型連接的封裝,對外提供統一接口

Queue依賴Hub,這邊是基於Hub創建任務隊列

下面是我們的worker啟動的step其中的一個,重點進行說明

初始化線程/協程池,是否彈縮,最大和最小並發數

 

Celery支持的幾種TaskPool

 

我們是gevent,所以這邊直接找gevent的代碼。

 

這邊直接引用geventPool

 

下面看worker啟動的第二個step

可以看到,這邊啟動的是celery.worker.consumer.Consumer,這邊就會涉及另一個重要的藍圖了。

文件:celery/worker/consumerConsumer

 

這是Consumer的藍圖,

 

Consumer啟動的stepConnectioneventsmingleGossipTasksContorlHeartevent loop

 

__init__初始化一些必要的組件,很多都是之前worker創建的。

然后執行blueprintapply,做的事我worker之前是一樣的。

 

執行Consumerstart,也就是執行blueprintstart

啟動的step的基本功能:

Connection:管理和brokerConnection連接

Mingle:不同worker之間同步狀態用的

Tasks:啟動消息Consumer

Gossip:消費來自其他worker的事件

Heart:發送心跳事件(consumer的心跳)

Control:遠程命令管理服務

其中ConnectionTasksHeartevent loop是最重要的幾個。

先看Connection

 

使用了consumerconnect()

 

Conn引用了ampqconnectionampqConnection是直接使用的kombuConnection,上面說過,這個Connection是各種支持的類型(如redisrabbitMQ等)的抽象,對外提供統一接口。

如果hub存在,會將連接注冊到event loop

再看Tasks

 

這邊引用的ampqTaskConsumerampqTaskConsumer繼承了kombuConsumer

可以看到,在關鍵的幾個地方,celery都引用了kombuKombu對所有的MQ進行抽象,然后通過接口對外暴露出一致的APIRedis/RabbitMQ/MongoDB),KombuMQ的抽象如下:

Message:生產消費的基本單位,就是一條條消息

Connection:對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接

Transport:真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例

Producers: 發送消息的抽象類

Consumers:接受消息的抽象類

ExchangeMQ 路由,這個和 RabbitMQ 差不多,支持 5種 類型

Queue:對應的 queue 抽象,其實就是一個字符串的封裝

Hub是一個eventloopConnection注冊到Hub,一個Connection對應一個HubConsumer綁定了消息的處理函數,每一個Consumer初始化的時候都是和Channel綁定的,也就是說我們Consumer包含了Queue也就和Connection關聯起來了,Consumer消費消息是通過Queue來消費,然后Queue又轉嫁給Channel再轉給connectionChannelAMQPMQ的操作的封裝,ConnectionAMQP對連接的封裝那么兩者的關系就是對MQ的操作必然離不開連接,但是,Kombu並不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象TransportTransport負責具體的MQ的操作,也就是說Channel的操作都會落到Transport上執行

再看下event loop

 

上面我們有了connection以及綁定connectionconsumer,下面看看消費者怎么消費消息,如果是帶hub的情況:

 

先對consumer進行一些設置,

 

然后開始進行循環。loopkombu創建的event loop,啟用事件循環機制,然后next這邊就開始不停的循環獲取消息並執行。

 

這個是kombu里的部分實現,是對從池里取到的消息進行處理。

 

看下同步代碼,register_callback將回調注冊consumer,然后執行consume

 

再看消息循環那幾行,

獲取到消息后,調用回調函數進行處理。

 

回調函數使用的是create_task_handler()strategies是在上面的update_strategies里進行的更新,該函數是在Task里調用的

 

打印一下strategies里的信息,只截部分圖:

 

下面看下我們怎么啟動任務的,

 

調用到appsend_task

 

再調用到ampqpublish_task

 

最終又交給kombupublish

關於pool的選擇:

 

使用的是apppool,即

 

通過connection又走到了ampq再轉到kombu里。

 

 

Workerconsumer基本大框架就是上面的流程,下面看下beat是怎么實現的。

Beat起動的時候是celery beat,根據我們上面的分析,首先進入的應該是celey/bin/beat.py,然后調用該文件中的Beatrun函數:

 

然后在指向appsBeat

 

在apps里的Beat調用run

 

主要執行了三個函數,init_loader主要初始化並綁定task,第二步設置一些頭信息之類的,關鍵是第三步,主干代碼

 

主要是初始化servicestart

 

Start最關鍵的部分是那個while循環體,只要不被shutdown,就會一直調用schedulertick

 

這邊這個self.schedule就是我們准備調度的任務:

 

下面看對這些任務的處理:

 

這是判斷是否要執行任務的邏輯,如果要執行,則執行apply_async

 

如果發現任務該執行了,則去tasks里獲取任務,並執行,這邊的apply_asyncworker那邊的沒區別,如果沒找到task,則將task注冊到broker

 

 

 

怎樣將consumerconcurrency聯系起來

這邊調用了_process_task,調用的是worker里的

這邊調用各種池的啟動函數:

但是queue里只是引用,后面還有別的處理

在初始化consumer時候將調用池的操作傳了進去,成為了Consumer里的on_task_request

在Tasks調用start的時候會更新strategies

然后在這邊調用start_strategy

然后就進入

然后走入strategy的default

這里取了consumeron_task_request,就是我們傳入的池執行的邏輯,_limit_task是這樣的:

做了一些判斷,符合條件再執行。

這個文件是strategydefault的下半個文件,做了一些流量控制,然后執行limit_task或者直接執行handler

這邊因為使用的gevent,所以就走到geventapply_async

這邊是起一個協程處理,這樣就將任務交給了gevent

具體上面是執行流程,具體在哪里執行的呢?

這邊注冊了callbackcreate_task_handlerstrategy這邊取值取值執行

 

Qosack的處理部分:

Kombutransportredis.py里的額basic_consume,調用channelbasic_consume

 

在Kombu.transport.virtual.__init__.py文件中

 

這里維護了一個dictself._delivered,一個setself._dirty和一個intprefetch_count

如果no_ackFalse在執行consume后會向self._delivered中添加一條數據,

ack后會向self._dirty中添加一條數據,然后,后面會將self._dirty逐條刪除,並同時刪除self._delivered中的數據,如果沒有ack,則不會刪除:

 

 

每次拉任務的時候會調用can_consume

 

比較prefetch_countself._deliveredself._dirty的值,如果小於預取限制,則允許,否則不允許。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM