Storm(三)Storm的原理機制


一.Storm的數據分發策略

1. Shuffle Grouping 

隨機分組,隨機派發stream里面的tuple,保證每個bolt task接收到的tuple數目大致相同。 輪詢,平均分配 

2. Fields Grouping

按字段分組,比如,按"user-id"這個字段來分組,那么具有同樣"user-id"的 tuple 會被分到相同的Bolt里的一個task, 而不同的"user-id"則可能會被分配到不同的task。 

3. All Grouping

廣播發送,對於每一個tuple,所有的bolts都會收到 

4. Global Grouping

全局分組,把tuple分配給task id最低的task 。

5. None Grouping

不分組,這個分組的意思是說stream不關心到底怎樣分組。目前這種分組和Shuffle grouping是一樣的效果。 有一點不同的是storm會把使用none grouping的這個bolt放到這個bolt的訂閱者同一個線程里面去執行(未來Storm如果可能的話會這樣設計)。 

6. Direct Grouping

指向型分組, 這是一種比較特別的分組方法,用這種分組意味着消息(tuple)的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法。而且這種消息tuple必須使用 emitDirect 方法來發射。消息處理者可以通過 TopologyContext 來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)  

7. Local or shuffle grouping

本地或隨機分組。如果目標bolt有一個或者多個task與源bolt的task在同一個工作進程中,tuple將會被隨機發送給這些同進程中的tasks。否則,和普通的Shuffle Grouping行為一致

8.customGrouping

自定義,相當於mapreduce那里自己去實現一個partition一樣。

二.Storm的並發機制

Worker – 進程

一個Topology拓撲會包含一個或多個Worker(每個Worker進程只能從屬於一個特定的Topology) 這些Worker進程會並行跑在集群中不同的服務器上,即一個Topology拓撲其實是由並行運行在Storm集群中多台服務器上的進程所組成

Executor – 線程

Executor是由Worker進程中生成的一個線程 每個Worker進程中會運行拓撲當中的一個或多個Executor線程 一個Executor線程中可以執行一個或多個Task任務(默認每個Executor只執行一個Task任務),但是這些Task任務都是對應着同一個組件(Spout、Bolt)。

Task

實際執行數據處理的最小單元 每個task即為一個Spout或者一個Bolt Task數量在整個Topology生命周期中保持不變,Executor數量可以變化或手動調整 (默認情況下,Task數量和Executor是相同的,即每個Executor線程中默認運行一個Task任務)

設置Worker進程數

Config.setNumWorkers(int workers)

設置Executor線程數

TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) ,TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) :其中, parallelism_hint即為executor線程數

設置Task數量

ComponentConfigurationDeclarer.setNumTasks(Number val)

例:

Rebalance – 再平衡

即,動態調整Topology拓撲的Worker進程數量、以及Executor線程數量

支持兩種調整方式: 1、通過Storm UI 2、通過Storm CLI

通過Storm CLI動態調整: storm help rebalance

例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 將mytopology拓撲worker進程數量調整為5個, “ blue-spout ” 所使用的線程數量調整為3個 ,“ yellow-bolt ”所使用的線程數量調整為10個。

三.Storm的通信機制

Worker進程間的數據通信

ZMQ ZeroMQ 開源的消息傳遞框架,並不是一個MessageQueue Netty Netty是基於NIO的網絡框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因為ZMQ的license和Storm的license不兼容。)

Worker內部的數據通信

Disruptor 實現了“隊列”的功能。 可以理解為一種事件監聽或者消息處理機制,即在隊列當中一邊由生產者放入消息數據,另一邊消費者並行取出消息數據處理。

Worker內部的消息傳遞機制

四.Storm的容錯機制

1、集群節點宕機

Nimbus服務器 單點故障? 非Nimbus服務器 故障時,該節點上所有Task任務都會超時,Nimbus會將這些Task任務重新分配到其他服務器上運行

2、進程掛掉

Worker 掛掉時,Supervisor會重新啟動這個進程。如果啟動過程中仍然一直失敗,並且無法向Nimbus發送心跳,Nimbus會將該Worker重新分配到其他服務器上 Supervisor 無狀態(所有的狀態信息都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅) Nimbus 無狀態(所有的狀態信息都存放在Zookeeper中來管理) 快速失敗(每當遇到任何異常情況,都會自動毀滅)

3、消息的完整性

從Spout中發出的Tuple,以及基於他所產生Tuple(例如上個例子當中Spout發出的句子,以及句子當中單詞的tuple等) 由這些消息就構成了一棵tuple樹 當這棵tuple樹發送完成,並且樹當中每一條消息都被正確處理,就表明spout發送消息被“完整處理”,即消息的完整性

Acker -- 消息完整性的實現機制 Storm的拓撲當中特殊的一些任務 負責跟蹤每個Spout發出的Tuple的DAG(有向無環圖)

五.Storm的DRPC

DRPC (Distributed RPC) 分布式遠程過程調用

DRPC 是通過一個 DRPC 服務端(DRPC server)來實現分布式 RPC 功能的。 DRPC Server 負責接收 RPC 請求,並將該請求發送到 Storm中運行的 Topology,等待接收 Topology 發送的處理結果,並將該結果返回給發送請求的客戶端。 (其實,從客戶端的角度來說,DPRC 與普通的 RPC 調用並沒有什么區別。)

DRPC設計目的: 為了充分利用Storm的計算能力實現高密度的並行實時計算。 (Storm接收若干個數據流輸入,數據在Topology當中運行完成,然后通過DRPC將結果進行輸出。)

客戶端通過向 DRPC 服務器發送待執行函數的名稱以及該函數的參數來獲取處理結果。實現該函數的拓撲使用一個DRPCSpout 從 DRPC 服務器中接收一個函數調用流。DRPC 服務器會為每個函數調用都標記了一個唯一的 id。隨后拓撲會執行函數來計算結果,並在拓撲的最后使用一個名為 ReturnResults 的 bolt 連接到 DRPC 服務器,根據函數調用的 id 來將函數調用的結果返回。

定義DRPC拓撲:

方法1: 通過LinearDRPCTopologyBuilder (該方法也過期,不建議使用) 該方法會自動為我們設定Spout、將結果返回給DRPC Server等,我們只需要將Topology實現

方法2: 直接通過普通的拓撲構造方法TopologyBuilder來創建DRPC拓撲 需要手動設定好開始的DRPCSpout以及結束的ReturnResults

運行模式:

1、本地模式

2.遠程模式(集群模式)

修改配置文件conf/storm.yaml drpc.servers: - "node21“ 啟動DRPC Server bin/storm drpc & 通過StormSubmitter.submitTopology提交拓撲

六.Storm的事務

事務性拓撲(Transactional Topologies)

保證消息(tuple)被且僅被處理一次

Design 1

強順序流(強有序) 引入事務(transaction)的概念,每個transaction(即每個tuple)關聯一個transaction id。 Transaction id從1開始,每個tuple會按照順序+1。 在處理tuple時,將處理成功的tuple結果以及transaction id同時寫入數據庫中進行存儲。

兩種情況:

1、當前transaction id與數據庫中的transaction id不一致

2、兩個transaction id相同

缺點: 一次只能處理一個tuple,無法實現分布式計算

Design 2

強順序的Batch流

 

 

事務(transaction)以batch為單位,即把一批tuple稱為一個batch,每次處理一個batch。 每個batch(一批tuple)關聯一個transaction id ,每個batch內部可以並行計算

缺點

Design 3

Storm's design

將Topology拆分為兩個階段:

1、Processing phase 允許並行處理多個batch

2、Commit phase 保證batch的強有序,一次只能處理一個batch

Design details

Manages state - 狀態管理

Storm通過Zookeeper存儲所有transaction相關信息(包含了:當前transaction id 以及batch的元數據信息)

Coordinates the transactions - 協調事務

Storm會管理決定transaction應該處理什么階段(processing、committing)

Fault detection - 故障檢測

Storm內部通過Acker機制保障消息被正常處理(用戶不需要手動去維護)

First class batch processing API

Storm提供batch bolt接口

三種事務:

1、普通事務

2、Partitioned Transaction - 分區事務

3、Opaque Transaction - 不透明分區事務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM