Setting up a Storm development environment is simpler than Hadoop's (see the earlier post: http://www.cnblogs.com/wuxun1997/p/6849878.html). No Eclipse plugin needs to be installed; just create a Java project and configure the lib references. For a local run you don't even have to start Storm first: Run As->Java Application is all it takes. The details:
1. Create the project: in Eclipse choose File->New->Project->Java Project->Next, enter a project name of your choice (here it is storm), and click Finish;
2. Add the jar packages: right-click the storm project's src directory->Build Path->Configure Build Path->Libraries->Add Library, choose User Library, click Next, then User Libraries->New, and enter a name for the library (storm here)->click Add External JARs and select the lib directory of the Storm installation, D:\apache-storm-1.1.0\lib. For Chinese word segmentation you also need IKAnalyzer2012_FF.jar; its download location is given in the link above as well. Confirm all the dialogs and you can start writing code;
3. The code structure is as follows:
src
|---com.wulinfeng.storm.wordsplit.WordSplit.java
|---IKAnalyzer.cfg.xml
|---myext.dic
|---mystopword.dic
Apart from WordSplit.java, which has to be written from scratch, the other three files need no changes; their contents can be found in the linked post above.
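For quick reference, IKAnalyzer.cfg.xml does little more than register the two dictionary files on the classpath. A minimal sketch of its usual layout is shown below; the exact content used here is in the linked post:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
    <comment>IK Analyzer extension configuration</comment>
    <!-- extension dictionary: one entry (e.g. a person's name) per line -->
    <entry key="ext_dict">myext.dic</entry>
    <!-- extension stopword dictionary -->
    <entry key="ext_stopwords">mystopword.dic</entry>
</properties>

WordSplit.java itself is as follows: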
package com.wulinfeng.storm.wordsplit;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class WordSplit {

    /**
     * Spout: emits the data source (one line of the input file per tuple)
     *
     * @author Administrator
     */
    public static class WordReaderSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        InputStreamReader isr;
        boolean isEnd = false;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            String inputFile = "D:/input/people.txt";
            try {
                isr = new InputStreamReader(new FileInputStream(inputFile));
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
            _collector = collector;
        }

        @Override
        public void nextTuple() {
            // The file only needs to be read once
            if (isEnd) {
                System.out.println("*******Spout is over, no need to emit.*********");
                return;
            }

            // Read the local file and emit one tuple per line
            String line = null;
            try (BufferedReader br = new BufferedReader(isr)) {
                while ((line = br.readLine()) != null) {
                    System.out.printf("line : %s%n", line);
                    _collector.emit(new Values(line));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                isEnd = true; // the file has been read completely
            }
        }

        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt: splits the lines emitted above into words
     *
     * @author Administrator
     */
    public static class SplitWordBolt extends BaseRichBolt {
        private OutputCollector outputCollector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.outputCollector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0); // one line at a time
            IKSegmenter ikSeg = new IKSegmenter(new StringReader(sentence), true); // smart segmentation
            try {
                for (Lexeme lexeme = ikSeg.next(); lexeme != null; lexeme = ikSeg.next()) {
                    outputCollector.emit(new Values(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt: counts the words received above; the counts of the key person
     * names are written to result.txt when the topology shuts down
     *
     * @author Administrator
     */
    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        String out;
        Set<String> keyName = new HashSet<>();

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            out = "D:/out/result.txt";

            // If result.txt already exists, delete it so it can be recreated
            File outFile = new File(out);
            if (outFile.exists()) {
                outFile.delete();
            }

            // Load the dictionary file into a set; only names found in this set
            // are written to result.txt at cleanup time
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(WordSplit.class.getClassLoader().getResourceAsStream("myext.dic")))) {
                String peopleName = null;
                while ((peopleName = br.readLine()) != null) {
                    keyName.add(peopleName);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0); // count one word per tuple
            Integer count = counts.get(word);
            if (count == null) {
                count = 0;
            }
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void cleanup() {
            // At the very end, write the counts of the key person names to result.txt
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(out)))) {
                for (Map.Entry<String, Integer> keyWord : counts.entrySet()) {
                    if (keyName.contains(keyWord.getKey())) {
                        bw.write(keyWord.getKey() + " : " + keyWord.getValue() + "\r\n");
                        bw.flush();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt: writes the running counts to a local file; intermediate data goes
     * into a tmp file
     *
     * @author Administrator
     */
    public static class SaveOutput extends BaseRichBolt {
        String temp;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            temp = "D:/out/tmp" + System.currentTimeMillis();

            // If the tmp file already exists, delete it so it can be recreated
            File tempFile = new File(temp);
            if (tempFile.exists()) {
                tempFile.delete();
            }
        }

        @Override
        public void execute(Tuple input) {
            // Get the accumulated count of a word from the bolt above
            String name = input.getStringByField("word");
            Integer counts = input.getIntegerByField("count");

            // Append the intermediate counts to the tmp file
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(temp, true)))) {
                bw.write(name + " : " + counts + "\r\n");
                bw.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // end of the stream: nothing is emitted
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder(); // build a new topology

        builder.setSpout("spout", new WordReaderSpout(), 1); // set the data source

        // Read the spout's data and split it into words
        builder.setBolt("split", new SplitWordBolt(), 10).shuffleGrouping("spout");

        // Read the split words and count them
        builder.setBolt("count", new WordCountBolt(), 10).fieldsGrouping("split", new Fields("word"));

        // Save the computed results
        builder.setBolt("save", new SaveOutput(), 10).allGrouping("count");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setMaxTaskParallelism(1);

        // With arguments, submit to the cluster; without, run locally
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-split", conf, builder.createTopology());
            Thread.sleep(300000); // stop automatically after 5 minutes
            cluster.shutdown();
        }
    }
}
The Java file above can be run by simply right-clicking it and choosing Run As->Java Application. Because the data is processed as a stream it takes a while; here it is set to stop automatically after 5 minutes. While it runs you can watch data being continuously appended to D:\out\tmpXXX (the tmp file written by SaveOutput), and after it finishes you can open D:\out\result.txt to see how often the main characters show up. To run on a cluster, start ZooKeeper and Storm first, package the code above together with the referenced lib jars into a jar, and submit it from the command line with storm jar; the run can be monitored in the Storm UI at localhost:8080 (the default UI port).
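For the cluster run, the command sequence might look roughly like the sketch below (Windows style, matching the paths used above; the jar name storm-wordsplit.jar is only illustrative, and the final argument ends up in args[0] as the topology name):

rem start ZooKeeper first (from the ZooKeeper bin directory)
zkServer.cmd

rem start the Storm daemons, each in its own console
storm nimbus
storm supervisor
storm ui

rem submit the packaged topology; "word-split" becomes args[0] and is used as the topology name
storm jar storm-wordsplit.jar com.wulinfeng.storm.wordsplit.WordSplit word-split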