[源碼分析] 並行分布式任務隊列 Celery 之 Timer & Heartbeat
0x00 摘要
Celery是一個簡單、靈活且可靠的,處理大量消息的分布式系統,專注於實時處理的異步任務隊列,同時也支持任務調度。
之前我們用了十幾篇文章,介紹了 Kombu 和 Celery 的基礎功能。從本文開始,我們介紹 Celery 的一些輔助功能(比如負載均衡,容錯等等)。其實從某種意義上來說,這些輔助功能更加重要。
本文我們介紹 Timer 和 Heart 這兩個組件。大家可以看看底層設計是如何影響上層實現的。
[源碼解析] 並行分布式框架 Celery 之 worker 啟動 (1)
[源碼解析] 並行分布式框架 Celery 之 worker 啟動 (2)
[源碼解析] 分布式任務隊列 Celery 之啟動 Consumer
[源碼解析] 並行分布式任務隊列 Celery 之 Task是什么
[從源碼學設計]celery 之 發送Task & AMQP
[源碼解析] 並行分布式任務隊列 Celery 之 消費動態流程
[源碼解析] 並行分布式任務隊列 Celery 之 多進程模型
[源碼分析] 分布式任務隊列 Celery 多線程模型 之 子進程
[源碼分析]並行分布式任務隊列 Celery 之 子進程處理消息
0x01 Blueprint
Celery 的 Worker初始化過程中,其內部各個子模塊的執行順序是由一個BluePrint類定義,並且根據各個模塊之間的依賴進行排序(實際上把這種依賴關系組織成了一個 DAG)執行。
Celery worker 的 Blueprint 如下,我們可以看到 Timer,Hub 是 Celery Worker 的兩個基本組件,提到 hub 是因為后面講解需要用到。
class Blueprint(bootsteps.Blueprint):
"""Worker bootstep blueprint."""
name = 'Worker'
default_steps = {
'celery.worker.components:Hub', # 這里是 Hub
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer', # 這里是 Timer
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
0x02 Timer Step
我們首先來到 Timer Step。
從 Timer 組件 的定義中可以看到,Timer 組件 會根據當前worker是否使用事件循環機制來決定創建什么類型的timer。
- 如果使用 eventloop,則使用
kombu.asynchronous.timer.Timer as _Timer
,這里具體等待動作由用戶自己完成。 - 否則使用 Pool 內部的Timer類(就是
timer_cls='celery.utils.timer2.Timer'
),timer2 自己做了一個線程來做定時等待;
定義如下:
from kombu.asynchronous.timer import Timer as _Timer
class Timer(bootsteps.Step):
"""Timer bootstep."""
def create(self, w):
if w.use_eventloop: # 檢查傳入的Worker是否使用了use_eventloop
# does not use dedicated timer thread.
w.timer = _Timer(max_interval=10.0) # 直接使用kombu的timer做定時器
else:
if not w.timer_cls: # 如果配置文件中沒有配置timer_clas
# Default Timer is set by the pool, as for example, the
# eventlet pool needs a custom timer implementation.
w.timer_cls = w.pool_cls.Timer # 使用緩沖池中的Timer
w.timer = self.instantiate(w.timer_cls,
max_interval=w.timer_precision,
on_error=self.on_timer_error,
on_tick=self.on_timer_tick) # 導入對應的類並實例化
起初看代碼時候很奇怪,為什么要再單獨定義一個 timer2?
原因推斷是(因為對 Celery 的版本發展歷史不清楚,所以此處不甚確定,希望有同學可以指正):依據 底層 Transport 的設計來對 Timer 做具體實現調整。
2.1 Transport
大家知道,Celery 是依賴於 Kombu,而在 Kombu 體系中,用 transport 對所有的 broker 進行了抽象,為不同的 broker 提供了一致的解決方案。通過Kombu,開發者可以根據實際需求靈活的選擇或更換broker。
我們再回顧下具體 Kombu 的概念:
- Connection 是 AMQP 對 連接的封裝;
- Channel 是 AMQP 對 MQ 操作的封裝;
那么兩者的關系就是對 MQ 的操作(Channel)必然離不開連接(Connection),但是 Kombu 並不直接讓 Channel 使用 Connection 來發送 / 接受請求,而是引入了一個新的抽象 Transport。Transport 負責具體的 MQ 的操作,也就是說 Channel 的操作都會落到 Transport 上執行;
Transport 代表真實的 MQ 連接,也是真正連接到 MQ( redis / rabbitmq )的實例。就是存儲和發送消息的實體,用來區分底層消息隊列是用 amqp、Redis 還是其它實現的。
具體 Kombu 邏輯如下圖,Transport 在左下角處 :
2.2 Thread-less VS Thread-based
對於 Transport,某些 rate-limit implementation(比如 RabbitMQ / Redis ) 為了減少開銷,采用了event-loop(底層使用了 Epoll),是 thread-less and lock-free。
而其他舊類型的 Transport 就是 Thread based,比如 Mongo。因此,
-
對於 Thread-less Transport
-
Kombu 就采用了
kombu.asynchronous.timer.Timer as _Timer
,具體等待操作是在 event-loop 中實現,就是 調用者 自己會做等待。 -
具體比如在 Redis Transport 之中,就有 register_with_event_loop 函數用來在 loop(就是 event-loop)中注冊自己,具體如下:
-
def register_with_event_loop(self, connection, loop): cycle = self.cycle cycle.on_poll_init(loop.poller) cycle_poll_start = cycle.on_poll_start add_reader = loop.add_reader on_readable = self.on_readable def on_poll_start(): cycle_poll_start() [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) loop.call_repeatedly( health_check_interval, cycle.maybe_check_subclient_health )
-
-
對於 thread-based Transport,
- 則采用了 celery.utils.timer2.Timer,timer2 自己繼承了線程類,使用自己這個線程來做定時等待;
- 比如在 Mongodb transport 之中,就沒有任何關於 event loop 的操作。
即,選用 timer 的哪種實現,看是否需要等待來決定,就是誰來完成 “等待” 這個動作。
翻了翻 Celery 2.4.7 的代碼,發現在這個版本,確實只有 Thread-based timer,其代碼涵蓋了 目前的 timer 2 和 kombu.asynchronous.timer.Timer
大部分功能。應該是從 3.0.2 之后,把部分代碼分離到了 kombu.asynchronous.timer.Timer
,實現了 Thread-less 和 Thread-based 兩個不同的實現。
具體可以參見下面源碼中的注釋:
- RabbitMQ/Redis: thread-less and lock-free rate-limit implementation.
This means that rate limits pose minimal overhead when used with
RabbitMQ/Redis or future transports using the event-loop,
and that the rate-limit implementation is now thread-less and lock-free.
The thread-based transports will still use the old implementation for
now, but the plan is to use the timer also for other
broker transports in Celery 3.1.
0x03 Timer in Pool
注意,上面的是 Timer Step,是一個啟動的階段,其目的是生成 Timer 組件 給 其他組件使用,並不是 Timer 功能類。
我們其次來看看 Timer 功能類 在 線程池 Pool 中的使用,就對應了前面 Blueprint step 之中的兩種不同 cases。
分別也對應了兩種應用場景(或者說是線程池實現):
- gevent 和 eventlet 使用
kombu.asynchronous.timer.Timer
。 - BasePool(以及其他類型線程池)使用了
timer2.Timer。
初步來分析,gevent 和 eventlet 都是用協程來模擬線程,所以本身具有Event loop,因此使用 kombu.asynchronous.timer.Timer
也算順理成章。
3.1 gevent 和 eventlet
對於 gevent,eventlet 這種情況,使用了 class Timer(_timer.Timer) 作為 Timer 功能類。
從代碼中可以看到,class Timer 擴展了 kombu.asynchronous.timer.Timer
。
from kombu.asynchronous import timer as _timer
class Timer(_timer.Timer):
def __init__(self, *args, **kwargs):
from gevent import Greenlet, GreenletExit
class _Greenlet(Greenlet):
cancel = Greenlet.kill
self._Greenlet = _Greenlet
self._GreenletExit = GreenletExit
super().__init__(*args, **kwargs)
self._queue = set()
def _enter(self, eta, priority, entry, **kwargs):
secs = max(eta - monotonic(), 0)
g = self._Greenlet.spawn_later(secs, entry)
self._queue.add(g)
g.link(self._entry_exit)
g.entry = entry
g.eta = eta
g.priority = priority
g.canceled = False
return g
def _entry_exit(self, g):
try:
g.kill()
finally:
self._queue.discard(g)
def clear(self):
queue = self._queue
while queue:
try:
queue.pop().kill()
except KeyError:
pass
@property
def queue(self):
return self._queue
3.2 BasePool
而 BasePool 采用了 timer2 . Timer
作為 Timer 功能類。
from celery.utils import timer2
class BasePool:
"""Task pool."""
Timer = timer2.Timer
下面我們具體看看 Timer 功能類 如何實現。
0x04 kombu.Timer
4.1 異步
kombu.asynchronous.timer.Timer
實現了異步Timer。
由其注釋可以,kombu.asynchronous.timer.Timer 在調用者每次得到下一次entry時,會給出tuple of (wait_seconds, entry)
,調用者應該進行等待相應時間。
即,kombu.Timer是調用者等待,普通timer是timer自己啟動線程等待。
"""Iterate over schedule.
This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
定義如下:
class Timer:
"""Async timer implementation."""
Entry = Entry
on_error = None
def __init__(self, max_interval=None, on_error=None, **kwargs):
self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
self.on_error = on_error or self.on_error
self._queue = []
4.2 調用
4.2.1 添加 timer function
用戶通過 call_repeatedly 來添加 timer function。
def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
kwargs = {} if not kwargs else kwargs
tref = self.Entry(fun, args, kwargs)
@wraps(fun)
def _reschedules(*args, **kwargs):
last, now = tref._last_run, monotonic()
lsince = (now - tref._last_run) if last else secs
try:
if lsince and lsince >= secs:
tref._last_run = now
return fun(*args, **kwargs) # 調用用戶方法
finally:
if not tref.canceled:
last = tref._last_run
next = secs - (now - last) if last else secs
self.enter_after(next, tref, priority)
tref.fun = _reschedules
tref._last_run = None
return self.enter_after(secs, tref, priority)
4.2.2 調用
Timer通過apply_entry進行調用。
def apply_entry(self, entry):
try:
entry()
except Exception as exc:
if not self.handle_error(exc):
logger.error('Error in timer: %r', exc, exc_info=True)
在獲取下一次entry時,會返回等待時間。
def __iter__(self, min=min, nowfun=monotonic,
pop=heapq.heappop, push=heapq.heappush):
"""Iterate over schedule.
This iterator yields a tuple of ``(wait_seconds, entry)``,
where if entry is :const:`None` the caller should wait
for ``wait_seconds`` until it polls the schedule again.
"""
max_interval = self.max_interval
queue = self._queue
while 1:
if queue:
eventA = queue[0]
now, eta = nowfun(), eventA[0]
if now < eta:
yield min(eta - now, max_interval), None
else:
eventB = pop(queue)
if eventB is eventA:
entry = eventA[2]
if not entry.canceled:
yield None, entry
continue
else:
push(queue, eventB)
else:
yield None, None
4.3 實驗
我們做實驗看看 timer 功能類 的 使用。
4.3.1 示例代碼
下面代碼來自https://github.com/liuliqiang/blog_codes/tree/master/python/celery/kombu,特此感謝。
def main(arguments):
hub = Hub()
exchange = Exchange('asynt')
queue = Queue('asynt', exchange, 'asynt')
def send_message(conn):
producer = Producer(conn)
producer.publish('hello world', exchange=exchange, routing_key='asynt')
print('message sent')
def on_message(message):
print('received: {0!r}'.format(message.body))
message.ack()
# hub.stop() # <-- exit after one message
conn = Connection('redis://localhost:6379')
conn.register_with_event_loop(hub)
def p_message():
print('redis://localhost:6379')
with Consumer(conn, [queue], on_message=on_message):
send_message(conn)
hub.timer.call_repeatedly(
3, p_message
)
hub.run_forever()
if __name__ == '__main__':
sys.exit(main(sys.argv[1:]))
這里,Hub 就是 timer 的客戶。
得到Stack如下,可以看到 hub 使用 timer 做了消息循環,於是我們需要看看 hub:
p_message
_reschedules, timer.py:127
__call__, timer.py:65
fire_timers, hub.py:142
create_loop, hub.py:300
run_once, hub.py:193
run_forever, hub.py:185
main, testUb.py:46
<module>, testUb.py:50
啟動時候的邏輯如下,hub 通過 hub.timer.call_repeatedly 設置了需要調用的用戶函數 fun,在 Timer 內部,fun 被包裝設置為 _reschedules。
Hub
+
| +----------------------------------+
| | kombu.asynchronous.timer.Timer |
| | |
| call_repeatedly(fun) | |
| | |
+----------------------------------------------> _reschedules [@wraps(fun)] |
| | |
| | |
| | |
| +----------------------------------+
|
|
v
4.3.2 Hub 的使用
以下代碼是Hub類,在這里,Hub 就是 timer 的用戶。
可以看到,hub 建立了message_loop。在 loop 中,hub 會:
- 使用 fire_timers 進行 timer 處理,會設置下一次 timer。
- 得到 poll_timeout 后,會進行處理或者 sleep。
下面是簡化版代碼。
def create_loop():
while 1:
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
if readers or writers:
events = poll(poll_timeout)
for fd, event in events or ():
if event & READ:
try:
cb, cbargs = readers[fd]
try:
cb(*cbargs)
except Empty:
pass
else:
# no sockets yet, startup is probably not done.
sleep(min(poll_timeout, 0.1))
yield
我們再看看 fire_timers,這就是調用用戶方法。
def fire_timers(self, min_delay=1, max_delay=10, max_timers=10,
propagate=()):
timer = self.timer
delay = None
if timer and timer._queue:
for i in range(max_timers):
delay, entry = next(self.scheduler)
if entry is None:
break
entry()# 調用用戶方法
return min(delay or min_delay, max_delay)
使用Entry調用用戶方法
class Entry:
"""Schedule Entry."""
def __call__(self):
return self.fun(*self.args, **self.kwargs)# 調用用戶方法
具體邏輯如下:
+--------------------------+
| |
| Hub |
| + |
| | | +----------------------------------+
| | | | kombu.asynchronous.timer.Timer |
| | | | |
| | | call_repeatedly(fun) | |
| | | | |
| +----------------------------------------> _reschedules [@wraps(fun)] |
| | | | |
| | | | |
| | | | |
| | | +----------------------------------+
| create_loop |
| + | ^
| | | |
| | | |
| v | |
| | |
| +---> message_loop | |
| | + | |
| | | | |
| | v | iter(self.timer) |
| | fire_timers +--------------------------------------+
| | + |
| | | |
| | v |
| | poll |
| | + |
| | | |
| | v |
| | sleep |
| | + |
| | | |
| +-----------+ |
+--------------------------+
0x05 timer2
在celery/utils/timer2.py
中定義了Timer
類實例,可以看出其繼承了threading.Thread,但是居然也用kombu.asynchronous.timer
。
在源碼注釋中有:This is only used for transports not supporting AsyncIO
。
其實,就是 timer2 自己做了一個線程來做定時sleep等待,然后調用用戶方法而已。
from kombu.asynchronous.timer import Entry
from kombu.asynchronous.timer import Timer as Schedule
from kombu.asynchronous.timer import logger, to_timestamp
class Timer(threading.Thread): # 擴展了 線程
"""Timer thread.
Note:
This is only used for transports not supporting AsyncIO.
"""
Entry = Entry
Schedule = Schedule
running = False
on_tick = None
_timer_count = count(1)
在run方法中,會定期sleep。
def run(self):
try:
self.running = True
self.scheduler = iter(self.schedule)
while not self._is_shutdown.isSet():
delay = self._next_entry()
if delay:
if self.on_tick:
self.on_tick(delay)
if sleep is None: # pragma: no cover
break
sleep(delay)
try:
self._is_stopped.set()
except TypeError: # pragma: no cover
# we lost the race at interpreter shutdown,
# so gc collected built-in modules.
pass
except Exception as exc:
sys.stderr.flush()
os._exit(1)
在_next_entry方法中,調用用戶方法,這是通過kombu.asynchronous.timer
完成的。
def _next_entry(self):
with self.not_empty:
delay, entry = next(self.scheduler)
if entry is None:
if delay is None:
self.not_empty.wait(1.0)
return delay
return self.schedule.apply_entry(entry)
__next__ = next = _next_entry # for 2to3
0x06 Heart
Timer 類主要是做一些定時調度方面的工作。
Heart 組件 就是使用 Timer組件 進行定期調度,發送心跳 Event,告訴其他 Worker 這個 Worker 還活着。
同時,當本worker 啟動,停止時候,也發送 worker-online,worker-offline 這兩種消息。
6.1 Heart in Bootstep
位置在:celery/worker/consumer/heart.py。
其作用就是啟動 heart 功能類。
class Heart(bootsteps.StartStopStep):
"""Bootstep sending event heartbeats.
This service sends a ``worker-heartbeat`` message every n seconds.
Note:
Not to be confused with AMQP protocol level heartbeats.
"""
requires = (Events,)
def __init__(self, c,
without_heartbeat=False, heartbeat_interval=None, **kwargs):
self.enabled = not without_heartbeat
self.heartbeat_interval = heartbeat_interval
c.heart = None
super().__init__(c, **kwargs)
def start(self, c):
c.heart = heartbeat.Heart(
c.timer, c.event_dispatcher, self.heartbeat_interval,
)
c.heart.start()
def stop(self, c):
c.heart = c.heart and c.heart.stop()
shutdown = stop
6.2 Heart in Consumer
位置在:celery/worker/heartbeat.py。可以看到就是從啟動之后,使用 call_repeatedly 定期發送心跳。
class Heart:
"""Timer sending heartbeats at regular intervals.
Arguments:
timer (kombu.asynchronous.timer.Timer): Timer to use.
eventer (celery.events.EventDispatcher): Event dispatcher
to use.
interval (float): Time in seconds between sending
heartbeats. Default is 2 seconds.
"""
def __init__(self, timer, eventer, interval=None):
self.timer = timer
self.eventer = eventer
def _send(self, event, retry=True):
return self.eventer.send(event, freq=self.interval, ...)
def start(self):
if self.eventer.enabled:
self.tref = self.timer.call_repeatedly(
self.interval, self._send, ('worker-heartbeat',),
)
此時變量為:
self = {Heart} <celery.worker.heartbeat.Heart object at 0x000001D377636408>
eventer = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
interval = {float} 2.0
timer = {Timer: 0} <Timer(Timer-1, stopped daemon)>
tref = {NoneType} None
_send_sent_signal = {NoneType} None
6.3 worker-online
當啟動時候,發送 worker-online 消息。
def start(self):
if self.eventer.enabled:
self._send('worker-online')
self.tref = self.timer.call_repeatedly(
self.interval, self._send, ('worker-heartbeat',),
)
6.4 worker-offline
當停止時候,發送 worker-offline 消息。
def stop(self):
if self.tref is not None:
self.timer.cancel(self.tref)
self.tref = None
if self.eventer.enabled:
self._send('worker-offline', retry=False)
6.5 發送心跳
Heart組件會調用 eventer 來群發心跳:
- eventer 是 celery.events.dispatcher.EventDispatcher;
- 心跳是 'worker-heartbeat' 這個 Event;
所以我們下文就要分析 celery.events.dispatcher.EventDispatcher。
def _send(self, event, retry=True):
if self._send_sent_signal is not None:
self._send_sent_signal(sender=self)
return self.eventer.send(event, freq=self.interval,
active=len(active_requests),
processed=all_total_count[0],
loadavg=load_average(),
retry=retry,
**SOFTWARE_INFO)
0xEE 個人信息
★★★★★★關於生活和技術的思考★★★★★★
微信公眾賬號:羅西的思考
如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。