Storm是一個分布式的、高容錯的實時計算系統。Storm適用的場景:
- Storm可以用來用來處理源源不斷的消息,並將處理之后的結果保存到持久化介質中。
- 由於Storm的處理組件都是分布式的,而且處理延遲都極低,所以可以Storm可以做為一個通用的分布式RPC框架來使用。(實時計算?)
Storm集群架構
Storm集群采用主從架構方式,主節點是Nimbus,從節點是Supervisor,有關調度相關的信息存儲到ZooKeeper集群中,架構如下圖所示

- Nimbus:Storm集群的Master節點,負責分發用戶代碼,指派給具體的Supervisor節點上的Worker節點,去運行Topology對應的組件(Spout/Bolt)的Task。
- Supervisor:Storm集群的從節點,負責管理運行在Supervisor節點上的每一個Worker進程的啟動和終止。
- ZooKeeper:
- 存儲客戶端提供的topology任務信息,nimbus負責將任務分配信息寫入Zookeeper,supervisor從Zookeeper上讀取任務分配信息
- 存儲supervisor和worker的心跳(包括它們的狀態),使得nimbus可以監控整個集群的狀態, 從而重啟一些掛掉的worker
- 存儲整個集群的所有狀態信息和配置信息。
組件抽象
我們先看一下,Topology提交到Storm集群后的運行時部署分布圖,如下圖所示:

通過上圖我們可以看出,一個Topology的Spout/Bolt對應的多個Task可能分布在多個Supervisor的多個Worker內部。而每個Worker內部又存在多個Executor,根據實際對Topology的配置在運行時進行計算並分配。
- Topology:Storm對一個分布式計算應用程序的抽象,目的是通過一個實現Topology能夠完整地完成一件事情(從業務角度來看)。一個Topology是由一組靜態程序組件(Spout/Bolt)、組件關系Streaming Groups這兩部分組成。
- Spout:描述了數據是如何從外部系統(或者組件內部直接產生)進入到Storm集群
- Bolt:描述了與業務相關的處理邏輯。
- Task:Spout/Bolt在運行時所表現出來的實體,都稱為Task(多個)
- Worker:運行時Task所在的一級容器,Executor運行於Worker中,一個Worker對應於Supervisor上創建的一個JVM實例(和Spark一樣的概念)
- Executor:運行時Task所在的直接容器,在Executor中執行Task的處理邏輯
數據流
storm是流式計算框架,數據源源不斷地到來。storm中,每條消息稱為元組,我們可以把這條消息靈活地看作KV結構。
容錯
一般而言,數據在節點間的傳遞次數氛圍以下三種,storm根據用戶指定,選擇這三種類型的數據保證:
- 至少一次:節點收到相同的數據一次或者多次
- 至多一次:節點最多收到一次數據(S4),沒有容錯
- 只有一次:計算正確性的必須保證(strom特有的可以選擇數據只被計算一次,防止有些有狀態的任務,多次計算后出錯)
至少送達一次
storm在保證數據被傳遞到所有節點方面做得非常巧妙。
- 對於每條消息i,賦予64bit長度的ID。同時,在一張表T(表是邏輯上的,通過一致性hash來找到對應的消息)上記錄這條消息的初始ID di = ID。
- 在每個后繼節點上,每產生一條消息,就會生成一個隨機ID。然后更新di = di ^ 消息輸入ID ^ (消息輸出ID1 ^ 消息輸出ID2 ……)
- 當di = 0,說明已經成功處理消息。
- storm定期掃描T,對於沒有成功更新為0的消息,重新發送。對於這種機制,我們可以發現,存在誤判的概率2^(-64)
上面說得有點抽象,舉例:


上面圖12-10不對,系統表T是這樣的
| header 1 | header 2 |
|---|---|
| d1 | [11011->0] |
| d2 | [11101->0] |
原理:每一條消息ID,我們都會異或兩次,一次是前驅節點在產生這條消息的時候,一次是在后繼節點消費這條消息后。
Trident
通過Trident API同時實現了狀態持久化和“恰好送達一次”
- 將多條數據封裝為一個batch(這不成批處理了嗎),每個batch都有一個遞增的ID(要有一個全局遞增ID,需要zookeeper的幫助啊)。通過前面的至少發送一次機制,發現某個ID的batch處理失敗,重新發送這個batch,ID不變
- 各個計算節點在計算過程中通過ID提交狀態,如果發現已經有這個ID的狀態,則放棄本次計算。
- ID必須順序提交
storm高可用性(HA)
- 如果woker掛了,supervisor會重新創建
- bolt任務失敗直接重新啟動
- Spout任務失敗,會再次從數據源拿數據
- 如果機器節點掛了,nimbus會把該節點上的task轉移到其他節點
- 如果nimbus或者supervisor掛了,重啟就行了。nimbus和supervisor被設計成無狀態,- 狀態都被存到zookeeper里面了
- 為防止nimbus掛掉,worker節點也掛掉,導致任務無法被nimbus轉移到其他機器。nimbus也被設計成HA的,利用主從結構保證主節點掛了之后從節點一樣能服務
Stream Groupings(shuffle)
Storm中最重要的抽象,應該就是Stream grouping了,它能夠控制Spot/Bolt對應的Task以什么樣的方式來分發Tuple,將Tuple發射到目的Spot/Bolt對應的Task,如下圖所示:

- Shuffle Grouping:隨機分組,跨多個Bolt的Task,能夠隨機使得每個Bolt的Task接收到大致相同數目的Tuple,但是Tuple不重復
- Fields Grouping:根據指定的Field進行分組 ,同一個Field的值一定會被發射到同一個Task上
- Partial Key Grouping:與Fields grouping 類似,根據指定的Field的一部分進行分組分發
- All Grouping:所有Bolt的Task都接收同一個Tuple(這里有復制的含義)
- Global Grouping:所有的流都指向一個Bolt的同一個Task(也就是Task ID最小的)
- None Grouping:不需要關心Stream如何分組,等價於Shuffle grouping
- Direct Grouping:由Tupe的生產者來決定發送給下游的哪一個Bolt的Task ,這個要在實際開發編寫Bolt代碼的邏輯中進行精確控制
- Local or Shuffle Grouping:如果目標Bolt有1個或多個Task都在同一個Worker進程對應的JVM實例中,則Tuple只發送給這些Task
最常用的應該是Shuffle Grouping(隨機)、Fields Grouping(哈希)、Direct Grouping(用戶指定)這三種
Topology並行度計算

conf.setNumWorkers(2); // 該Topology運行在Supervisor節點的2個Worker進程中
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 設置並行度為2,則Task個數為2*1
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("blue-spout"); // 設置並行度為2,設置Task個數為4 ,則Task個數為4
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
.shuffleGrouping("green-bolt"); // 設置並行度為6,則Task個數為6*1

一共有12個任務,10的並行度,2個work。所以一個work里面有5個executor,6個task(分別是1藍,2綠,3黃),storm會把同類型的Task盡量放到同一個Executor中運行,Task個數最少的開始分配。
work和excutor內部構造
- 同一work不同executor的兩個task消息傳遞

- 不同work不同executor的兩個task消息傳遞

上圖我們還能看出同一executor不同task之間的消息傳遞
總結:
- 同一work之內的消息傳遞都要通過executor的消息收發線程
- 不同work的消息傳遞要通過work的收發線程
- 每個Executor應該維護Task與所在的Executor之間的關系,這樣才能正確地將Tuple傳輸到目的Bolt Task進行處理。
