Storm概念講解和工作原理介紹


Strom的結構

Storm概念講解和工作原理介紹 

Storm與傳統關系型數據庫 
    傳統關系型數據庫是先存后計算,而storm則是先算后存,甚至不存 
    傳統關系型數據庫很難部署實時計算,只能部署定時任務統計分析窗口數據 
    關系型數據庫重視事務,並發控制,相對來說Storm比較簡陋 
    Storm不Hadoop,Spark等是流行的大數據方案 

與Storm關系密切的語言:核心代碼用clojure書寫,實用程序用python開發,使用java開發拓撲 

topology

    Storm集群中有兩種節點,一種是控制節點(Nimbus節點),另一種是工作節點(Supervisor節點)。所有Topology任務的 提交必須在Storm客戶端節點上進行(需要配置 storm.yaml文件),由Nimbus節點分配給其他Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分成一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集群上,Supervisor會去zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。 

    和同樣是計算框架的MapReduce相比,MapReduce集群上運行的是Job,而Storm集群上運行的是Topology。但是Job在運行結束之后會自行結束,Topology卻只能被手動的kill掉,否則會一直運行下去 

    Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在內存里,也可以每次都更新數據庫,也可以采用NoSQL存儲。這部分事情完全交給用戶。 

    數據存儲之后的展現,也是你需要自己處理的,storm UI 只提供對topology的監控和統計。 

    總體的Topology處理流程圖為: 
Storm概念講解和工作原理介紹 

zookeeper集群

    storm使用zookeeper來協調整個集群, 但是要注意的是storm並不用zookeeper來傳遞消息。所以zookeeper上的負載是非常低的,單個節點的zookeeper在大多數情況下 都已經足夠了, 但是如果你要部署大一點的storm集群, 那么你需要的zookeeper也要大一點。關於如何部署zookeeper,可以看http://zookeeper.apache.org/doc /r3.3.3/zookeeperAdmin.html 

    部署zookeeper有些需要注意的地方: 
    1、對zookeeper做好監控非常重要, zookeeper是fail-fast的系統,只要出現什么錯誤就會退出, 所以實際場景中要監控,更多細節看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_supervision 
    2、實際場景中要配置一個cron job來壓縮zookeeper的數據和業務日志。zookeeper自己是不會去壓縮這些的,所以你如果不設置一個cron job, 那么你很快就會發現磁盤不夠用了,更多細節可以查看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_maintenance 

Component

    Storm中,Spout和Bolt都是Component。所以,Storm定義了一個名叫IComponent的總接口 
    全家普如下:綠色部分是我們最常用、比較簡單的部分。紅色部分是與事務相關的 
Storm概念講解和工作原理介紹

Spout

    Spout是Stream的消息產生源, Spout組件的實現可以通過繼承BaseRichSpout類或者其他Spout類來完成,也可以通過實現IRichSpout接口來實現 
public interface ISpout extends Serializable { 
  void open(Map conf, TopologyContext context, SpoutOutputCollector collector); 
  void close(); 
  void nextTuple(); 
  void ack(Object msgId); 
  void fail(Object msgId); 

    open()方法 -- 初始化方法 
    close() -- 在該spout將要關閉時調用。但是不保證其一定被調用,因為在集群中supervisor節點,可以使用kill -9來殺死worker進程。只有當Storm是在本地模式下運行,如果是發送停止命令,可以保證close的執行 
    ack(Object msgId) -- 成功處理tuple時回調的方法,通常情況下,此方法的實現是將消息隊列中的消息移除,防止消息重放 
    fail(Object msgId) -- 處理tuple失敗時回調的方法,通常情況下,此方法的實現是將消息放回消息隊列中然后在稍后時間里重放 
    nextTuple() -- 這是Spout類中最重要的一個方法。發射一個Tuple到Topology都是通過這個方法來實現的。調用此方法時,storm向spout發出請求, 讓spout發出元組(tuple)到輸出器(ouput collector)。這種方法應該是非阻塞的,所以spout如果沒有元組發出,這個方法應該返回。nextTuple、ack 和fail 都在spout任務的同一個線程中被循環調用。 當沒有元組的發射時,應該讓nextTuple睡眠一個很短的時間(如一毫秒),以免浪費太多的CPU。 
繼承了BaseRichSpout后,不用實現close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只關心最基本核心的部分。 
通常情況下(Shell和事務型的除外),實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout 

Bolt

    Bolt類接收由Spout或者其他上游Bolt類發來的Tuple,對其進行處理。Bolt組件的實現可以通過繼承BasicRichBolt類或者IRichBolt接口等來完成 
    prepare方法 -- 此方法和Spout中的open方法類似,在集群中一個worker中的task初始化時調用。 它提供了bolt執行的環境 
    declareOutputFields方法 -- 用於聲明當前Bolt發送的Tuple中包含的字段(field),和Spout中類似 
    cleanup方法 -- 同ISpout的close方法,在關閉前調用。同樣不保證其一定執行。 
    execute方法 -- 這是Bolt中最關鍵的一個方法,對於Tuple的處理都可以放到此方法中進行。具體的發送是通過emit方法來完成的。execute接受一個 tuple進行處理,並用prepare方法傳入的OutputCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果。 
    Storm提供了IBasicBolt接口,其目的就是實現該接口的Bolt不用在代碼中提供反饋結果了,Storm內部會自動反饋成功。如果你確實要反饋失敗,可以拋出FailedException 
    通常情況下,實現一個Bolt,可以實現IRichBolt接口或繼承BaseRichBolt,如果不想自己處理結果反饋,可以實現 IBasicBolt接口或繼承BaseBasicBolt,它實際上相當於自動實現了collector.emit.ack(inputTuple) 

Topology運行流程

    (1)Storm提交后,會把代碼首先存放到Nimbus節點的inbox目錄下,之后,會把當前Storm運行的配置生成一個 stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之后的Topology代碼文件 
    (2) 在設定Topology所關聯的Spouts和Bolts時,可以同時設置當前Spout和Bolt的executor數目和task數目,默認情況下, 一個Topology的task的總和是和executor的總和一致的。之后,系統根據worker的數目,盡量平均的分配這些task的執行。 worker在哪個supervisor節點上運行是由storm本身決定的 
    (3)任務分配好之后,Nimbus節點會將任務的信息提交到zookeeper集群,同時在zookeeper集群中會有workerbeats節點,這里存儲了當前Topology的所有worker進程的心跳信息 
    (4)Supervisor 節點會不斷的輪詢zookeeper集群,在zookeeper的assignments節點中保存了所有Topology的任務分配信息、代碼存儲目 錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容,來領取自己的任務,啟動worker進程運行 
    (5)一個Topology運行之后,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。 
    最后一步會不間斷的執行,除非手動結束Topology。 

Topology運行方式

    在開始創建項目之前,了解Storm的操作模式(operation modes)是很重要的。 Storm有兩種運行方式 
    本地運行的提交方式,例: 
LocalCluster cluster = new LocalCluster(); 
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 
Thread.sleep(2000); 
cluster.shutdown(); 
    分布式提交方式,例: 
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 
  
    需要注意的是,在Storm代碼編寫完成之后,需要打包成jar包放到Nimbus中運行,打包的時候,不需要把依賴的jar都打迚去,否則如果把依賴的 storm.jar包打進去的話,運行時會出現重復的配置文件錯誤導致Topology無法運行。因為Topology運行之前,會加載本地的 storm.yaml 配置文件。 

    運行的命令如下: storm jar StormTopology.jar mainclass [args] 

storm守護進程的命令

    Nimbus: storm nimbus 啟動nimbus守護進程 
    Supervisor: storm supervisor 啟動supervisor守護迚程 
    UI:storm ui 這將啟動stormUI的守護進程,為監測storm集群提供一個基於web的用戶界面。 
    DRPC: storm drpc 啟動DRPC的守護進程 

storm管理命令

    JAR:storm jar topology_jar topology_class [arguments...] 
    jar命令是用於提交一個集群拓撲.它運行指定參數的topology_class中的main()方法,上傳topology_jar到nimbus, 由nimbus發布到集群中。一旦提交,storm將激活拓撲並開始處理topology_class 中的main()方法,main()方法負責調用StormSubmitter.submitTopology()方法,並提供一個唯一的拓撲(集群)的 名。如果一個擁有該名稱的拓撲已經存在於集群中,jar命令將會失敗。常見的做法是在使用命令行參數來指定拓撲名稱,以便拓撲在提交的時候被命名。 

    KILL:storm kill topology_name [-w wait_time] 
    殺死一個拓撲,可以使用kill命令。它會以一種安全的方式銷毀一個拓撲,首先停用拓撲,在等待拓撲消息的時間段內允許拓撲完成當前的數據流。執行 kill命令時可以通過-w [等待秒數]指定拓撲停用以后的等待時間。也可以在Storm UI 界面上實現同樣的功能 

    Deactivate:storm deactivate topology_name 
    停用拓撲時,所有已分發的元組都會得到處理,spouts的nextTuple方法將不會被調用。也可以在Storm UI 界面上實現同樣的功能 

    Activate:storm activate topology_name 
    啟動一個停用的拓撲。也可以在Storm UI 界面上實現同樣的功能 

    Rebalance:storm rebalance topology_name [-w wait_time] [-n worker_count] [-e component_name=executer_count]... 
    rebalance使你重新分配集群任務。這是個很強大的命令。比如,你向一個運行中的集群增加了節點。rebalance命令將會停用拓撲,然后在相應超時時間之后重分配worker,並重啟拓撲 
例:storm rebalance wordcount-topology -w 15 -n 5 -e sentence-spout=4 -e split-bolt=8 

    還有其他管理命令,如:Remoteconfvalue、REPL、Classpath等 

新建storm項目注意事項

    為了開發storm項目,你的classpath里面需要有storm的jar包。最推薦的方式是使用Maven,不使用maven的話你可以手動把storm發行版里面的所有的jar包添加到classpath 

    storm-starter項目使用Leiningen作為build和依賴管理工具,你可以下載這個腳本 (https://raw.githubusercontent.com/technomancy/leiningen/stable/bin /lein)來安裝Leiningen, 把它加入到你的PATH, 使它可執行。要拉取storm的所有依賴包,簡單地在項目的根目錄執行 lein deps 就可以了 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM