Storm概念講解,工作原理


一、storm是一個用於實時流式計算的分布式計算引擎,彌補了Hadoop在實時計算方面的不足(Hadoop在本質上是一個批處理系統)。

 

二、storm在實際應用場景中的位置一般如下:

其中的編號1~5說明如下:

1、Flume用於收集日志信息;

2、結合數據傳輸功能可以把收集到的日志信息實時傳輸到kafka集群,或保存到Hadoop hdfs中保存。

這里之所以選擇kafka集群是因為kafka集群具備緩沖功能,可以防止數據采集速度和數據處理速度不匹配導致數據丟失,這樣做可以提高可靠性。

3、使用storm實時處理數據;

4、保存storm處理的結果數據,當數據量不是特別巨大時,可以使用MySQL存儲;當數據量特別巨大時,可以選擇hdfs存儲。

5、用於實時展示處理結果。

 

三、storm的抽象運行方式:

其中:

spout為數據流的源頭;

tuple為流動中的數據承載單元;

Bolt為數據流處理的中間狀態。

 

四、spout和Bolt如何形成程序運行?

storm中運行的程序稱為Topology,Topology將spout和bolt組裝在一起,完成實時計算的任務。具體操作是通過TopologyBuilder的setSpout方法和setBolt方法,例子如下:

 

[plain]  view plain  copy
 
  1. TopologyBuilder builder = new TopologyBuilder();  
  2.         builder.setSpout("spout-name", your-spout-program);  
  3.         builder.setBolt("bolt-name-one", your-bolt-program-one, thread-number)  
  4.                 .fieldsGrouping("spout-name", new Fields("field-key-name-one"));  
  5.         builder.setBolt("bolt-name-two", your-bolt-program-two).fieldsGrouping("bolt-name-one", new Fields("field-key-name-two"));  
  6.         Config conf = new Config();  
  7.         StormSubmitter.submitTopology("your-Topology-name", conf,builder.createTopology());  


五、如何決定數據流的流向:

 

(1)借助在TopologyBuilder的setSpout方法和setBolt方法的第一個參數中為Spout程序和Bolt程序取的名字,例如上面示例代碼中的“spout-name”以及“bolt-name-one”,“bolt-name-two”。

補充:setBolt方法原型:

 

[plain]  view plain  copy
 
  1. setBolt(String id, IBasicBolt bolt, Number parallelism_hint)  
  2. Define a new bolt in this topology.  

 

setSpout方法原型:

 

[plain]  view plain  copy
 
  1. setSpout(String id, IRichSpout spout, Number parallelism_hint)  
  2. Define a new spout in this topology with the specified parallelism.  

(2)setBolt方法返回的BoltDeclarer對象利用fieldGrouping方法並結合(1)中的spout和bolt名字指定數據流的流向。
補充:fieldGrouping方法原型:

 

 

[plain]  view plain  copy
 
  1. T fieldsGrouping(String componentId,  
  2.                  Fields fields)  
  3. The stream is partitioned by the fields specified in the grouping.  
  4. Parameters:  
  5. componentId -  
  6. fields -  
  7. Returns:  


六、數據流中的數據承載單元tuple結構是什么

 

官網文檔如下:

The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result. Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If you want to use another type, you'll need to implement and register a serializer for that type. See http://github.com/nathanmarz/storm/wiki/Serializationfor more info.

通俗的講,tuple就是一個值列表,其中的值類型可以是任何類型,默認類型有byte,integer,short,long,float,double,string,byte[]。

tuple數據結構如下:

其中,fieldName是定義在declareOutputFields方法中的Fields對象,fieldValue值是在emit方法中發送的Values對象。

tuple都是通過spout和bolt發射(傳送)的。

例如:

spout程序如下:

 

[plain]  view plain  copy
 
  1. public class ParallelFileSpout extends BaseRichSpout{  
  2.     @SuppressWarnings("rawtypes")  
  3.     public void open(Map conf, TopologyContext context,  
  4.             SpoutOutputCollector collector) {  
  5.     }  
  6.   
  7.     /**  
  8.      * called in SpoutTracker. called once, send a single tuple.  
  9.      */  
  10.     public void nextTuple() {  
  11.         //不斷獲取數據並發射  
  12.         collector.emit(new Values("your-sent-fieldValue"));  
  13.     }  
  14.   
  15.     /**  
  16.      * define field. used for grouping by field.  
  17.      */  
  18.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  19.         declarer.declare(new Fields("your-sent-fieldName"));  
  20.     }  
  21.   
  22. }  

 

 

 

 

 

 


bolt程序如下:

 

[plain]  view plain  copy
 
  1. public class DetectionBolt extends BaseBasicBolt {  
  2.     public void prepare(Map stormConf, TopologyContext context) {     
  3.     }  
  4.           
  5.     public void execute(Tuple input, BasicOutputCollector collector) {  
  6.         //不斷的處理數據后發射  
  7.         collector.emit(new Values(“your-sent-fieldValue”));  
  8.     }  
  9.   
  10.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  11.         declarer.declare(new Fields("your-sent-fieldName"));  
  12.     }  
  13. }  

 

七、spout如何發射無界的數據流,bolt如何處理接收到的數據tuple

(1)如在上一部分spout的示例代碼,其中必含有nextTuple方法,在spout程序生命周期中,nextTuple方法一直運行,所以可以一直獲取數據流中的數據並持續像bolt處理程序發射。

(2)如在上一部分bolt的示例代碼,其中必含有execute方法,在bolt程序生命周期中,只要其收到tuple數據就會處理,根據需要會把處理后的數據繼續發射出去。

 

八、如何保證所有發射的數據tuple都被正確處理

同一個tuple不管是處理成功還是失敗,都由創建它的Spout發射並維護。

 

九、storm和Hadoop中各角色對比


十、storm比Hadoop實時是因為Hadoop在把一批數據都處理完畢后才輸出處理結果,而storm是處理一點數據就實時輸出這些數據的處理結果。

 

 

 

Storm與傳統關系型數據庫 
    傳統關系型數據庫是先存后計算,而storm則是先算后存,甚至不存 
    傳統關系型數據庫很難部署實時計算,只能部署定時任務統計分析窗口數據 
    關系型數據庫重視事務,並發控制,相對來說Storm比較簡陋 
    Storm不Hadoop,Spark等是流行的大數據方案 

與Storm關系密切的語言:核心代碼用clojure書寫,實用程序用python開發,使用java開發拓撲 

topology

    Storm集群中有兩種節點,一種是控制節點(Nimbus節點),另一種是工作節點(Supervisor節點)。所有Topology任務的 提交必須在Storm客戶端節點上進行(需要配置 storm.yaml文件),由Nimbus節點分配給其他Supervisor節點進行處理。 Nimbus節點首先將提交的Topology進行分片,分成一個個的Task,並將Task和Supervisor相關的信息提交到 zookeeper集群上,Supervisor會去zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。 

    和同樣是計算框架的MapReduce相比,MapReduce集群上運行的是Job,而Storm集群上運行的是Topology。但是Job在運行結束之后會自行結束,Topology卻只能被手動的kill掉,否則會一直運行下去 

    Storm不處理計算結果的保存,這是應用代碼需要負責的事情,如果數據不大,你可以簡單地保存在內存里,也可以每次都更新數據庫,也可以采用NoSQL存儲。這部分事情完全交給用戶。 

    數據存儲之后的展現,也是你需要自己處理的,storm UI 只提供對topology的監控和統計。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM