並行度###
在Storm集群中真正運行Topology的主要有三個實體:worker、executor、task,下圖是可以表示他們之間的關系。

數據流模型
對於一個Spout或Bolt,都會有多個task線程來運行,那么如何在兩個組件(Spout和Bolt)之間發送tuple元組呢?Storm提供了若干種數據流分發(Stream Grouping)策略用來解決這一問題。在Topology定義時,需要為每個Bolt指定接收什么樣的Stream作為其輸入(注:Spout並不需要接收Stream,只會發射Stream)。
目前Storm中提供了以下7種Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping。
- Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同。
- Fields Grouping:按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts里的一個task(這句話很關鍵,代表了我對storm的理解。)而不同的userid則會被分配到不同的bolts里的task。
- All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
- Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
- Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
- Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程worker中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
一個運行中Topology的例子:
Topology里包含了三個component,一個是Blue Spout,另外兩個分別是Green Bolt和Yellow Bolt。

Config conf = new Config();
conf.setNumWorkers(2);
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt");
StormSubmitter.submitTopology(
"mytopology",
conf,
topologyBuilder.createTopology()
);
圖和代碼, 很清晰, 通過setBolt和setSpout一共定義2+2+6=10個executor threads 。並且同setNumWorkers設置2個workers, 所以storm會平均在每個worker上run 5個executors (線程)。而對於green-bolt, 定義了4個tasks, 所以每個executor中有2個tasks。
總結
- 一個Topology可以包含多個worker ,一個worker只能對應於一個topology。worker process是一個topology的子集。
- 一個worker可以包含多個executor,一個executor只能對應於一個component(spout或者bolt)。
- Task就是具體的處理邏輯, 一個executor線程可以執行一個或多個tasks。線程就是資源,task就是要運行的任務。
消息的可靠處理
Storm記錄級容錯
首先來看一下什么叫做記錄級容錯?storm允許用戶在spout中發射一個新的源tuple時為其指定一個message id, 這個message id可以是任意的object對象。多個源tuple可以共用一個message id,表示這多個源 tuple對用戶來說是同一個消息單元。storm中記錄級容錯的意思是說,storm會告知用戶每一個消息單元是否在指定時間內被完全處理了。那什么叫做完全處理呢,就是該message id綁定的源tuple及由該源tuple后續生成的tuple經過了topology中每一個應該到達的bolt的處理。舉個例子。在下圖中,在spout由message 1綁定的tuple1和tuple2經過了bolt1和bolt2的處理生成兩個新的tuple,並最終都流向了bolt3。當這個過程完成處理完時,稱message 1被完全處理了。

在storm的topology中有一個系統級組件,叫做acker。這個acker的任務就是追蹤從spout中流出來的每一個message id綁定的若干tuple的處理路徑,如果在用戶設置的最大超時時間內這些tuple沒有被完全處理,那么acker就會告知spout該消息處理失敗了,相反則會告知spout該消息處理成功了。在剛才的描述中,我們提到了”記錄tuple的處理路徑”,storm中卻是使用了一種非常巧妙的方法做到了。在說明這個方法之前,我們來復習一個數學定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每一個操作數出現且僅出現兩次。
storm中使用的巧妙方法就是基於這個定理。具體過程是這樣的:在spout中系統會為用戶指定的message id生成一個對應的64位整數,作為一個root id。root id會傳遞給acker及后續的bolt作為該消息單元的唯一標識。同時無論是spout還是bolt每次新生成一個tuple的時候,都會賦予該tuple一個64位的整數的id。Spout發射完某個message id對應的源tuple之后,會告知acker自己發射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一個輸入tuple處理完之后,也會告知acker自己處理的輸入tuple的id及新生成的那些tuple的id。Acker只需要對這些id做一個簡單的異或運算,就能判斷出該root id對應的消息單元是否處理完成了。下面通過一個圖示來說明這個過程。
第一步:初始化,spout中綁定message 1生成了兩個源tuple,id分別是0010和1011.

第二步:計算一個turple達到第1個bolt

第三步:計算一個turple達到第2個bolt

第四步:消息到達最后一個bolt

即在正常情況下,每個id都會且只會被異或兩次,因此最后的結果一定是0,但是容錯過程存在一個可能出錯的地方,那就是,如果生成的tuple id並不是完全各異的,acker可能會在消息單元完全處理完成之前就錯誤的計算為0。這個錯誤在理論上的確是存在的,但是在實際中其概率是極低極低的,完全可以忽略。
高可靠性下Spout需要做些什么
當Spout從隊列中讀取一個消息,表示它“打開”了隊列中某個消息,這意味着,此消息並未從隊列中真正刪除,而是被置為“pending”狀態,它等待來自客戶端的應答,被應答之后,此消息才會被真正從隊列中刪除。處於“pending”狀態的消息不會被其他客戶端看到。另外,如果一個客戶端意外斷開連接,則由此客戶端“打開”的所有消息都會被重新加入到隊列中。當消息被“打開”的時候,隊列同時會為這個消息提供一個唯一的標識。
Spout使用這個唯一標識作為這個tuple的id,當ack或fail被調用時,Spout會把ack或者fail連同id一起發送給隊列,隊列會將消息從隊列中真正刪除或者將它重新放回隊列中。
選擇合適的可靠性級別
如果並不需要每個消息必須被處理,那么可以關閉消息的可靠性機制,從而獲取較好的性能。關閉消息的可靠處理機制意味着系統中的消息數會減半(每個消息不需要應答了)。另外,關閉消息的可靠性處理機制可以減少消息的大小(不需要每個tuple記錄它的根id),從而節省帶寬。
有三種方法調整消息的可靠處理機制:
- 將參數Config.TOPOLOGY_ACKERS設置為0,通過此方法,當Spout發送一個消息時,它的ack方法將立即被調用。
- 第二種方法是Spout發送一個消息時,不指定此消息的id。當需要關閉特定消息的可靠性時,可以使用此方法。
- 如果不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在emit方法中不指定輸入消息。因此這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何Spout重新發送消息。
集群中各級容錯
- Bolt任務crash引起的消息未被應答。此時,acker中所有與此Bolt任務關聯的消息都會因為超時而失敗,對應Spout的fail方法將會被調用。
- acker任務本身失敗,它在失敗之前持有的所有消息都將因超時而失敗。Spout的fail方法將被調用。
- Spout任務失敗,隊列會將處於pending狀態的所有消息重新放回隊列里。
- Worker失敗,Supervisor負責監控Worker中的任務,Supervisor會嘗試在本機重啟它。
- Supervisor失敗,由於Supervisor是無狀態的,只需將它重新啟動即可。
- Nimbus失敗。由於Nimbus是無狀態的,只需將它重新啟動即可。
- Storm中的集群節點故障,此時Nimbus會將此機器上所有正在運行的任務轉移到其他可用的機器上運行。
- Zookeeper集群中的節點故障,Zookeeper保證少於半數的機器宕機系統仍可正常運行,及時修復故障機器即可。
一致性事務
Storm如何實現既對tuple並行處理,又保證事務性呢?這里先從簡單的事務性實現方法入手,逐步引出Transactional Topology的原理。
強順序流
將tuple流變成強順序性的,並且每次只處理一個tuple。從1開始,給每個tuple都順序加上一個id,在處理tuple時,將處理成功的tuple id和計算結果存在數據庫中。下一個tuple到來時,將其id與數據庫中的id作比較,如果相同,則說明這個tuple已經被成功處理過了,那么忽略,如果不同,則將它的id和計算結果更新到數據庫中。
但是這種機制使得系統一次只能處理一個tuple,無法實現分布式計算。

強順序batch流
為了實現分布式,我們可以每次處理一批tuple,即一個batch,每個bacth中的tuple可以並行處理。這樣數據庫里存的就是bacth id和bacth的計算結果。
但是這種機制每次只能處理一個batch,batch之間無法並行。

CoordinateBolt的原理
- 每個CoordinateBolt記錄兩個值:有哪些task給我發送了tuple以及我要給哪些task發送信息。
- 真正執行任務的bolt是real bolt,它發出一個tuple后,其外層的CoordinateBolt會記錄下這個tuple發送給了哪個task。
- 所有的tuple發送完了以后,CoordinateBolt會告訴它發送過tuple的task,它發送了多少tuple給這個task,下游task會將這個數字和自己接收到的tuple數量做對比,如果相等,則說明處理完了所有的tuple。

Transactional Topology
Storm提供的Transactional Topology將batch計算分為process和commit兩個階段,process階段可以同時處理多個batch,不用保證順序性;commit階段保證batch的強順序性,並且一次只能處理一個batch,第一個batch成功提交之前,第二個batch不能被提交。
Transactional Topology里發送的tuple都必須以TransactionAttempt作為第一個field,Storm根據這個field來判斷tuple屬於哪一個batch。
TransactionAttempt包含兩個值:一個是transaction id,另一個是attempt id,transaction id對於每個batch中的tuple是唯一的,不管replay多少次都是一樣的。attempt id是每個batch唯一的一個id,但是對於同一個batch,它replay之后的attempt id跟replay之前不一樣。
當bolt收到某個batch所有的tuple以后,finishBatch會被調用,將當前的transaction id與數據庫中存儲的id做比較,如果相同則忽略,不同就把這個batch的計算結果加到總結果中,並更新數據庫。

- TransactionSpout只能有一個,它將所有tuple分組為一個一個的batch,而且保證同一個batch的transaction id始終一樣。
- BatchBolt處理一個batch中所有的tuples。對於每一個tuple調用execute方法,而在整個batch處理完成時調用finishBatch方法。
- 如果BatchBolt被標記為Committer,則只能在Committer階段調用finishBolt方法,並且在commit階段batch是強順序性的。
