Storm核心基礎


 一、Stream:被處理的數據

 二、Spout:數據源

  消息源Spout是Storm的Topology中的消息生產者(Tuple的創造者)。如圖幾個Spout接口都繼承自IComponent

  

  Spout從外部獲取數據后,向Topology發出的Tuple可以是可靠的,也可以是不可靠的

  可靠的:一個可靠的消息可以重新發射一個Tuple(如果該Tuple沒有被Storm成功處理)

  不可靠的:一個不可靠的消息源Spout一旦發出,一個Tuple就會徹底遺忘,不會在重新發了

  Spout可以發射多個Stream,使用OutputFieldsDeclarer.declareStream來定義多個流,然后使用SpoutOutputCollector來發射指定的流

  Spout中幾個重要的方法:

  1、open方法當一個Task被初始化時會調用此open方法,一般都會在此方法中初始化發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext  

 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

  2、declareOutputFields方法:聲明當前Spout的Tuple發送流。  

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));//告訴組件發出數據流包含sentence字段
    }

  3、nextTuple方法:發射一個Tuple到Topology都是通過該方法。

public void nextTuple() {
         this.collector.emit(new Values("123"));
    }

 

三、Bolt:處理數據

  Bolt是接收Spout發出元組Tuple后處理數據的組件,所有的消息處理邏輯被封裝在Bolt中,Bolt負責處理輸入的數據流並產生輸出的新數據流;Bolt把元組Tuple作為輸入,之后處理產生新的Tuple;

  1、客戶機創建Bolt,然后將其序列化為拓撲,並提交給集群的主機

  2、集群啟動worker進程,反序列化Bolt,調用prepare方法開始處理元組

  3、Bolt處理元組,Bolt處理一個輸入Tuple,發射0個或多個元組,然后調用ack通知Storm自己已經處理過這個Tuple了。Strom提供一個IBasicBolt自動調用ack。

  

  在創建Bolt對象時,通過構造方法,初始化成員變量,當Bolt被提交到集群時,這些成員變量也會被序列化,所以通過反序列化可以取到這些成員變量

  IBolt繼承了Serializable,在創建Bolt在序列化之后被發送到具體執行的Worker上,worker在執行Bolt時候先執行perpare方法傳入當前執行的上下文,然后調用execute方法,對Tuple進行處理,並用prepare傳入的OutPutCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果

  IBasic接口在執行execute方法時,自動調用ack方法,其目的就是實現該Bolt時,不用在代碼中提供反饋結果,Storm內部會自動反饋成功

  

 

 

 幾個重要的方法:

1.prepare方法:preparre方法為Bolt提供了OutputCollector,用來從Bolt中發送Tuple,在Bolt中載入新的線程異步處理,OutputCollector是線程安全的。Bolt中prepare、execute、cleanup等方法中進行

 2.declareOutPutFields方法:聲明當前Bolt發送的Tuple中包含的字段;Bolt可以發射多條消息流,使用OutputFieldsDeclarer.declareStream方法來定義流,之后使用OutputCollector.emit來選擇要發射的流

3、getComponentConfiguration方法:當系統需要每隔一段時間執行特定的處理時,就可以用它

4、execute方法::以一個Tuple作為輸入,Bolt使用OutPutColector來發射Tuple,Bolt必須為他處理的每一個Tuple調用ack方法,以通知Storm該Tuple處理完畢了,從而通知該Tuple的發射者Spout

  1) emit有一個參數:該參數是發送到下游Bolt的Tuple,此時由上游發來的舊Bolt就此隔斷,新的Tuple和舊的Tuple不在屬於同一顆Tuple數。新的Tuple另起一顆新的Tuple樹

  2)emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是新的往下游Bolt下發的Tuple流。此時新的Tuple和舊的Tuple還屬於同一顆Tuple樹,即如果下游的Bolt處理失敗,則向上傳遞到當前Bolt,當前Bolt根據舊的Tuple繼續往上游傳遞,申請重發失敗的Tuple,保證Tuple處理的可靠性

 

四、Tuple:數據單元

  Tuple是Strom的主要數據結構,並且是Storm中使用的最基本單元、數據模型和元組;Tuple是一個值列表,Tuple中的值可以是任何類型的,動態類型的Tuple的fields可以不用聲明;默認情況下,Storm中的Tuple支持私有類型,字符串,字節數組等作為它的字段值,如果使用其他類型,就需要序列化該類型。

  Tuple的默認類型:integer、float、double、long、short、string、byte、binary(byte[])。Tuple可以理解為鍵值對,其中鍵就是定義在declareOutputFields方法中的Fields對象,值就是在emit中發送的Values對象

  Tuple聲明周期:

  1、Storm調用Spout的nextTuple方法來獲取下一個Tuple

  2、Spout通過Open方法的參數提供的SpoutOutputCollector將新的Tuple發射到其中一個輸出消息流(發射Tuple時,Spout提供一個message-id,通過這個ID來追蹤該Tuple)

  3、Storm跟蹤該Tuple的樹形結構是否成功創建,並根據message-id調用Spout中的ack函數,已確認Tuple是否被完全處理。

  4、如果Tuple超時,則調用Spout的fail方法

  5、在任務完成后,Spout調用Cloes方法結束Tuple的使命

public interface ISpout extends Serializable {
    void open(Map var1, TopologyContext var2, SpoutOutputCollector var3);

    void close();

    void activate();

    void deactivate();

    void nextTuple();

    void ack(Object var1);

    void fail(Object var1);
}

 

五、Task:運行Spout和Bolt中的線程

  同一個Spout/Bolt的Task可能會共享一個物理線程,該線程稱為Executor。實際的數據處理由Task來完成,Topology的生命周期中,Task數量不會變化,而Executor數量卻不一定,在一般情況下,線程數小於等於Task數量。默認Task的數量等於Executor線程數量,即一個Executor線程只運行一個Task,Executor線程在執行期間會調用該Task的nextTuple或Executor

Worker:是運行這些線程的進程

  一個worker進程一直一個Topology子集,他會啟動一個或多個Executor線程來執行一個Topology的組件。因此在執行拓撲時,可能跨越一個或多個Worker,Storm會盡量均勻分配任務給所有的worker,不會出現一個Worker為多個Topology服務的情況

六、Stream Grouping:規定了Bolt接收何種類型數據作為輸入

  Storm包括6種流分組類型:

  1、隨機分組(Shuffer Grouping):隨機分發元組到Bolt的任務,保證每個任務獲得相等數量的元組

  2、字段分組(Fields Grouping):根據指定字段分割數據流並分組

  3、全部分組(ALL Grouping):對於每一個Tuple來說,所有Bolt都會收到,所有Tuple被復制到Bolt的所有任務上

  4、全局分組(Global Grouping):全部的流都分配到Bolt的同一任務,就是分配給ID,最小的Task

  5、無分組(NO Grouping):不分組的含義是,流不關心到底誰會收到它的Tuple,目前無分組等效於隨機分組,不同的是Storm把無分組的Bolt放到訂閱Bolt或Spout的同一線程中執行(在可能實現的前提下)

  6、直接分組(Direct Grouping):元組生產者決定元組由那個元組消費者接受

七、Topology:是由Straming Grouping連接起來的Spout和Bolt節點網絡

  1、本地模式:

Config conifg = new Config();
config.setDebug(
true); config.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test",config,builder.creatTopology());
Utils.sleep(
1000); cluster.killTopology("test"); cluster.shutdown();

  submitTopology有三個參數:要允許Topology的名稱,一個配置對象,以及要運行的Topology的本身

  Topoogy是以名稱來唯一區別的,可以用這個名稱殺掉該Topology,而且必須顯示的殺掉,否則他會一直運行

  幾個比較重要的配置: 

   config.setNumWorkers(2):定義希望集群分配多少個工作進程來執行這個Topology,Topology中的每個組件都需要線程來執行。每個組件到底用多少個線程是通過setBolt和setSpout來指定的

  config.setDebug(true):Storm會記錄下每個組件發射的每條信息

  Topology方法調用流程:

  1.每個組件(Spout或Bolt)的構造方法和declareOutputFields方法都只被調用一次

  2.open和prepare方法被調用多次,在入口函數中設定的setSpout或setBolt中的並行度參數指Execute的數量,是負責運行組件中Task的數量,此數量是多少,上述兩個方法就會被調用多少次,在每個Execute運行時調用一次

  3.nextTuple方法和execute方法是一直運行的,nextTuple方法不斷發射Tuple,Bolt的execute不斷接受Tuple進行處理。只有這樣不斷進行,才會產生無界的Tupe流。

  4.提交一個Topology之后,Storm創建Spout/Bolt實例並進行序列化,之后將序列化的組件發送給所有任務所在的節點,在每一個任務上反序列化組件

  5.Spout和Bolt之間,Bolt和Bolt之間的通信,通過ZeroMQ的消息隊列實現

  6.在一個Tuple處理成功之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理該Tuple

  

   Topology中幾個比較重要的並行度相關概念

  1.Worker(工作進程):每個worker都屬於一個特定的Topology,每個Supervisor節點的Worker可以有多個,每個Worker使用一個額單獨的端口,Worker對Topology中的每個組件運行一個或者多個Executor線程來提供Task的執行服務

  2.Executor

  Executor是產生於Worker進程內部的線程,會執行同一個組件的一個或多個Task

  3.Task

  實際的數據處理由Task完成,在Topology的聲明周期中,每個組件的Task數量不會變化,而Executor的數量卻不一定,Executor數量小於等於Task數量,在默認情況下,二者是相等的

  worker、executor、task設置

  1、Worker設置:可以設置yaml中Topology.workers屬性,在代碼中通過Config的setNumWorker方法設定

  2、Executor設置:通過Topology入口類中的setBolt、setSpout方法的最后一個參數指定,如果不指定,則使用默認值為1

  3、Task設置:在默認情況下和Executor數量一致,在代碼中通過TopologyBuilder的setNumTasks方法設定具體某個組件的Task數量

 

  Storm集群中的一個物理節點啟動一個或多個worker進程,集群的topology都是通過這些進程運行的,然而,worker進程中又會運行一個或多個Executor線程,每個Executor線程只會運行一個Topology的一個組件(spout或bolt)的Task任務,task又是數據處理的實體單元。worker是進程,Executor對應於線程,Spout或Bolt是一個個Task;同一個Worker只執行同一個Topology相關的Task;在同一個Executor中可以執行多個同類型的Task,即在同一個Executor中,要么全部是Bolt類的Task,要么全部是Spout的Task;在運行時,Spout和Bolt需要包裝成一個又一個Task

  

 

  

 

  

  

  

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM