一:安裝storm
(一)安裝一個zookeeper集群
注意:需要先啟動zookeeper集群才可以,不然后面容易出錯
(二)上傳storm的安裝包,解壓
(三)修改配置文件storm.yaml
#所使用的zookeeper集群主機
storm.zookeeper.servers:
- "hadoopH5"
- "hadoopH6"
- "hadoopH7" #nimbus所在的主機名 nimbus.host: "hadoopH5"
可選配置:為worker進程配置端口號(端口數決定worker數)
supervisor.slots.ports
-6701
-6702
-6703
-6704
-6705
(四)啟動storm
1.啟動nimbus
nohup ./storm nimbus 1>/dev/null 2>&1 & 開啟nimbus
nohup ./storm ui 1>/dev/null 2>&1 & 開啟ui界面,通過web服務
2.啟動supervisor
nohup ./storm supervisor 1>/dev/null 2>&1 &
3.測試UI
二:storm程序編寫
(一)程序實現功能
(二)代碼實現
1.實現spout功能,進行源數據獲取
package cn.storm.tl;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ //用於存放SpoutOutputCollector變量,在open初始化時賦值 private SpoutOutputCollector collector; //數據模擬 String[] words = {"Iphone","XiaoMi","HUAWEI","Geli"}; //是spout組件核心邏輯 //不斷向下一個組件中發送tuple消息 @Override public void nextTuple() { //一般從Kafka消息隊列中獲取數據,這里我們直接從數組中隨機選取數據發送 Random random = new Random(); int index = random.nextInt(words.length); String word=words[index]; //將數據封裝為tuple,通過SpoutOutputCollector控制器實例對象,發送出去 collector.emit(new Values(word)); //values可以輸出元組列表 //間隔時間,休眠500ms Utils.sleep(500); } //進行初始化 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //進行初始化操作,collector用於發送數據 this.collector = collector; } //聲明輸出tuple元組數據的字段含義 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("phoneName")); //如果元組數據多個,可以使用list列表聲明 } }
2.實現大寫轉換bolt
package cn.storm.tl;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class UpperBolt extends BaseBasicBolt{ //處理業務邏輯 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先獲取到上一個組件傳遞過來的數據,數據存放在tuple String phoneName = tuple.getString(0); //tuple中只存放了一個值,下標為0 //將數據轉為大寫 String phoneName_upper = phoneName.toUpperCase(); //將轉換完成的數據再次發送出去 collector.emit(new Values(phoneName_upper)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("phoneNameUpper")); } }
3.實現后綴添加,以及文件寫入
package cn.storm.tl;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import javax.management.RuntimeErrorException;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class SuffixBolt extends BaseBasicBolt{ FileWriter fw = null; //在bolt組件運行過程中,只會被調用一次,可以用於進行初始化操作 @Override public void prepare(Map stormConf, TopologyContext context) { try { fw = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID()); }catch(IOException e) { e.printStackTrace(); } } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //獲取數據,進行修改 String upper_name = tuple.getString(0); //進行修改 String suffix_phone = upper_name+"-2020"; //文件寫入 try { fw.write(suffix_phone); fw.write('\n'); fw.flush(); }catch(IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub } }
4.實現整個topology,提交任務給storm集群
package cn.storm.tl;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
/*
* 組織各個組件形成一個完整的處理流程,就是所謂的topology
* 並且將該topology提交給storm進行運行(一直運行,無退出)
*/
public class TopoMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); //將spout組件添加到topology中 builder.setSpout("randomspout", new RandomWordSpout(),4); //並發度:啟動executor線程數4 //將大寫轉換bolt組件設置到topology中,並且指定它接收spout消息 builder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout"); //將添加后綴的bolt組件設置到topology,並指定它接收upperbolt組件的消息 builder.setBolt("suffixbolt", new SuffixBolt(),4).shuffleGrouping("upperbolt"); //用builder創建一個topology StormTopology topology = builder.createTopology(); //配置topology在集群運行時的參數 Config conf = new Config(); conf.setNumWorkers(4); //設置拓撲worker進程數 conf.setDebug(true); conf.setNumAckers(0); //設置事務ack機制,類似於TCP機制 //將這個topology提交給storm集群運行 StormSubmitter.submitTopology("demotopo", conf, topology); } }
三:結果測試
(一)storm啟動jar包
storm jar demotopo.jar cn.storm.tl.TopoMain
(二)查看supervisor節點
文件寫入:使用tail -f查看動態文件數據
四:Storm體系結構
(一)Storm中的Nimbus和Supervisor
1.Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。
2.Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面, 要么在本地磁盤上。 3.這也就意味着你可以用kill -9來殺死Nimbus和Supervisor進程, 然后再重啟它們,就好像什么都沒有發生過。這個設計使得Storm異常的穩定。
(二)Storm中的Topologies
一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:
(三)Storm中的Stream
消息流stream是storm里的關鍵抽象;
一個消息流是一個沒有邊界的tuple序列(消息流中包含無限的tuple), 而這些tuple序列會以一種分布式的方式並行地創建和處理;
通過對stream中tuple序列中每個字段命名來定義stream;
在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array; 可以自定義類型(只要實現相應的序列化器)。
(四)Storm中的Spouts
消息源spout是Storm里面一個topology里面的消息生產者;
一般來說消息源會從一個外部源讀取數據並且向topology里面發出消息:tuple;
Spouts可以是可靠的也可以是不可靠的:如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發射一個tuple, 但是不可靠的消息源spouts一旦發出一個tuple就不能重發了;
消息源可以發射多條消息流stream:
使用OutputFieldsDeclarer.declareStream來定義多個stream,
然后使用SpoutOutputCollector來發射指定的stream。
(五)Storm中的Bolts
所有的消息處理邏輯被封裝在bolts里面;
Bolts可以做很多事情:過濾,聚合,查詢數據庫等等。
Bolts可以簡單的做消息流的傳遞,也可以通過多級Bolts的組合來完成復雜的消息流處理;比如求TopN、聚合操作等(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
Bolts可以發射多條消息流:
使用OutputFieldsDeclarer.declareStream定義stream;
使用OutputCollector.emit來選擇要發射的stream;
Bolts的主要方法是execute,:
它以一個tuple作為輸入,使用OutputCollector來發射tuple;
通過調用OutputCollector的ack方法,以通知這個tuple的發射者spout;
Bolts一般的流程:
處理一個輸入tuple, 發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了;
storm提供了一個IBasicBolt會自動調用ack。
(六)Storm中的Stream groupings
定義一個topology的關鍵一步是定義每個bolt接收什么樣的流作為輸入;
stream grouping就是用來定義一個stream應該如何分配數據給bolts;
Storm里面有7種類型的stream grouping:
Shuffle Grouping——隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同;
Fields Grouping——按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts里的一個task, 而不同的userid則會被分配到不同的bolts里的task;
All Grouping——廣播發送,對於每一個tuple,所有的bolts都會收到;
Global Grouping——全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task;
Non Grouping——不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行;
Direct Grouping——直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。
消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id);
Local or shuffle grouping——如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
(七)Storm中的Workers
一個topology可能會在一個或者多個worker(工作進程)里面執行;
每個worker是一個物理JVM並且執行整個topology的一部分;
比如,對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks;
Storm會盡量均勻的工作分配給所有的worker;
(八)Storm中的Tasks
每一個spout和bolt會被當作很多task在整個集群里執行
每一個executor對應到一個線程,在這個線程上運行多個task
stream grouping則是定義怎么從一堆task發射tuple到另外一堆task
可以調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)
五:Topology運行機制
(一)運行機制
(1)Storm提交后,會把代碼首先存放到Nimbus節點的inbox目錄下,之后,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之后的Topology代碼文件;
(2)在設定Topology所關聯的Spouts和Bolts時,可以同時設置當前Spout和Bolt的executor數目和task數目,默認情況下,一個Topology的task的總和是和executor的總和一致的。之后,系統根據worker的數目,盡量平均的分配這些task的執行。worker在哪個supervisor節點上運行是由storm本身決定的;
(3)任務分配好之后,Nimbes節點會將任務的信息提交到zookeeper集群,同時在zookeeper集群中會有workerbeats節點,這里存儲了當前Topology的所有worker進程的心跳信息;
(4)Supervisor節點會不斷的輪詢zookeeper集群,在zookeeper的assignments節點中保存了所有Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容,來領取自己的任務,啟動worker進程運行;
(5)一個Topology運行之后,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。
(6)最后一步會不間斷的執行,除非手動結束Topology。
(二)運行機制補充
有幾點需要說明的地方: (1)每個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。 (2)open方法、prepare方法的調用是多次的。入口函數中設定的setSpout或者setBolt里的並行度參數指的是executor的數目,是負責運行組件中的task的線程 的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每個executor運行的時候調用一次。相當於一個線程的構造方法。 (3)nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,才會產生無界的Tuple流,體現實時性。相當於線程的run方法。 (4)在提交了一個topology之后,Storm就會創建spout/bolt實例並進行序列化。之后,將序列化的component發送給所有的任務所在的機器(即Supervisor節 點),在每一個任務上反序列化component。 (5)Spout和Bolt之間、Bolt和Bolt之間的通信,是通過zeroMQ的消息隊列實現的。 (6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理這個Tuple。
(三)補充:終止Topology
通過在Nimbus節點利用如下命令來終止一個Topology的運行:
storm kill topologyName
kill之后,可以通過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集群中的和當前Topology相關的信息之后,此Topology就會徹底消失