作者:Jack47
PS:如果喜歡我寫的文章,歡迎關注我的微信公眾賬號程序員傑克,兩邊的文章會同步,也可以添加我的RSS訂閱源。
本文主要翻譯自Storm官方文檔Guaranteeing message processing,但我覺得官方文檔寫的有些隨意,啰嗦,所以做了一些修改,里面的配圖自己重新畫了,能夠更加貼切的表達意思。
內容簡介##
Storm可以保證從Spout發出的每個消息都能被完全處理。Storm的可靠性機制是完全分布式的(distributed),可伸縮的(scalable),容錯的(fault-tolerant)。本文介紹了Storm如何保證可靠性以及作為Storm使用者,我們需要怎么做,才能充分利用Storm的可靠性。理解一些實現細節,也能夠幫助我們領悟Storm的設計理念。
PS:本文用到了Storm的一些基本概念,例如Bolt,任務(Task),元組(Tuple),如果不清楚這些概念,可以參看我之前寫的文章:Storm介紹(一),理解Storm並發。下文中元組(Tuple),跟消息(message)是等價的,Storm中處理的消息是用元組這種數據結構來表示的。
一個消息被完整處理是什么意思?##
流式計算單詞個數的例子###
考慮如下的流式計算文章中單詞個數的拓撲:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22133, "sentence_queue", new StringScheme()));
builder.setBolt("split", new SplitStentence(), 10).shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20).fieldsGrouping("split", new Fields("word"));
這個拓撲由3個處理單元組成:一個叫"sentences"的Spout,負責從Kestrel隊列中讀取句子並作為新的Spout元組發送出去。名稱為"split"的Bolt是Spout元組的下游消費方,它把接收到句子切分成單詞並發送出去。名稱為"count"的Bolt是"split" Bolt的下游消費方,它使用HashMap<String, Interger>
存儲了每個任務中每個單詞出現的次數,每次讀取到新的單詞元組就讓該單詞的計數加一。"count" Bolt接收"split" Bolt發出的消息時,是使用元組中的"word"(單詞)字段來作為路由策略,所以相同的單詞元組會被路由到相同的任務(task)里,這樣就能夠計數了。
消息(元組)樹(message tree)###
在下游的Bolt中會基於某個Spout元組發射出很多新的元組:句子中的每個單詞會生成一個新元組(在split Bolt完成),每個單詞的計數更新后(在count Bolt完成)也會觸發一個新的元組。某個Spout元組觸發的消息樹如下圖:
一個Spout元組觸發的消息樹
可以看到這棵消息樹的根節點是Spout產生的句子內容為"the cow jumped over the moon"的元組。這個Spout元組在"split"這個Bolt里被切分為6個單詞,觸發了6個單詞元組,"count" Bolt接收到這6個單詞元組后,更新了每個單詞的計數並為之產生了一個新的元組。
一條消息被“完整處理”###
一條消息被“完整處理”
指一個從Spout發出的元組所觸發的消息樹中所有的消息都被Storm處理了。如果在指定的超時時間里,這個Spout元組觸發的消息樹中有任何一個消息沒有處理完,就認為這個Spout元組處理失敗了。這個超時時間是通過每個拓撲的Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置項來進行配置的,默認是30秒。
在前面消息樹的例子里,只有消息樹中所有的消息(包含一條Spout消息,六條split Bolt消息,六條count Bolt消息)都被Storm處理完了,才算是這條Spout消息被完整處理了。
消息被完整處理或者處理失敗##
當消息沒有被完整處理或者處理失敗了會怎么樣?為了理解這個問題,應該首先看一下Spout發出的一個元組的生命周期。Spout需要實現的接口(接口文檔見這里)如下:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
首先,Storm通過調用Spout的nextTuple
函數來從Spout請求一個元組。Spout任務使用open
函數入參中提供的SpoutOutputCollector
來給Spout任務的某個輸出流發射一個新元組。當發射一個元組時,Spout
提供了一個"消息標識"(message-id),用來后續識別這個元組。例如,上面的例子里,sentence Spout從Kestrel隊列中讀取一條消息,然后把Kestrel提供的這個消息的message-id作為"消息標識"來發送出去。向SpoutOutputCollector
中發送消息的例子如下:
_collector.emit(new Values("the cow jumped over the moon"), msgId);
接下來,元組就被發送到下游的Bolt進行消費,Storm會負責跟蹤這個Spout元組創建的消息樹。如果Storm檢測到一個元組被完整地處理了,Storm會調用產生這個元組的Spout任務(Spout Bolt有多個任務來運行)的ack
函數,參數是Spout之前發送這個消息時提供給Storm的message-id。類似的,當元組處理超時或處理失敗時,Storm會在元組對應的Spout任務上調用fail
函數,參數是之前Spout發送這個消息時提供給Storm的message-id。這樣應用程序通過實現Spout Bolt中的ack
接口和fail
接口來處理消息處理成功和失敗的情況。例如當消息處理成功時記錄當前處理的進度,當處理失敗時,重新發送消息來對這個消息進行重新處理。但在本文的例子里fail
函數中不需要做任何處理,因為這些元組不會從Kestrel隊列中去掉,下次從隊列取消息,仍然會取到這些消息,只有處理成功后,才會從Kestrel隊列中摘除這些消息。
Storm的可靠性API##
作為Storm用戶,如果想利用Storm的可靠性,需要做兩件事:
1. 創建一個元組時(消息樹上創建一個新節點)需要通知Storm
2. 處理完一個元組,需要通知Storm
通過這兩個操作,當消息樹被完全處理完,Storm就可以立即檢測到,從而可以正確地確認這個Spout元組處理成功或者失敗。Storm的API提供了一套簡潔地處理這些操作的方法。
元組創建時通知Storm###
在Storm消息樹(元組樹)中添加一個子結點的操作叫做錨定
(anchoring)。在應用程序發送一個新元組時候,Storm會在幕后做錨定。還是之前的流式計算單詞個數的例子,請看如下的代碼片段:
public class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector){
_collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
每個單詞元組是通過把輸入的元組作為emit
函數中的第一個參數來做錨定的。通過錨定,Storm就能夠得到元組之間的關聯關系(輸入元組觸發了新的元組),繼而構建出Spout元組觸發的整個消息樹。所以當下游處理失敗時,就可以通知Spout當前消息樹根節點的Spout元組處理失敗,讓Spout重新處理。相反,如果在emit
的時候沒有指定輸入的元組,叫做不錨定
:
_collector.emit(new Values(word));
這樣發射單詞元組,會導致這個元組不被錨定(unanchored)
,這樣Storm就不能得到這個元組的消息樹,繼而不能跟蹤消息樹是否被完整處理。這樣下游處理失敗,不能通知到上游的Spout任務。不同的應用的有不同的容錯處理方式,有時候需要這樣不錨定的場景。
一個輸出的元組可以被錨定到多個輸入元組上,叫做多錨定(multi-anchoring)
。這在做流的合並或者聚合的時候非常有用。一個多錨定的元組處理失敗,會導致Spout上重新處理對應的多個輸入元組。多錨定是通過指定一個多個輸入元組的列表而不是單個元組來完成的。例如:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(word));
多錨定會把這個新輸出的元組添加到多棵消息樹上。注意多錨定可能會打破消息的樹形結構,變成有向無環圖(DAG),Storm的實現既支持樹形結構,也支持有向無環圖(DAG)。在本文中,提到的消息樹跟有向無環圖是等價的。消息之間的關系是有向無環圖的例子見下圖:
消息形成的有向無環圖
Spout元組A觸發了B和C兩個元組,而這兩個元組作為輸入,共同作用后觸發D元組。
元組處理完后通知Storm###
錨定
的作用就是指定元組樹的結構--下一步是當元組樹中某個元組已經處理完成時,通知Storm。通知是通過OutputCollector
中的ack
和fail
函數來完成的。例如上面流式計算單詞個數例子中的split Bolt
的實現SplitSentence類,可以看到句子被切分成單詞后,當所有的單詞元組都被發射后,會確認(ack)輸入的元組處理完成。
可以利用OutputCollector
的fail
函數來立即通知Storm,當前消息樹的根元組處理失敗了。例如,應用程序可能捕捉到了數據庫客戶端的一個異常,就顯示地通知Storm輸入元組處理失敗。通過顯示地通知Storm元組處理失敗,這個Spout元組就不用等待超時而能更快地被重新處理。
Storm需要占用內存來跟蹤每個元組,所以每個被處理的元組都必須被確認。因為如果不對每個元組進行確認,任務最終會耗光可用的內存。
做聚合或者合並操作的Bolt可能會延遲確認一個元組,直到根據一堆元組計算出了一個結果后,才會確認。聚合或者合並操作的Bolt,通常也會對他們的輸出元組進行多錨定。
Storm 0.7.0引入了“事務拓撲”(transactional topologies)的特性,它讓你在大多數場景下能夠得到完全容錯的只被處理一次的消息語義。更多關於事物拓撲的介紹見這里
Storm怎樣高效的實現可靠性?##
acker任務###
一個Storm拓撲有一組特殊的"acker"任務,它們負責跟蹤由每個Spout元組觸發的消息的處理狀態。當一個"acker"看到一個Spout元組產生的有向無環圖中的消息被完全處理,就通知當初創建這個Spout元組的Spout任務,這個元組被成功處理。可以通過拓撲配置項Config.TOPOLOGY_ACKER_EXECUTORS來設置一個拓撲中acker任務executor
的數量。Storm默認TOPOLOGY_ACKER_EXECUTORS
和拓撲中配置的Worker的數量相同(關於executor和Worker的介紹,參見理解Storm並發一文)--對於需要處理大量消息的拓撲來說,需要增大acker executor的數量。
元組的生命周期###
理解Storm的可靠性實現方式的最好方法是查看元組的生命周期和元組構成的有向無環圖。當拓撲的Spout或者Bolt中創建一個元組時,都會被賦予一個隨機的64比特的標識(message id)。acker任務使用這些id來跟蹤每個Spout元組產生的有向無環圖的處理狀態。在Bolt中產生一個新的元組時,會從錨定的一個或多個輸入元組中拷貝所有Spout元組的message-id,所以每個元組都攜帶了自己所在元組樹的根節點Spout元組的message-id。當確認一個元組處理成功了,Storm就會給對應的acker任務發送特定的消息--通知acker當前這個Spout元組產生的消息樹中某個消息處理完了,而且這個特定消息在消息樹中又產生了一個新消息(新消息錨定的輸入是這個特定的消息)。
舉個例子,假設"D"元組和"E"元組是基於“C”元組產生的,那么下圖描述了確認“C”元組成功處理后,元組樹的變化。圖中虛線框表示的元組代表已經在消息樹上被刪除了:
確認元組成功處理后消息樹的變化
由於在“C”從消息樹中刪除(通過acker函數確認成功處理)的同時,“D”和“E”也被添加到(通過emit函數來錨定的)元組樹中,所以這棵樹從來不會被提早處理完。
正如上面已經提到的,在一個拓撲中,可以有任意數量的acker任務。這導致了如下的兩個問題:
- 當拓撲中的一個元組確認被處理完,或者產生一個新的元組時,Storm應該通知哪個acker任務?
- 通知了acker任務后,acker任務如何通知到對應的Spout任務?
Storm采用對元組中攜帶的Spout元組message-id哈希取模的方法來把一個元組映射到一個acker任務上(所以同一個消息樹里的所有消息都會映射到同一個acker任務)。因為每個元組攜帶了自己所處的元組樹中根節點Spout元組(可能有多個)的標識,所以Storm就能決定通知哪個acker任務。
當一個Spout任務產出一個新的元組,僅需要簡單的發送一個消息給對應的acker(Spout元組message-id哈希取模)來告知Spout的任務標示(task id),以此來通知acker當前這個Spout任務負責這個消息。當acker看到一個消息樹被完全處理完,它就能根據處理的元組中攜帶的Spout元組message-id來確定產生這個Spout元組的task id,然后通知這個Spout任務消息樹處理完成(調用 Spout任務的ack
函數)。
實現細節###
對於擁有上萬節點(或者更多)的巨大的元組樹,跟蹤所有的元組樹會耗盡acker使用的內存。acker任務不顯示地(記錄完整的樹型結構)跟蹤元組樹,相反它使用了一種每個Spout元組只占用固定大小空間(大約20字節)的策略。這個跟蹤算法是Storm工作的關鍵,而且是重大突破之一。
一個acker任務存儲了從一個Spout元組message-id到一對值的映射關系spout-message-id--><spout-task-id, ack-val>
。第一個值是創建了這個Spout元組的任務id,用來后續處理完成時通知到這個Spout任務。第二個值是一個64比特的叫做“ack val”的數值。它是簡單的把消息樹中所有被創建或者被確認的元組message-id異或起來的值。每個消息創建和被確認處理后都會異或到"ack val"上,A xor A = 0
,所以當一個“ack val”變成了0,說明整個元組樹都完全被處理了。無論是很大的還是很小的元組樹,"ack val"值都代表了整個元組樹中消息的處理狀態。由於元組message-id是隨機的64比特的整數,所以同一個元組樹中不同元組message-id發生撞車的可能性特別小,因此“ack val”意外的變成0的可能性非常小。如果真的發生了這種情況,而恰好這個元組也處理失敗了,那僅僅會導致這個元組的數據丟失。
使用異或操作來跟蹤消息樹處理狀態的想法非常有才。因為消息的數量可能有成千上萬條,每個都單獨跟蹤(讀者可以思考下怎么搞)是非常低效而且不可水平擴展的。而且采用異或的方式后,就不依賴於acker接收到消息的順序了。
搞明白了可靠性的算法,讓我們看看所有失敗的場景下Storm如何避免數據丟失:
- Bolt任務掛掉:導致一個元組沒有被確認,這種場景下,這個元組所在的消息樹中的根節點Spout元組會超時並被重新處理
- acker任務掛掉:這種場景下,這個acker掛掉時正在跟蹤的所有的Spout元組都會超時並被重新處理
- Spout任務掛掉:這種場景下,需要應用自己實現檢查點機制,記錄當前Spout成功處理的進度,當Spout任務掛掉之后重啟時,繼續從當前檢查點處理,這樣就能重新處理失敗的那些元組了。
調整可靠性##
acker任務是輕量級的,所以在一個拓撲中不需要太多的acker任務。可以通過Storm UI(id為"__acker"的組件)來觀察acker任務的性能。如果吞吐量看起來不正常,就需要添加更多的acker任務。
去掉可靠性##
如果可靠性無關緊要--例如你不關心元組失敗場景下的消息丟失--那么你可以通過不跟蹤元組的處理過程來提高性能。不跟蹤一個元組樹會讓傳遞的消息數量減半,因為正常情況下,元組樹中的每個元組都會有一個確認消息。另外,這也能減少每個元組需要存儲的id的數量(指每個元組存儲的Spout message-id),減少了帶寬的使用。
有三種方法來去掉可靠性:
- 設置
Config.TOPOLOGY_ACKERS
為0。這種情況下,Storm會在Spout吐出一個元組后立馬調用Spout的ack
函數。這個元組樹不會被跟蹤。 - 當產生一個新元組調用
emit
函數的時候通過忽略消息message-id參數來關閉這個元組的跟蹤機制。 - 如果你不關心某一類特定的元組處理失敗的情況,可以在調用
emit
的時候不要使用錨定。由於它們沒有被錨定到某個Spout元組上,所以當它們沒有被成功處理,不會導致Spout元組處理失敗。
參考資料##
Guaranteeing message processing
Fault Tolerant Message Processing in Storm
如果您看了本篇博客,覺得對您有所收獲,請點擊右下角的“推薦”,讓更多人看到!

