Storm安裝及使用


一:安裝storm

(一)安裝一個zookeeper集群

注意:需要先啟動zookeeper集群才可以,不然后面容易出錯

(二)上傳storm的安裝包,解壓

(三)修改配置文件storm.yaml

#所使用的zookeeper集群主機
storm.zookeeper.servers:
- "hadoopH5"
- "hadoopH6"
- "hadoopH7" #nimbus所在的主機名 nimbus.host: "hadoopH5"

可選配置:為worker進程配置端口號(端口數決定worker數)

supervisor.slots.ports
-6701
-6702
-6703
-6704
-6705

(四)啟動storm

1.啟動nimbus

nohup ./storm nimbus 1>/dev/null 2>&1 &        開啟nimbus
nohup ./storm ui 1>/dev/null 2>&1 &          開啟ui界面,通過web服務

 

 

2.啟動supervisor

nohup ./storm supervisor 1>/dev/null 2>&1 &

3.測試UI

二:storm程序編寫 

 (一)程序實現功能

 (二)代碼實現

1.實現spout功能,進行源數據獲取

package cn.storm.tl;

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ //用於存放SpoutOutputCollector變量,在open初始化時賦值 private SpoutOutputCollector collector; //數據模擬 String[] words = {"Iphone","XiaoMi","HUAWEI","Geli"}; //是spout組件核心邏輯 //不斷向下一個組件中發送tuple消息  @Override public void nextTuple() { //一般從Kafka消息隊列中獲取數據,這里我們直接從數組中隨機選取數據發送 Random random = new Random(); int index = random.nextInt(words.length); String word=words[index]; //將數據封裝為tuple,通過SpoutOutputCollector控制器實例對象,發送出去 collector.emit(new Values(word)); //values可以輸出元組列表 //間隔時間,休眠500ms Utils.sleep(500); } //進行初始化  @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //進行初始化操作,collector用於發送數據 this.collector = collector; } //聲明輸出tuple元組數據的字段含義  @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("phoneName")); //如果元組數據多個,可以使用list列表聲明  } }

2.實現大寫轉換bolt

package cn.storm.tl;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class UpperBolt extends BaseBasicBolt{ //處理業務邏輯  @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //先獲取到上一個組件傳遞過來的數據,數據存放在tuple String phoneName = tuple.getString(0); //tuple中只存放了一個值,下標為0 //將數據轉為大寫 String phoneName_upper = phoneName.toUpperCase(); //將轉換完成的數據再次發送出去 collector.emit(new Values(phoneName_upper)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("phoneNameUpper")); } }

3.實現后綴添加,以及文件寫入

package cn.storm.tl;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import javax.management.RuntimeErrorException;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class SuffixBolt extends BaseBasicBolt{ FileWriter fw = null; //在bolt組件運行過程中,只會被調用一次,可以用於進行初始化操作  @Override public void prepare(Map stormConf, TopologyContext context) { try { fw = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID()); }catch(IOException e) { e.printStackTrace(); } } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //獲取數據,進行修改 String upper_name = tuple.getString(0); //進行修改 String suffix_phone = upper_name+"-2020"; //文件寫入 try { fw.write(suffix_phone); fw.write('\n'); fw.flush(); }catch(IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { // TODO Auto-generated method stub  } }

4.實現整個topology,提交任務給storm集群

package cn.storm.tl;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

/*
 * 組織各個組件形成一個完整的處理流程,就是所謂的topology
 * 並且將該topology提交給storm進行運行(一直運行,無退出)
 */
public class TopoMain { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); //將spout組件添加到topology中 builder.setSpout("randomspout", new RandomWordSpout(),4); //並發度:啟動executor線程數4 //將大寫轉換bolt組件設置到topology中,並且指定它接收spout消息 builder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout"); //將添加后綴的bolt組件設置到topology,並指定它接收upperbolt組件的消息 builder.setBolt("suffixbolt", new SuffixBolt(),4).shuffleGrouping("upperbolt"); //用builder創建一個topology StormTopology topology = builder.createTopology(); //配置topology在集群運行時的參數 Config conf = new Config(); conf.setNumWorkers(4); //設置拓撲worker進程數 conf.setDebug(true); conf.setNumAckers(0); //設置事務ack機制,類似於TCP機制 //將這個topology提交給storm集群運行 StormSubmitter.submitTopology("demotopo", conf, topology); } } 

三:結果測試

(一)storm啟動jar包

storm jar demotopo.jar cn.storm.tl.TopoMain

(二)查看supervisor節點 

文件寫入:使用tail -f查看動態文件數據

四:Storm體系結構

(一)Storm中的Nimbus和Supervisor

1.Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。

2.Nimbus進程和Supervisor進程都是快速失敗(fail-fast)和無狀態的。所有的狀態要么在zookeeper里面, 要么在本地磁盤上。 3.這也就意味着你可以用kill -9來殺死Nimbus和Supervisor進程, 然后再重啟它們,就好像什么都沒有發生過。這個設計使得Storm異常的穩定。

(二)Storm中的Topologies

一個topology是spouts和bolts組成的圖, 通過stream groupings將圖中的spouts和bolts連接起來,如下圖:

(三)Storm中的Stream

消息流stream是storm里的關鍵抽象;

一個消息流是一個沒有邊界的tuple序列(消息流中包含無限的tuple), 而這些tuple序列會以一種分布式的方式並行地創建和處理;

通過對stream中tuple序列中每個字段命名來定義stream;

在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和byte array; 可以自定義類型(只要實現相應的序列化器)。

(四)Storm中的Spouts

消息源spout是Storm里面一個topology里面的消息生產者;

一般來說消息源會從一個外部源讀取數據並且向topology里面發出消息:tuple;

Spouts可以是可靠的也可以是不可靠的:如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發射一個tuple, 但是不可靠的消息源spouts一旦發出一個tuple就不能重發了;

消息源可以發射多條消息流stream:
     使用OutputFieldsDeclarer.declareStream來定義多個stream,
     然后使用SpoutOutputCollector來發射指定的stream。

(五)Storm中的Bolts

所有的消息處理邏輯被封裝在bolts里面;

Bolts可以做很多事情:過濾,聚合,查詢數據庫等等。

Bolts可以簡單的做消息流的傳遞,也可以通過多級Bolts的組合來完成復雜的消息流處理;比如求TopN、聚合操作等(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
Bolts可以發射多條消息流: 
     使用OutputFieldsDeclarer.declareStream定義stream;
     使用OutputCollector.emit來選擇要發射的stream;

Bolts的主要方法是execute,:
     它以一個tuple作為輸入,使用OutputCollector來發射tuple;
     通過調用OutputCollector的ack方法,以通知這個tuple的發射者spout;

Bolts一般的流程: 
     處理一個輸入tuple,  發射0個或者多個tuple, 然后調用ack通知storm自己已經處理過這個tuple了;
     storm提供了一個IBasicBolt會自動調用ack。

(六)Storm中的Stream groupings

定義一個topology的關鍵一步是定義每個bolt接收什么樣的流作為輸入;

stream grouping就是用來定義一個stream應該如何分配數據給bolts;

Storm里面有7種類型的stream grouping:
Shuffle Grouping——隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同;
Fields Grouping——按字段分組, 比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts里的一個task, 而不同的userid則會被分配到不同的bolts里的task;
All Grouping——廣播發送,對於每一個tuple,所有的bolts都會收到;
Global Grouping——全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task;
Non Grouping——不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行;
Direct Grouping——直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。
消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id);
Local or shuffle grouping——如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

(七)Storm中的Workers

一個topology可能會在一個或者多個worker(工作進程)里面執行;

每個worker是一個物理JVM並且執行整個topology的一部分;

比如,對於並行度是300的topology來說,如果我們使用50個工作進程來執行,那么每個工作進程會處理其中的6個tasks;
Storm會盡量均勻的工作分配給所有的worker;

(八)Storm中的Tasks

每一個spout和bolt會被當作很多task在整個集群里執行

每一個executor對應到一個線程,在這個線程上運行多個task

stream grouping則是定義怎么從一堆task發射tuple到另外一堆task

可以調用TopologyBuilder類的setSpout和setBolt來設置並行度(也就是有多少個task)

五:Topology運行機制

(一)運行機制

(1)Storm提交后,會把代碼首先存放到Nimbus節點的inbox目錄下,之后,會把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,在此目錄中同時還有序列化之后的Topology代碼文件;

(2)在設定Topology所關聯的Spouts和Bolts時,可以同時設置當前Spout和Bolt的executor數目和task數目,默認情況下,一個Topology的task的總和是和executor的總和一致的。之后,系統根據worker的數目,盡量平均的分配這些task的執行。worker在哪個supervisor節點上運行是由storm本身決定的;

(3)任務分配好之后,Nimbes節點會將任務的信息提交到zookeeper集群,同時在zookeeper集群中會有workerbeats節點,這里存儲了當前Topology的所有worker進程的心跳信息;

(4)Supervisor節點會不斷的輪詢zookeeper集群,在zookeeper的assignments節點中保存了所有Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容,來領取自己的任務,啟動worker進程運行;

(5)一個Topology運行之后,就會不斷的通過Spouts來發送Stream流,通過Bolts來不斷的處理接收到的Stream流,Stream流是無界的。

(6)最后一步會不間斷的執行,除非手動結束Topology。

 

(二)運行機制補充

有幾點需要說明的地方:
   (1)每個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
   (2)open方法、prepare方法的調用是多次的。入口函數中設定的setSpout或者setBolt里的並行度參數指的是executor的數目,是負責運行組件中的task的線程 的數目,此數目是多少,上述的兩個方法就會被調用多少次,在每個executor運行的時候調用一次。相當於一個線程的構造方法。
   (3)nextTuple方法、execute方法是一直被運行的,nextTuple方法不斷的發射Tuple,Bolt的execute不斷的接收Tuple進行處理。只有這樣不斷地運行,才會產生無界的Tuple流,體現實時性。相當於線程的run方法。
   (4)在提交了一個topology之后,Storm就會創建spout/bolt實例並進行序列化。之后,將序列化的component發送給所有的任務所在的機器(即Supervisor節 點),在每一個任務上反序列化component。
   (5)Spout和Bolt之間、Bolt和Bolt之間的通信,是通過zeroMQ的消息隊列實現的。
   (6)上圖沒有列出ack方法和fail方法,在一個Tuple被成功處理之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理這個Tuple。

(三)補充:終止Topology

通過在Nimbus節點利用如下命令來終止一個Topology的運行:

storm kill topologyName

kill之后,可以通過UI界面查看topology狀態,會首先變成KILLED狀態,在清理完本地目錄和zookeeper集群中的和當前Topology相關的信息之后,此Topology就會徹底消失

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM