Celery啟動的入口:
文件:Celery/bin/celery.py
看下main函數做了什么事
可以看到主要做了幾個事根據-P參數判斷是否需要打patch,如果是gevent或者eventlet則要打對應的補丁。
然后執行命令行邏輯
可以看到,這邊取出系統參數
然后執行基類Command的execute_from_commandline,
文件:celery/bin/base.py
setup_app_from_commandline是核心函數,作用是獲得我們的app對象和獲得我們的配置參數
文件:Celery/bin/celery.py
這邊主要獲取啟動類別及啟動參數,我們的類別是worker所以:
這邊是開始准備啟動對應類別的對象,worker、beat等。
self.commands是支持的命令:
上面我們知道,我們的類型是worker,即celery.bin.worker.worker,初始化該類,然后執行run_from_argv函數
文件:celery/bin/worker.py
最后一行會執行到父類的__call__函數,
文件:celery/bin/base.py
這邊主要執行的是run函數
這個函數主要是啟動worker
終於進入worker了,現在這里涉及一些比較關鍵的東西了,
文件:celery/worker/__init__.py
在WorkController類里,是worker的基類
這是worker的藍圖,這邊會形成一個依賴圖,是啟動的必要組件,分別負責worker的一部分任務,比較重要的幾個:
Timer:用於執行定時任務的 Timer,和 Consumer 那里的 timer 不同
Hub:Event loop 的封裝對象
Pool:構造各種執行池(線程/進程/協程)的
Beat:創建Beat進程,不過是以子進程的形式運行(不同於命令行中以beat參數運行)
文件:celery/apps/worker.py
文件:celery/apps/trace.py
文件:celery/app/base.py
從init_before開始,這邊是最主要的,即綁定所有的task到我們的app,注冊task在下面
每個task都有delay和apply_async函數,這個可以用來幫我們啟動任務。
文件:celery/worker/__init__.py
這邊是設置關注及不關注的隊列,可以看到,celery支持ampq協議。
調用setup_includes安裝一些通過CELERY_INCLUDE配置的模塊,保證所有的任務模塊都導入了
最后初始化藍圖,並進行apply完成藍圖各個step的依賴關系圖的構建,並進行各個組件的初始化,依賴在component中已經標出
這個requires就是依賴,說明hub依賴timer,上面藍圖聲明的組件都有互相依賴關系。
回到文件:celery/worker/__init__.py執行start
執行的是藍圖的start。
分別執行各個步驟的start,在apply時,會判斷step是否需要start,不start但是仍要create。
通過啟動日志看,worker啟動的step為Pool,和Consumer;
如果換成prefork方式起,worker會多起hub和autoscaler兩個step:
Hub依賴Timer,我們用gevent,所以include_if是false,這個不需要start。
Hub創建時候引用的kombu的Hub組件,Connection會注冊到Hub,Connection是各種類型連接的封裝,對外提供統一接口
Queue依賴Hub,這邊是基於Hub創建任務隊列
下面是我們的worker啟動的step其中的一個,重點進行說明
初始化線程/協程池,是否彈縮,最大和最小並發數
Celery支持的幾種TaskPool,
我們是gevent,所以這邊直接找gevent的代碼。
這邊直接引用gevent的Pool
下面看worker啟動的第二個step
可以看到,這邊啟動的是celery.worker.consumer.Consumer,這邊就會涉及另一個重要的藍圖了。
文件:celery/worker/consumer,Consumer類
這是Consumer的藍圖,
Consumer啟動的step為Connection,events,mingle,Gossip,Tasks,Contorl,Heart和event loop。
__init__初始化一些必要的組件,很多都是之前worker創建的。
然后執行blueprint的apply,做的事我worker之前是一樣的。
執行Consumer的start,也就是執行blueprint的start。
啟動的step的基本功能:
Connection:管理和broker的Connection連接
Mingle:不同worker之間同步狀態用的
Tasks:啟動消息Consumer
Gossip:消費來自其他worker的事件
Heart:發送心跳事件(consumer的心跳)
Control:遠程命令管理服務
其中Connection,Tasks,Heart和event loop是最重要的幾個。
先看Connection。
使用了consumer的connect()
Conn引用了ampq的connection,ampq的Connection是直接使用的kombu的Connection,上面說過,這個Connection是各種支持的類型(如redis,rabbitMQ等)的抽象,對外提供統一接口。
如果hub存在,會將連接注冊到event loop。
再看Tasks:
這邊引用的ampq的TaskConsumer,ampq的TaskConsumer繼承了kombu的Consumer。
可以看到,在關鍵的幾個地方,celery都引用了kombu,Kombu對所有的MQ進行抽象,然后通過接口對外暴露出一致的API(Redis/RabbitMQ/MongoDB),Kombu對MQ的抽象如下:
Message:生產消費的基本單位,就是一條條消息
Connection:對 MQ 連接的抽象,一個 Connection 就對應一個 MQ 的連接
Transport:真實的 MQ 連接,也是真正連接到 MQ(redis/rabbitmq) 的實例
Producers: 發送消息的抽象類
Consumers:接受消息的抽象類
Exchange:MQ 路由,這個和 RabbitMQ 差不多,支持 5種 類型
Queue:對應的 queue 抽象,其實就是一個字符串的封裝
Hub是一個eventloop,Connection注冊到Hub,一個Connection對應一個Hub。Consumer綁定了消息的處理函數,每一個Consumer初始化的時候都是和Channel綁定的,也就是說我們Consumer包含了Queue也就和Connection關聯起來了,Consumer消費消息是通過Queue來消費,然后Queue又轉嫁給Channel,再轉給connection,Channel是AMQP對MQ的操作的封裝,Connection是AMQP對連接的封裝,那么兩者的關系就是對MQ的操作必然離不開連接,但是,Kombu並不直接讓Channel使用Connection來發送/接受請求,而是引入了一個新的抽象Transport,Transport負責具體的MQ的操作,也就是說Channel的操作都會落到Transport上執行。
再看下event loop:
上面我們有了connection以及綁定connection的consumer,下面看看消費者怎么消費消息,如果是帶hub的情況:
先對consumer進行一些設置,
然后開始進行循環。loop是kombu創建的event loop,啟用事件循環機制,然后next這邊就開始不停的循環獲取消息並執行。
這個是kombu里的部分實現,是對從池里取到的消息進行處理。
看下同步代碼,register_callback將回調注冊consumer,然后執行consume:
再看消息循環那幾行,
獲取到消息后,調用回調函數進行處理。
回調函數使用的是create_task_handler(),strategies是在上面的update_strategies里進行的更新,該函數是在Task里調用的
打印一下strategies里的信息,只截部分圖:
下面看下我們怎么啟動任務的,
調用到app的send_task
再調用到ampq的publish_task,
最終又交給kombu的publish。
關於pool的選擇:
使用的是app的pool,即
通過connection又走到了ampq再轉到kombu里。
Worker和consumer基本大框架就是上面的流程,下面看下beat是怎么實現的。
Beat起動的時候是celery beat,根據我們上面的分析,首先進入的應該是celey/bin/beat.py,然后調用該文件中的Beat的run函數:
然后在指向apps的Beat:
在apps里的Beat調用run:
主要執行了三個函數,init_loader主要初始化並綁定task,第二步設置一些頭信息之類的,關鍵是第三步,主干代碼
主要是初始化service並start。
Start最關鍵的部分是那個while循環體,只要不被shutdown,就會一直調用scheduler的tick
這邊這個self.schedule就是我們准備調度的任務:
下面看對這些任務的處理:
這是判斷是否要執行任務的邏輯,如果要執行,則執行apply_async。
如果發現任務該執行了,則去tasks里獲取任務,並執行,這邊的apply_async和worker那邊的沒區別,如果沒找到task,則將task注冊到broker。
怎樣將consumer和concurrency聯系起來
這邊調用了_process_task,調用的是worker里的
這邊調用各種池的啟動函數:
但是queue里只是引用,后面還有別的處理
在初始化consumer時候將調用池的操作傳了進去,成為了Consumer里的on_task_request
在Tasks調用start的時候會更新strategies
然后在這邊調用start_strategy
然后就進入
然后走入strategy的default
這里取了consumer的on_task_request,就是我們傳入的池執行的邏輯,_limit_task是這樣的:
做了一些判斷,符合條件再執行。
這個文件是strategy的default的下半個文件,做了一些流量控制,然后執行limit_task或者直接執行handler。
這邊因為使用的gevent,所以就走到gevent的apply_async,
這邊是起一個協程處理,這樣就將任務交給了gevent。
具體上面是執行流程,具體在哪里執行的呢?
這邊注冊了callback,create_task_handler從strategy這邊取值取值執行
Qos對ack的處理部分:
Kombu的transport的redis.py里的額basic_consume,調用channel的basic_consume;
在Kombu.transport.virtual.__init__.py文件中
這里維護了一個dict:self._delivered,一個set:self._dirty和一個int:prefetch_count,
如果no_ack為False在執行consume后會向self._delivered中添加一條數據,
ack后會向self._dirty中添加一條數據,然后,后面會將self._dirty逐條刪除,並同時刪除self._delivered中的數據,如果沒有ack,則不會刪除:
每次拉任務的時候會調用can_consume:
比較prefetch_count和self._delivered減self._dirty的值,如果小於預取限制,則允許,否則不允許。