一、Stream:被處理的數據
二、Spout:數據源
消息源Spout是Storm的Topology中的消息生產者(Tuple的創造者)。如圖幾個Spout接口都繼承自IComponent
Spout從外部獲取數據后,向Topology發出的Tuple可以是可靠的,也可以是不可靠的
可靠的:一個可靠的消息可以重新發射一個Tuple(如果該Tuple沒有被Storm成功處理)
不可靠的:一個不可靠的消息源Spout一旦發出,一個Tuple就會徹底遺忘,不會在重新發了
Spout可以發射多個Stream,使用OutputFieldsDeclarer.declareStream來定義多個流,然后使用SpoutOutputCollector來發射指定的流
Spout中幾個重要的方法:
1、open方法:當一個Task被初始化時會調用此open方法,一般都會在此方法中初始化發送Tuple的對象SpoutOutputCollector和配置對象TopologyContext
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; }
2、declareOutputFields方法:聲明當前Spout的Tuple發送流。
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence"));//告訴組件發出數據流包含sentence字段 }
3、nextTuple方法:發射一個Tuple到Topology都是通過該方法。
public void nextTuple() { this.collector.emit(new Values("123")); }
三、Bolt:處理數據
Bolt是接收Spout發出元組Tuple后處理數據的組件,所有的消息處理邏輯被封裝在Bolt中,Bolt負責處理輸入的數據流並產生輸出的新數據流;Bolt把元組Tuple作為輸入,之后處理產生新的Tuple;
1、客戶機創建Bolt,然后將其序列化為拓撲,並提交給集群的主機
2、集群啟動worker進程,反序列化Bolt,調用prepare方法開始處理元組
3、Bolt處理元組,Bolt處理一個輸入Tuple,發射0個或多個元組,然后調用ack通知Storm自己已經處理過這個Tuple了。Strom提供一個IBasicBolt自動調用ack。
在創建Bolt對象時,通過構造方法,初始化成員變量,當Bolt被提交到集群時,這些成員變量也會被序列化,所以通過反序列化可以取到這些成員變量
IBolt繼承了Serializable,在創建Bolt在序列化之后被發送到具體執行的Worker上,worker在執行Bolt時候先執行perpare方法傳入當前執行的上下文,然后調用execute方法,對Tuple進行處理,並用prepare傳入的OutPutCollector的ack方法(表示成功)或fail(表示失敗)來反饋處理結果
IBasic接口在執行execute方法時,自動調用ack方法,其目的就是實現該Bolt時,不用在代碼中提供反饋結果,Storm內部會自動反饋成功
幾個重要的方法:
1.prepare方法:preparre方法為Bolt提供了OutputCollector,用來從Bolt中發送Tuple,在Bolt中載入新的線程異步處理,OutputCollector是線程安全的。Bolt中prepare、execute、cleanup等方法中進行
2.declareOutPutFields方法:聲明當前Bolt發送的Tuple中包含的字段;Bolt可以發射多條消息流,使用OutputFieldsDeclarer.declareStream方法來定義流,之后使用OutputCollector.emit來選擇要發射的流
3、getComponentConfiguration方法:當系統需要每隔一段時間執行特定的處理時,就可以用它
4、execute方法::以一個Tuple作為輸入,Bolt使用OutPutColector來發射Tuple,Bolt必須為他處理的每一個Tuple調用ack方法,以通知Storm該Tuple處理完畢了,從而通知該Tuple的發射者Spout
1) emit有一個參數:該參數是發送到下游Bolt的Tuple,此時由上游發來的舊Bolt就此隔斷,新的Tuple和舊的Tuple不在屬於同一顆Tuple數。新的Tuple另起一顆新的Tuple樹
2)emit有兩個參數:第一個參數是舊的Tuple的輸入流,第二個參數是新的往下游Bolt下發的Tuple流。此時新的Tuple和舊的Tuple還屬於同一顆Tuple樹,即如果下游的Bolt處理失敗,則向上傳遞到當前Bolt,當前Bolt根據舊的Tuple繼續往上游傳遞,申請重發失敗的Tuple,保證Tuple處理的可靠性
四、Tuple:數據單元
Tuple是Strom的主要數據結構,並且是Storm中使用的最基本單元、數據模型和元組;Tuple是一個值列表,Tuple中的值可以是任何類型的,動態類型的Tuple的fields可以不用聲明;默認情況下,Storm中的Tuple支持私有類型,字符串,字節數組等作為它的字段值,如果使用其他類型,就需要序列化該類型。
Tuple的默認類型:integer、float、double、long、short、string、byte、binary(byte[])。Tuple可以理解為鍵值對,其中鍵就是定義在declareOutputFields方法中的Fields對象,值就是在emit中發送的Values對象
Tuple聲明周期:
1、Storm調用Spout的nextTuple方法來獲取下一個Tuple
2、Spout通過Open方法的參數提供的SpoutOutputCollector將新的Tuple發射到其中一個輸出消息流(發射Tuple時,Spout提供一個message-id,通過這個ID來追蹤該Tuple)
3、Storm跟蹤該Tuple的樹形結構是否成功創建,並根據message-id調用Spout中的ack函數,已確認Tuple是否被完全處理。
4、如果Tuple超時,則調用Spout的fail方法
5、在任務完成后,Spout調用Cloes方法結束Tuple的使命
public interface ISpout extends Serializable { void open(Map var1, TopologyContext var2, SpoutOutputCollector var3); void close(); void activate(); void deactivate(); void nextTuple(); void ack(Object var1); void fail(Object var1); }
五、Task:運行Spout和Bolt中的線程
同一個Spout/Bolt的Task可能會共享一個物理線程,該線程稱為Executor。實際的數據處理由Task來完成,Topology的生命周期中,Task數量不會變化,而Executor數量卻不一定,在一般情況下,線程數小於等於Task數量。默認Task的數量等於Executor線程數量,即一個Executor線程只運行一個Task,Executor線程在執行期間會調用該Task的nextTuple或Executor
Worker:是運行這些線程的進程
一個worker進程一直一個Topology子集,他會啟動一個或多個Executor線程來執行一個Topology的組件。因此在執行拓撲時,可能跨越一個或多個Worker,Storm會盡量均勻分配任務給所有的worker,不會出現一個Worker為多個Topology服務的情況
六、Stream Grouping:規定了Bolt接收何種類型數據作為輸入
Storm包括6種流分組類型:
1、隨機分組(Shuffer Grouping):隨機分發元組到Bolt的任務,保證每個任務獲得相等數量的元組
2、字段分組(Fields Grouping):根據指定字段分割數據流並分組
3、全部分組(ALL Grouping):對於每一個Tuple來說,所有Bolt都會收到,所有Tuple被復制到Bolt的所有任務上
4、全局分組(Global Grouping):全部的流都分配到Bolt的同一任務,就是分配給ID,最小的Task
5、無分組(NO Grouping):不分組的含義是,流不關心到底誰會收到它的Tuple,目前無分組等效於隨機分組,不同的是Storm把無分組的Bolt放到訂閱Bolt或Spout的同一線程中執行(在可能實現的前提下)
6、直接分組(Direct Grouping):元組生產者決定元組由那個元組消費者接受
七、Topology:是由Straming Grouping連接起來的Spout和Bolt節點網絡
1、本地模式:
Config conifg = new Config();
config.setDebug(true); config.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test",config,builder.creatTopology());
Utils.sleep(1000); cluster.killTopology("test"); cluster.shutdown();
submitTopology有三個參數:要允許Topology的名稱,一個配置對象,以及要運行的Topology的本身
Topoogy是以名稱來唯一區別的,可以用這個名稱殺掉該Topology,而且必須顯示的殺掉,否則他會一直運行
幾個比較重要的配置:
config.setNumWorkers(2):定義希望集群分配多少個工作進程來執行這個Topology,Topology中的每個組件都需要線程來執行。每個組件到底用多少個線程是通過setBolt和setSpout來指定的
config.setDebug(true):Storm會記錄下每個組件發射的每條信息
Topology方法調用流程:
1.每個組件(Spout或Bolt)的構造方法和declareOutputFields方法都只被調用一次
2.open和prepare方法被調用多次,在入口函數中設定的setSpout或setBolt中的並行度參數指Execute的數量,是負責運行組件中Task的數量,此數量是多少,上述兩個方法就會被調用多少次,在每個Execute運行時調用一次
3.nextTuple方法和execute方法是一直運行的,nextTuple方法不斷發射Tuple,Bolt的execute不斷接受Tuple進行處理。只有這樣不斷進行,才會產生無界的Tupe流。
4.提交一個Topology之后,Storm創建Spout/Bolt實例並進行序列化,之后將序列化的組件發送給所有任務所在的節點,在每一個任務上反序列化組件
5.Spout和Bolt之間,Bolt和Bolt之間的通信,通過ZeroMQ的消息隊列實現
6.在一個Tuple處理成功之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理該Tuple
Topology中幾個比較重要的並行度相關概念
1.Worker(工作進程):每個worker都屬於一個特定的Topology,每個Supervisor節點的Worker可以有多個,每個Worker使用一個額單獨的端口,Worker對Topology中的每個組件運行一個或者多個Executor線程來提供Task的執行服務
2.Executor
Executor是產生於Worker進程內部的線程,會執行同一個組件的一個或多個Task
3.Task
實際的數據處理由Task完成,在Topology的聲明周期中,每個組件的Task數量不會變化,而Executor的數量卻不一定,Executor數量小於等於Task數量,在默認情況下,二者是相等的
worker、executor、task設置
1、Worker設置:可以設置yaml中Topology.workers屬性,在代碼中通過Config的setNumWorker方法設定
2、Executor設置:通過Topology入口類中的setBolt、setSpout方法的最后一個參數指定,如果不指定,則使用默認值為1
3、Task設置:在默認情況下和Executor數量一致,在代碼中通過TopologyBuilder的setNumTasks方法設定具體某個組件的Task數量
Storm集群中的一個物理節點啟動一個或多個worker進程,集群的topology都是通過這些進程運行的,然而,worker進程中又會運行一個或多個Executor線程,每個Executor線程只會運行一個Topology的一個組件(spout或bolt)的Task任務,task又是數據處理的實體單元。worker是進程,Executor對應於線程,Spout或Bolt是一個個Task;同一個Worker只執行同一個Topology相關的Task;在同一個Executor中可以執行多個同類型的Task,即在同一個Executor中,要么全部是Bolt類的Task,要么全部是Spout的Task;在運行時,Spout和Bolt需要包裝成一個又一個Task