序列文章:
-
Celery 源碼解析一:Worker 啟動流程概述
-
Celery 源碼解析二:Worker 的執行引擎
-
Celery 源碼解析三: Task 對象的實現
-
Celery 源碼解析四: 定時任務的實現
-
Celery 源碼解析五: 遠程控制管理
-
Celery 源碼解析六:Events 的實現
-
Celery 源碼解析七:Worker 之間的交互
-
Celery 源碼解析八:State 和 Result
前面對於 Celery 的分布式處理已經做了一些介紹,例如第五章的 遠程控制 和第六章的 Event機制,但是,我認為這些分布式都比較簡單,並沒有體現出多實例之間的協同作用,所以,今天就來點更加復雜的,對於多實例直接的交互更多,這就是 Gossip 和 Mingle。
Mingle
在 Celery 的介紹中,Mingle 主要用在啟動或者重啟的時候,它會和其他的 worker 交互,從而進行同步。同步的數據有:
- 其他 worker 的 clock
- 其他 worker 已經處理掉的 tasks
這其實也就是它的所有功能的,所以你可以猜測功能應該很簡單吧?不妨一起來看看,最開始還是回憶一下第一篇文章中的 Bootstep,所以我們可以毫無壓力得找出源碼所在的文件:

這里從注釋中可以很簡單得看出 Mingle 的作用,然后初始化也是比較簡單,關鍵還是 Line 37 的 start,需要我們關注 sync 做了什么,為什么上來二話不說就 sync?其實上來就 sync 很好理解,畢竟 Mingle 的作用就是進行 sync 嘛,所以我們要關注的是如何實現的:

這里原來的代碼有點冗余,我給忽略掉了,直接上精簡后的代碼,所以你可以很清晰得看到代碼的邏輯是這樣的:
- Mingle 向每一個 Worker 發送問候:hello
- 每個 Worker 都向 Mingle 回復自己的信息(clock 和 tasks)
- Mingle 更新自己的信息
這些邏輯我們從精簡后的代碼可以簡單看出來,所以就不細說了,但是有一點需要展開講講,那就是 Line 47 中的 inspect.hello,這應該是 第五篇 的內容,但是,之前只是介紹了一下如何注冊,並沒有對這些命令一一解析,所以這里用到了,我們就不妨看看里面的內容。

ok,這里我們可以看到在 Line 319、320 就返回了兩個東西,分別是:
- revoked:當前 worker 記錄的已被完成的 tasks
- clock:當前 worker 的 clock
然后就返回到剛起來的 worker 了,收到這個消息的 worker 就根據這兩個信息刷新自己的狀態,然后繼續運行,Mingle 也就完成了自己的任務了。
Gossip
和 Mingle 不同,Gossip 卻是消費 Event 的,本來按道理應該放在 第六篇 中介紹,但是由於篇幅原因,所以一起放在這里來說了,不多贅述,我們直接看 Bootstep:

由於 Gossip 的初始化內容太多,所以我也不全都展開了,挑了些重點(還是很多),但是目前我們可以忽略大部分的內容,最先需要關注的是 Line 24,如果你夠細心的話你會發現這個 Bootstep 和其他不一樣,因為它繼承的是 ConsumerStep,這是會注冊一個 Consumer 的!
然后我們沒啥好看了,所以按照套路還是看看 start 唄,然而它調用的也是父類的 start,所以,沒辦法咯,直接跟過去:

諾,是這樣吧,是增加了 Consumer,這樣的話,我們就必須看看這個 Consumer 是什么了,能夠消費什么樣的數據:

好,這樣就清晰了,所有關於 worker.# 的 Event 都被這里消費了,這里算是看完了。
那現在的問題變成了這些 Event 都是從哪里過來的,我們有必要對源頭進行一下追蹤,但是,怎么追蹤呢?回想一下 第六篇 中講 Event 的消息傳遞的那里,再和這里一對比,事情就很清楚了。
你以為 Gossip 就這么結束了么?嘿嘿,那你就被 Celery 給蒙騙了,悄悄告訴你,Celery 在 Gossip 中埋伏了一個厲害的功能,但是沒有對外宣稱,那就是 Leader 選舉!,不信?我帶你去看看:

這是選舉的入口,先不解析代碼,我們先來看看有誰調用了它:

ok,可以發現這有個 control 命令用到了它,這里有注釋,我們可以看到參數分別代表的意思:
- id:唯一的標識,用於識別一次選舉
- topic:本次選舉的 topic,其實是標識 action 的類型
- action:本次選舉的目的,選中的 leader 負責處理這個 action
那么這樣我們就清楚了,首先,有一個 action 需要執行,但是,那么多的 worker,交給誰執行呢?這就需要進行 選舉,那么選舉的方式是怎么進行的呢,我先用一張圖來描述一下這個過程:
-
control 表示需要進行一個選舉,然后一個 worker 的 gossip 就發送了一個 Event:worker-elect,然后所有的 Worker 都能接收到:
 -
每個 Worker 接收到之后,就對這個選舉進行響應,將自己的選號(clock)送過去,這樣,每個 Worker 在發送選號的同時,也接收到別人的選號,因為收發的路線太多,我就找一個 Worker 來表示收,但是其他 Worker 也是有收的,只是我沒有標出來:
 -
當一個 Worker 收到所有 Worker 的 ACK 之后,那么它就會對所有的 Worker 的 選號 進行排序,選出其中 最大的選號 作為本次選舉的 Leader,如果 Leader 是自己那么就處理這個 Action,如果不是自己,那么忽略,應該被選中的 Leader 也在執行這個過程,所以不需要別人擔心。

這就是實際執行的示意圖,對應到代碼就分別是:
- 第一步中的 Control 要求選舉和發送選舉 Event 我們前面已經看過了
-
Worker 收到選舉 Event 之后,發出自己的參選聲明:
 -
每個 Wroker 對別人回應的參選信息進行選舉:

ok,整個流程就是這樣的了,那么問題來了,萬一有一個 Worker 收不到 replies 或者發出的 reply 不小心丟了會怎么樣?是不是整個選舉過程就進行不下去了?我好像沒有看到 Celery 有在這方面做一些努力。
