基於Storm的WordCount


Storm WordCount 工作過程

Storm 版本:
1、Spout 從外部數據源中讀取數據,隨機發送一個元組對象出去;
2、SplitBolt 接收 Spout 中輸出的元組對象,將元組中的數據切分成單詞,並將切分后的單詞發射出去;
3、WordCountBolt 接收 SplitBolt 中輸出的單詞數組,對里面單詞的頻率進行累加,將累加后的結果輸出。

Java 版本:
1、讀取文件中的數據,一行一行的讀取;
2、將讀到的數據進行切割;
3、對切割后的數組中的單詞進行計算。

Hadoop 版本:
1、按行讀取文件中的數據;
2、在 Mapper()函數中對每一行的數據進行切割,並輸出切割后的數據數組;
3、接收 Mapper()中輸出的數據數組,在 Reducer()函數中對數組中的單詞進行計算,將計算后的統計結果輸出。

源代碼

storm的配置、eclipse里maven的配置以及創建項目部分省略。

Mainclass

package com.test.stormwordcount;
import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.topology.TopologyBuilder; 
import backtype.storm.tuple.Fields; 

public class MainClass { 

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {         
	    //創建一個 TopologyBuilder         
	    TopologyBuilder tb = new TopologyBuilder();         
	    tb.setSpout("SpoutBolt", new SpoutBolt(), 2);         tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");         
	    tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));         
	    //創建配置         
	    Config conf = new Config();         
	    //設置 worker 數量         
        conf.setNumWorkers(2);         
        //提交任務         
	    //集群提交         
        //StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());         
	    //本地提交         
	    LocalCluster localCluster = new LocalCluster();         
	    localCluster.submitTopology("myWordcount", conf, tb.createTopology()); 
    }  
} 

SplitBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values; 

public class SplitBolt extends BaseRichBolt{      
    OutputCollector collector; 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
	    this.collector = collector;     
	    } 

    /**      * 執行方法      */     
    public void execute(Tuple input) {         
	    String line = input.getString(0);         
	    String[] split = line.split(" ");         
	    for (String word : split) {             
		    collector.emit(new Values(word));         
		    }     
	    } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
	    declarer.declare(new Fields("word"));     
	    } 
} 

CountBolt 部分

package com.test.stormwordcount;
import java.util.HashMap; 
import java.util.Map; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

public class CountBolt extends BaseRichBolt{ 

    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>(); 

    /**      * 初始化      */     
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {         
	    this.collector = collector;     
	    } 


    /**      * 執行方法      */     
public void execute(Tuple input) {         
    String word = input.getString(0);         
    if(map.containsKey(word)){             
	Integer c = map.get(word);             
	    map.put(word, c+1);         
	    }else{             
		map.put(word, 1);         
		}         
    //測試輸出         
    System.out.println("結果:"+map);     
    } 

    /**      * 輸出      */     
public void declareOutputFields(OutputFieldsDeclarer declarer) {     
	
} 
} 

SpoutBolt 部分

package com.test.stormwordcount;
import java.util.Map; 
import backtype.storm.spout.SpoutOutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichSpout; 
import backtype.storm.tuple.Fields; 
import backtype.storm.tuple.Values; 

public class SpoutBolt extends BaseRichSpout{ 

    SpoutOutputCollector collector;
    /**      * 初始化方法      */     
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {         
	    this.collector = collector;     
	    } 

    /**      * 重復調用方法      */     
    public void nextTuple() {         
	    collector.emit(new Values("hello world this is a test"));     
	    } 

    /**      * 輸出      */     
    public void declareOutputFields(OutputFieldsDeclarer declarer) {         
	    declarer.declare(new Fields("test"));     
	    } 
} 

POM.XML 文件內容

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.test</groupId>
<artifactId>stormwordcount</artifactId>
<version>0.9.6</version>
<packaging>jar</packaging>

<name>stormwordcount</name>
<url>http://maven.apache.org</url>

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>3.8.1</version>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.storm</groupId>
		<artifactId>storm-core</artifactId>
		<version>0.9.6</version>
	</dependency>
</dependencies>
<build>
	<plugins>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
				<archive>
					<manifest>
						<mainClass>com.test.stormwordcount.MainClass</mainClass>
					</manifest>
				</archive>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<configuration>
				<source>1.7</source>
				<target>1.7</target>
			</configuration>
		</plugin>
	</plugins>
</build>

遇到的問題

基於Storm的WordCount需要eclipse安裝了maven插件,之前的大數據實踐安裝的eclipse版本為Eclipse IDE for Eclipse Committers4.5.2,這個版本不自帶maven插件,后續安裝失敗了幾次(網上很多的教程都已經失效),這里分享一下我成功安裝的方法:
使用鏈接下載,Help->Install New SoftWare

點擊Add,name輸入隨意,在location輸入下載eclipse的maven插件,下載地址可以這樣獲取
點擊連接:http://www.eclipse.org/m2e/index.html 進入網站后點擊download,拉到最下面可以看到很多eclipse maven插件的版本和發布時間,選在適合eclipse的版本復制鏈接即可。建議取消選中Contack all update sites during install to find required software(耗時太久)。

但是安裝成功后還是無法配置(這里原因不太清楚,沒找到解決辦法),就直接上官網換成自帶maven插件的JavaEE IDE了...

后續的maven的配置這些都比較順利,第一次創建maven-archetype-quickstat項目報錯,試了網上很多辦法都還沒成功,然后打開 Windows->Preferencs->Maven->Installation發現之前配置了的maven的安裝路徑沒了...重新配置了下就可以創建項目了。

最后運行成功的結果:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM