Storm常用操作命令及WordCount


Storm常用操作命令

1、任務提交命令:storm jar jar路徑】 【拓撲包名.拓撲類名】 【拓撲名稱】

storm jar /export/servers/storm/examples/storm-starter/storm-starter-topologies-1.0.3.jar org.apache.storm.starter.WordCountTopology  wordcount

與hadoop不同的是:不需要指定輸入輸出路徑

hadoop jar /usr/local/wordcount.jar /data.txt /wcout

 

2、殺死任務命令:storm kill 【拓撲名稱】 -w 10(執行kill命令時可以通過-w [等待秒數]指定拓撲停用以后的等待時間)

storm kill topology-name -w 10

 

3、停用任務命令:storm deactive  【拓撲名稱】

storm deactive topology-name

我們能夠掛起或停用運行中的拓撲。當停用拓撲時,所有已分發的元組都會得到處理,但是spoutsnextTuple方法不會被調用。銷毀一個拓撲,可以使用kill命令。它會以一種安全的方式銷毀一個拓撲,首先停用拓撲,在等待拓撲消息的時間段內允許拓撲完成當前的數據流。

4、啟用任務命令:storm activate 【拓撲名稱】

storm activate topology-name

 

5、重新部署任務命令:storm rebalance  【拓撲名稱】

storm rebalance topology-name

再平衡使你重分配集群任務。這是個很強大的命令。比如,你向一個運行中的集群增加了節點。再平衡命令將會停用拓撲,然后在相應超時時間之后重分配worker,並重啟拓撲。

 

StormWordCount(重點掌握

WordCount分析:

Java版本:

1、讀取文件中的數據,一行一行的讀取;

2、將讀到的數據進行切割;

3、對切割后的數組中的單詞進行計算。

 

Hadoop版本:

1、按行讀取文件中的數據;

2、在Mapper()函數中對每一行的數據進行切割,並輸出切割后的數據數組;

3、接收Mapper()中輸出的數據數組,在Reducer()函數中對數組中的單詞進行計算,將計算后的統計結果輸出。

 

Storm版本:

1、Spout從外部數據源中讀取數據,隨機發送一個元組對象出去;

2、SplitBolt接收Spout中輸出的元組對象,將元組中的數據切分成單詞,並將切分后的單詞發射出去;

3、WordCountBolt接收SplitBolt中輸出的單詞數組,對里面單詞的頻率進行累加,將累加后的結果輸出。

 

StormWordCount代碼實現及分析(重點掌握

在IDEA中創建一個Maven項目,先在pom.xml添加依賴--->import changes

創建Maven項目步驟:

使用IDEA編輯器創建一個Maven項目
前提:假設您已經安裝好了IDEA編輯器,由於編輯器自帶Maven插件,不需要單獨安裝maven。當然IDEA本身是支持安裝外部的maven的。
1、打開編輯器
筆者使用的是14.1 不是當前最新編輯器
2、創建maven項目第一步:
依次點擊軟件左上角的File->new->project
然后選擇maven,並點擊next。在這一步有一個需要注意的地方,就是為你的項目選擇JDK或者SDK.如果您之前沒有配置過JDK,可以點擊new按鈕,設置您JDK的home目錄。
3、填寫maven項目的groupid,和artifactid。然后點擊下一步
一般來講,groupid寫您的公司及部門或項目的名稱,比如:com.ahu
artifactid寫您的子項目或者子模塊的名字,比如當前項目是創建maven項目,我們可以將artifactid寫成:stormwordcount
version可以不用修改
4、填寫項目名稱及指定項目所在的目錄
projectName:StormWordCount
location:任意地址---->比如:E:\StormWordCount
至此,創建maven項目完畢。

1     <dependencies>
2         <dependency>
3             <groupId>org.apache.storm</groupId>
4             <artifactId>storm-core</artifactId>
5             <version>0.9.5</version>
6             <!-- <scope>provided</scope>-->
7         </dependency>
8     </dependencies>

 

 

 

然后在寫相關代碼:

項目主要流程:

 

 1 package com.ahu.storm;
 2 
 3 
 4 import backtype.storm.Config;
 5 import backtype.storm.LocalCluster;
 6 import backtype.storm.StormSubmitter;
 7 import backtype.storm.generated.AlreadyAliveException;
 8 import backtype.storm.generated.InvalidTopologyException;
 9 import backtype.storm.topology.TopologyBuilder;
10 import backtype.storm.tuple.Fields;
11 
12 /**
13  * Created by ahu_lichang on 2017/5/18.
14  */
15 public class WordCountTopologyMain {
16     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
17         //1、准備一個TopologyBuilder
18         //storm框架支持多語言,在Java環境下創建一個拓撲,需要使用TopologyBuilder
19         TopologyBuilder topologyBuilder = new TopologyBuilder();
20         //MySpout類,在已知的英文句子中,所及發送一條句子出去
21         topologyBuilder.setSpout("mySpout", new MySpout(), 2);
22         //MySplitBolt類,主要是將一行一行的文本內容切割成單詞
23         topologyBuilder.setBolt("mybolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout");
24         //MyCountBolt類,負責對單詞的頻率進行累加
25         topologyBuilder.setBolt("mybolt2", new MyCountBolt(), 4).fieldsGrouping("mybolt1", new Fields("word"));
26         /**
27          * i
28          * am
29          * lilei
30          * love
31          * hanmeimei
32          */
33         //2、創建一個configuration,用來指定當前topology 需要的worker的數量
34         //啟動topology的配置信息
35         Config config = new Config();
36         //定義你希望集群分配多少個工作進程給你來執行這個topology
37         config.setNumWorkers(2);
38 
39         //3、提交任務  -----兩種模式 本地模式和集群模式
40         //這里將拓撲名稱寫死了mywordcount,所以在集群上打包運行的時候,不用寫拓撲名稱了!也可用arg[0]
41         StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology());
42         //LocalCluster localCluster = new LocalCluster();
43         //localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
44     }
45 }

 

 

 

MySpout的實現及生命周期:

 

 1 package com.ahu.storm;
 2 
 3 import backtype.storm.spout.SpoutOutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichSpout;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Values;
 9 
10 import java.util.Map;
11 
12 /**
13  * Created by ahu_lichang on 2017/5/18.
14  */
15 public class MySpout extends BaseRichSpout {
16     //用來收集Spout輸出的Tuple
17     SpoutOutputCollector collector;
18 
19     //初始化方法
20     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
21         this.collector = collector;
22     }
23 
24     //storm 框架在 while(true) 調用nextTuple方法
25     public void nextTuple() {
26         collector.emit(new Values("i am lilei love hanmeimei"));
27     }
28 
29     //消息源可以發射多條消息流stream.多條消息流可以理解為多種類型的數據
30     public void declareOutputFields(OutputFieldsDeclarer declarer) {
31         declarer.declare(new Fields("sentence"));
32     }
33 }

 

 

 

MySplitBolt的實現及生命周期:

 

 1 package com.ahu.storm;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichBolt;
 7 import backtype.storm.tuple.Fields;
 8 import backtype.storm.tuple.Tuple;
 9 import backtype.storm.tuple.Values;
10 
11 import java.util.Map;
12 
13 /**
14  * Created by ahu_lichang on 2017/5/18.
15  */
16 public class MySplitBolt extends BaseRichBolt {
17     OutputCollector collector;
18 
19     //初始化方法
20     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
21         this.collector = collector;
22     }
23 
24     // 被storm框架 while(true) 循環調用  傳入參數tuple
25     //input內容是句子,execute方法將句子切割成單詞發出
26     public void execute(Tuple input) {
27         String line = input.getString(0);
28         String[] arrWords = line.split(" ");
29         for (String word : arrWords) {
30             collector.emit(new Values(word, 1));
31         }
32     }
33 
34     public void declareOutputFields(OutputFieldsDeclarer declarer) {
35         declarer.declare(new Fields("word", "num"));
36     }
37 }

 

 

 

MyCountBolt的實現及生命周期:

 

 1 package com.ahu.storm;
 2 
 3 
 4 import backtype.storm.task.OutputCollector;
 5 import backtype.storm.task.TopologyContext;
 6 import backtype.storm.topology.OutputFieldsDeclarer;
 7 import backtype.storm.topology.base.BaseRichBolt;
 8 import backtype.storm.tuple.Tuple;
 9 
10 import java.util.HashMap;
11 import java.util.Map;
12 
13 /**
14  * Created by ahu_lichang on 2017/5/18.
15  */
16 public class MyCountBolt extends BaseRichBolt {
17     OutputCollector collector;
18     //用來保存最后計算的結果key=單詞,value=單詞個數
19     Map<String, Integer> map = new HashMap<String, Integer>();
20 
21     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
22         this.collector = collector;
23     }
24 
25     public void execute(Tuple input) {
26         String word = input.getString(0);
27         Integer num = input.getInteger(1);
28         System.out.println(Thread.currentThread().getId() + "    word:" + word);
29         if (map.containsKey(word)) {
30             Integer count = map.get(word);
31             map.put(word, count + num);
32         } else {
33             map.put(word, num);
34         }
35         System.out.println("count:" + map);
36     }
37 
38     public void declareOutputFields(OutputFieldsDeclarer declarer) {
39         //不輸出
40     }
41 }

 

 

 

兩種運行模式

1、本地模式:直接在IDEA中的WordCountTopologyMain運行即可在控制台觀察到輸出結果

2、集群模式:

要打包運行。打包方法:

將jar包上傳到storm1上,去運行storm /root/stormwordcount.XXXX.jar  com.ahu.storm.WordCountTopologyMain 

注意:這樣打包運行的時候,會出錯:NoClassDefFoundError: backtype/storm/topology/IRichSpout

這是因為打包的時候,有的jar包沒有打到里面去,打包方式不對!需要在pom.xml指定一個Build,指定打包的方式,將所有的依賴都打成jar。

 1     <build>
 2         <plugins>
 3             <plugin>
 4                 <artifactId>maven-assembly-plugin</artifactId>
 5                 <configuration>
 6                     <descriptorRefs>
 7                         <descriptorRef>jar-with-dependencies</descriptorRef>
 8                     </descriptorRefs>
 9                    <!-- <archive>
10                         <manifest>
11                             <mainClass>com.ahu.storm.hadoop.mapreduce.wordcount.WordCount</mainClass>
12                         </manifest>
13                     </archive>-->
14                 </configuration>
15                 <executions>
16                     <execution>
17                         <id>make-assembly</id>
18                         <phase>package</phase>
19                         <goals>
20                             <goal>single</goal>
21                         </goals>
22                     </execution>
23                 </executions>
24             </plugin>
25         </plugins>
26     </build>

 

這樣再打包運行,就不會出錯了!運行成功后,可以在worker運行的機器上查看日志:/export/servers/storm/logs/下查看,tail -100f worker-6701.log.1

 

Storm具體的任務執行流程圖

 

Stream Grouping詳解

Storm里面有7種類型的stream grouping

l Shuffle Grouping: 隨機分組, 隨機派發stream里面的tuple,保證每個bolt接收到的tuple數目大致相同。

Fields Grouping按字段分組,比如按userid來分組,具有同樣useridtuple會被分到相同的Bolts里的一個task,而不同的userid則會被分配到不同的bolts里的task

l All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。

l Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task

l Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執行。

l Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味着消息的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的taskid OutputCollector.emit方法也會返回taskid)。

Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM