Storm:分布式流式計算框架


Storm是一個分布式的、高容錯的實時計算系統。Storm適用的場景:

  1. Storm可以用來用來處理源源不斷的消息,並將處理之后的結果保存到持久化介質中。
  2. 由於Storm的處理組件都是分布式的,而且處理延遲都極低,所以可以Storm可以做為一個通用的分布式RPC框架來使用。(實時計算?)

Storm集群架構

Storm集群采用主從架構方式,主節點是Nimbus,從節點是Supervisor,有關調度相關的信息存儲到ZooKeeper集群中,架構如下圖所示
image

  • Nimbus:Storm集群的Master節點,負責分發用戶代碼,指派給具體的Supervisor節點上的Worker節點,去運行Topology對應的組件(Spout/Bolt)的Task。
  • Supervisor:Storm集群的從節點,負責管理運行在Supervisor節點上的每一個Worker進程的啟動和終止。
  • ZooKeeper:
    • 存儲客戶端提供的topology任務信息,nimbus負責將任務分配信息寫入Zookeeper,supervisor從Zookeeper上讀取任務分配信息
    • 存儲supervisor和worker的心跳(包括它們的狀態),使得nimbus可以監控整個集群的狀態, 從而重啟一些掛掉的worker
    • 存儲整個集群的所有狀態信息和配置信息。

組件抽象

我們先看一下,Topology提交到Storm集群后的運行時部署分布圖,如下圖所示:
image
通過上圖我們可以看出,一個Topology的Spout/Bolt對應的多個Task可能分布在多個Supervisor的多個Worker內部。而每個Worker內部又存在多個Executor,根據實際對Topology的配置在運行時進行計算並分配。

  • Topology:Storm對一個分布式計算應用程序的抽象,目的是通過一個實現Topology能夠完整地完成一件事情(從業務角度來看)。一個Topology是由一組靜態程序組件(Spout/Bolt)、組件關系Streaming Groups這兩部分組成。
  • Spout:描述了數據是如何從外部系統(或者組件內部直接產生)進入到Storm集群
  • Bolt:描述了與業務相關的處理邏輯。
  • Task:Spout/Bolt在運行時所表現出來的實體,都稱為Task(多個)
  • Worker:運行時Task所在的一級容器,Executor運行於Worker中,一個Worker對應於Supervisor上創建的一個JVM實例(和Spark一樣的概念)
  • Executor:運行時Task所在的直接容器,在Executor中執行Task的處理邏輯

數據流

storm是流式計算框架,數據源源不斷地到來。storm中,每條消息稱為元組,我們可以把這條消息靈活地看作KV結構。

容錯

一般而言,數據在節點間的傳遞次數氛圍以下三種,storm根據用戶指定,選擇這三種類型的數據保證:

  1. 至少一次:節點收到相同的數據一次或者多次
  2. 至多一次:節點最多收到一次數據(S4),沒有容錯
  3. 只有一次:計算正確性的必須保證(strom特有的可以選擇數據只被計算一次,防止有些有狀態的任務,多次計算后出錯)

至少送達一次

storm在保證數據被傳遞到所有節點方面做得非常巧妙。

  1. 對於每條消息i,賦予64bit長度的ID。同時,在一張表T(表是邏輯上的,通過一致性hash來找到對應的消息)上記錄這條消息的初始ID di = ID。
  2. 在每個后繼節點上,每產生一條消息,就會生成一個隨機ID。然后更新di = di ^ 消息輸入ID ^ (消息輸出ID1 ^ 消息輸出ID2 ……)
  3. 當di = 0,說明已經成功處理消息。
  4. storm定期掃描T,對於沒有成功更新為0的消息,重新發送。對於這種機制,我們可以發現,存在誤判的概率2^(-64)

上面說得有點抽象,舉例:
image
image

上面圖12-10不對,系統表T是這樣的

header 1 header 2
d1 [11011->0]
d2 [11101->0]

原理:每一條消息ID,我們都會異或兩次,一次是前驅節點在產生這條消息的時候,一次是在后繼節點消費這條消息后。

Trident

通過Trident API同時實現了狀態持久化和“恰好送達一次”

  1. 將多條數據封裝為一個batch(這不成批處理了嗎),每個batch都有一個遞增的ID(要有一個全局遞增ID,需要zookeeper的幫助啊)。通過前面的至少發送一次機制,發現某個ID的batch處理失敗,重新發送這個batch,ID不變
  2. 各個計算節點在計算過程中通過ID提交狀態,如果發現已經有這個ID的狀態,則放棄本次計算。
  3. ID必須順序提交

storm高可用性(HA)

  • 如果woker掛了,supervisor會重新創建
    • bolt任務失敗直接重新啟動
    • Spout任務失敗,會再次從數據源拿數據
  • 如果機器節點掛了,nimbus會把該節點上的task轉移到其他節點
  • 如果nimbus或者supervisor掛了,重啟就行了。nimbus和supervisor被設計成無狀態,- 狀態都被存到zookeeper里面了
  • 為防止nimbus掛掉,worker節點也掛掉,導致任務無法被nimbus轉移到其他機器。nimbus也被設計成HA的,利用主從結構保證主節點掛了之后從節點一樣能服務

Stream Groupings(shuffle)

Storm中最重要的抽象,應該就是Stream grouping了,它能夠控制Spot/Bolt對應的Task以什么樣的方式來分發Tuple,將Tuple發射到目的Spot/Bolt對應的Task,如下圖所示:

image

  1. Shuffle Grouping:隨機分組,跨多個Bolt的Task,能夠隨機使得每個Bolt的Task接收到大致相同數目的Tuple,但是Tuple不重復
  2. Fields Grouping:根據指定的Field進行分組 ,同一個Field的值一定會被發射到同一個Task上
  3. Partial Key Grouping:與Fields grouping 類似,根據指定的Field的一部分進行分組分發
  4. All Grouping:所有Bolt的Task都接收同一個Tuple(這里有復制的含義)
  5. Global Grouping:所有的流都指向一個Bolt的同一個Task(也就是Task ID最小的)
  6. None Grouping:不需要關心Stream如何分組,等價於Shuffle grouping
  7. Direct Grouping:由Tupe的生產者來決定發送給下游的哪一個Bolt的Task ,這個要在實際開發編寫Bolt代碼的邏輯中進行精確控制
  8. Local or Shuffle Grouping:如果目標Bolt有1個或多個Task都在同一個Worker進程對應的JVM實例中,則Tuple只發送給這些Task

最常用的應該是Shuffle Grouping(隨機)、Fields Grouping(哈希)、Direct Grouping(用戶指定)這三種

Topology並行度計算

image

conf.setNumWorkers(2); // 該Topology運行在Supervisor節點的2個Worker進程中

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 設置並行度為2,則Task個數為2*1

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout"); // 設置並行度為2,設置Task個數為4 ,則Task個數為4

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt"); // 設置並行度為6,則Task個數為6*1

image

一共有12個任務,10的並行度,2個work。所以一個work里面有5個executor,6個task(分別是1藍,2綠,3黃),storm會把同類型的Task盡量放到同一個Executor中運行,Task個數最少的開始分配。

work和excutor內部構造

  • 同一work不同executor的兩個task消息傳遞
    image
  • 不同work不同executor的兩個task消息傳遞
    image

上圖我們還能看出同一executor不同task之間的消息傳遞

總結:

  1. 同一work之內的消息傳遞都要通過executor的消息收發線程
  2. 不同work的消息傳遞要通過work的收發線程
  3. 每個Executor應該維護Task與所在的Executor之間的關系,這樣才能正確地將Tuple傳輸到目的Bolt Task進行處理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM