storm中的一些概念


1.topology

  一個topolgy是spouts和bolts組成的圖,通過stream groupings將圖中的spout和bolts連接起來:如圖所示:

  

  一個topology會一直運行知道你手動kill掉,Storm自動重新分配執行失敗的任務,並且Storm可以保證你不會有數據丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉移到其他機器上;

  運行一個toplogy很簡單,首先,把你所有的代碼以及所依賴的jar打進一個jar中。然后運行類似下面的命令:

  storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

  這個命令會運行主類:backtype.storm.MyTopology,參數是arg1,arg2。這個類的main函數定義這個topology並且把它提交給Nimbus。storm jar負責連接到Nimbus並且上傳jar包;

  Topology的定義是一個Thrift結構,並且Nimbus就是一個Thrift服務,你可以提交任何語言創建的topology。上面的方法就是用JVM-based語言提交的最簡單的方法。

  如下代碼即定義了一個topology:

  

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout",new RandomSentceSpout(),5);

builder.setBolt("split",new SplitSentence(),8).shuffleGrouping("spout")

builder.setBolt("count", new WordCount(),12).fieldsGrouping("spilt",new Fields("word"));

 

 

2、Streams

  小溪流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列,而這些tuple序列會以一種分布式的方式並行的創建和處理。再默認的情況寫,tuplr的字段類型可以是:integer,long,short,byte,string,double,boolean和byte array。也可以自定義類型(只要實現相應的序列化器)。

public void declareOutputFields(OutputFieldsDeclarer declarer){
        //默認ID的信息流定義
        declarer.declare(new Fields("word","cout"));
        //自定義ID的消息流
        declare.declareStream("streamId",new Fields("word","count"));
}        

   每個消息流再定義的時候會被分配給一個id,因為單向消息流使用的相當普遍,OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用制定這個id。在這種情況下這個stream會分配個值為‘default’默認的id。

  Storm提供的最近本的處理stream的原語是spout和bolt。你可以實現spout和bolt提供的接口來處理你的業務邏輯。

3、Soupts

  消息源spout是Storm里面一個topology里面的消息生產者。一般來說消息源會從一個外部源讀取數據並且向topology里面發送出消息:tuple。Spout可以是可靠地也可以是不可靠的。如果這個tuple沒有被storm成功處理,可靠地消息源spouts可以重新發射一個tuple,但是不可靠的消息源spouts一旦發出一個tuple就不能重發了。

  消息源可以發射多條消息流stream。使用OutputFieldsDeclarer。declareStream來定義多個stream,然后使用SpoutOutputCollector來發射指定的stream。

/**
*定義了2個消息流
*/
public void declareOutputFields (OutputFieldsDeclarer declarer){
      declarer.declareStream("streamId1",new Fields("words"));
       declarer.declareStream("streamId2",new Fields("word2"));   
}
/**
*根據消息流發射相應的消息
*/
public void nextTuple(){
     collector.emit("streamId1",new Values("streamid1's word1"));
     collector.emit("streamId2".new Values("streamid2's word2"));       
}

   Spout類里面最重要的方法是nextTuple。要么發射一個新的tuple到topology里面或者簡單的返回如果已經沒有新的tuple。要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調用所有消息源spout的方法。

  另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調用ack,否則調用fail。storm只對可靠的spout調用ack和fail。

4、Bolts

  所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過濾,聚合,查詢數據庫等等;

  Bolts可以簡單地做消息流的傳遞。復雜的消息流處理往往需要很多步驟,從而就需要經過很多bolts。

  Bolts和spouts一樣,可以發射多條消息流,使用OutputFieldsDeclarer。declareStream定義stream,使用OutputCollector.emit來選擇要發射的stream。

  Bolts的主要方法是execute,他以一個tuple作為輸入,使用OutputCollector來發射tuple,bolts必須要為它處理的每一個tuple調用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而統治這個tuple的發射這spouts.一般的流程是:bolts處理一個輸入tuplr,發射0個或者多個tuple,然后調用ack通知storm自己已經處理過這個tuple了。storm提供了一個IBasicBolt會自動調用ack。

5、Stream Grouping

  定義一個 toplogy的其中一步是定義每個bolt接受什么樣的流作為輸入。stream grouping就是用來定義一個stream應該如果分配數據個ibolts上面的多個tasks。

  Storm里面有7種類型的stream grouping:

  --Shuffle Grouping:隨機分組,隨機派發stream里面的tuple,保證每個bolt接受到的tuple數目大致相同;

  --Fields Grouping:按字段分組,比如按userid來分組,具有同樣的userid的tuple會被分到相同的Bolts里的一個task,而不同的userid則會被分配到不同的bolts里面的task。

  --All Grouping:廣播發送,對於每一個tuple,所有的bolts都會受到。

  --Global Grouping:全局分組,這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。

  --Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會受到它的tuple。目前這種分組和Shuffle grouping是一樣的效果,有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。

  --Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接受者的哪個task處理這個消息。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理着可以通過TopologyContext來獲取處理它的消息的task的id(OutputCollector。emit方法也會返回tsk的id)。

  --Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發送給這些tasks。否則,和普通的Shuffle Grouping行為一致。

代碼示例:

TopologyBuisder builder = new TopologyBuilder();

builder.setSpout("spout",new RandomSentenceSpout(),5);
builder.setBolt("split",new SplitSentence(),8).shuffleGrouping("spout");
builder.setBolt("count",new WordCount(),12).fieldsGrouping("spilt",new Fields("word")):

 

 6.Reliability

  storm保證每個tuple會被topology完整的執行。Storm會追蹤有每個spout tuple所含生的tuple樹(一個bolt處理一個tuple之后可能會發射別的tuple從而形成樹狀結構),並且跟蹤這顆tuple樹什么時候成功處理完。每個topology都有一個消息超時的設置,如果storm在這個超市的時間內檢測不到某個tuple樹到底有沒有執行成功,那么topology會把這個tuple標記為執行失敗,並且過一會重新發射這個tuple。

  為了利用Strom的可靠性特性,在你發出一個新的tuple以及你完成處理一個tuple的時候你必須要通知storm。這一切是由OutputCollector來完成的。通過emit方法來通知一個新的tuple產生了,通過ack方法通知一個tuple處理完成了。

7、Tasks

  每一個spout和bolt會被當做很多task在整個集群里執行。每一個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎么從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder類里的setSpout和setBolt來設置並調度(也就是有多少個task)。

代碼示例如下:

ToplogyBuilder builder = new TopologyBuilder();

builder.setSpout("spout",new RandomSentenceSpout(),5).setNumTasks(10);
builder.setBolt("spilt",new SplitSentence(),8).shuffleGrouping("spout").setNumTasks(8);
builder.setBolt("count",new WoedCount(),12).fieldsGrouping("split",new Fields("woed")).setNumTasks(24);

表示“spout”的線程數為5,任務數為10,即一個線程運行兩個任務;

“split”則為一個線程運行一個任務,和默認的一致。

8.Workers

  一個topology可能會在一個或者多個worker(工作進程)里面執行,每個worker是一個屋里JVM並執行整個topology的一部分。比如,對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會開啟6個線程,默認默認每個線程處理一個tasks。Storm會盡量均勻的工作分配給所有的worker。每個supervisor上運行着若干個worker進程(根據配置文件supervisor.slots.ports進行配置)。

9.Configuration

  Storm里面有一堆參數可以配置來調整Nimbus,Supervisor以及正在運行的topology的行為,一些配置是系統級別的,一些配置是topology級別的。default。yaml里面有所有的默認配置。你可以通過定義一個storm.yaml在你的classpath里面來覆蓋這些默認的配置並且你也可以在代碼里面設置一些topology相關的配置信息(使用StromSubmitter)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM