Configuring a Storm 1.1.0 development environment in Eclipse and running it locally


  Setting up a Storm development environment is simpler than Hadoop's (see the earlier post http://www.cnblogs.com/wuxun1997/p/6849878.html): no Eclipse plugin is needed, just a new Java project with the lib jars on its build path. Running locally doesn't require starting Storm first either; a plain Run As -> Java Application does the job. The details:

  1. Create the project: in Eclipse click File -> New -> Project -> Java Project -> Next, enter a project name of your choice (here, storm), and click Finish;

  2. Add the jars: right-click the storm project's src directory -> Build Path -> Configure Build Path -> Libraries -> Add Library, choose User Library, click Next, click User Libraries -> New, enter a name for the library (here, storm) -> Add External JARs, and select the jars under the Storm install's lib directory: D:\apache-storm-1.1.0\lib. For Chinese word segmentation you also need IKAnalyzer2012_FF.jar; its download location is likewise in the post linked above. Click OK all the way out and you can start writing code;

  3. The code is laid out as follows:

src
 |---com.wulinfeng.storm.wordsplit.WordSplit.java
 |---IKAnalyzer.cfg.xml
 |---myext.dic
 |---mystopword.dic

  Only WordSplit.java is new; the other three files need no changes, and their contents are in the post linked above.
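  Before writing the topology, it may be worth a quick standalone check that the IK Analyzer jar and the two dictionary files are actually picked up from the classpath. The sketch below uses the same IKSegmenter API as the split bolt further down; the class name and the sample text are placeholders of mine, not part of the project:

package com.wulinfeng.storm.wordsplit;

import java.io.IOException;
import java.io.StringReader;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class IkSmokeTest {
    public static void main(String[] args) throws IOException {
        // Smart segmentation over a reader, exactly as the split bolt does below.
        // Swap the sample text for a line from your own D:/input/people.txt.
        IKSegmenter seg = new IKSegmenter(new StringReader("sample text to segment"), true);
        for (Lexeme lexeme = seg.next(); lexeme != null; lexeme = seg.next()) {
            System.out.println(lexeme.getLexemeText());
        }
    }
}

  If names from myext.dic come back as single tokens, the dictionaries are loading correctly. Now the topology itself: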

package com.wulinfeng.storm.wordsplit;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

public class WordSplit {

    /**
     * Spout: reads the input file and emits one line per tuple.
     * 
     * @author Administrator
     *
     */
    public static class WordReaderSpout extends BaseRichSpout {

        SpoutOutputCollector _collector;
        InputStreamReader isr;
        boolean isEnd = false;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            String inputFile = "D:/input/people.txt";
            try {
                isr = new InputStreamReader(new FileInputStream(inputFile));
            } catch (FileNotFoundException e) {
                // input file missing: isr stays null and nextTuple() will fail
                e.printStackTrace();
            }
            _collector = collector;
        }

        @Override
        public void nextTuple() {
            // The file is read in full on the first call; after that there is nothing to emit
            if (isEnd) {
                System.out.println("*******Spout is over, no need to emit.*********");
                return;
            }

            // Read the local file and emit one tuple per line.
            // Note: draining the whole file inside a single nextTuple() call is fine for
            // this demo, but an idiomatic spout emits one tuple per nextTuple() invocation.
            String line = null;
            try (BufferedReader br = new BufferedReader(isr)) {
                while ((line = br.readLine()) != null) {
                    System.out.printf("line : %s%n", line);
                    _collector.emit(new Values(line));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                isEnd = true; // done reading the file
            }

        }

        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // the field actually carries a whole line; the split bolt turns it into words
            declarer.declare(new Fields("word"));
        }

    }

    /**
     * Bolt: splits each line emitted by the spout into words with IK Analyzer.
     * 
     * @author Administrator
     *
     */
    public static class SplitWordBolt extends BaseRichBolt {

        private OutputCollector outputCollector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.outputCollector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0); // one line per tuple
            IKSegmenter ikSeg = new IKSegmenter(new StringReader(sentence), true); // smart segmentation mode
            try {
                for (Lexeme lexeme = ikSeg.next(); lexeme != null; lexeme = ikSeg.next()) {
                    outputCollector.emit(new Values(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            outputCollector.ack(tuple); // a no-op here (the spout emits unanchored tuples), but acking is idiomatic for BaseRichBolt
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

    }

    /**
     * Bolt: counts the words from the splitter; on shutdown, the counts for the
     * key person names are written to result.txt.
     * 
     * @author Administrator
     *
     */
    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        String out;
        Set<String> keyName = new HashSet<>();

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            out = "D:/out/result.txt";

            // If result.txt already exists, delete it so a fresh one gets written
            File outFile = new File(out);
            if (outFile.exists()) {
                outFile.delete();
            }

            // Load the dictionary file into a set; only names in it make it into result.txt
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(WordSplit.class.getClassLoader().getResourceAsStream("myext.dic")))) {
                String peopleName = null;
                while ((peopleName = br.readLine()) != null) {
                    keyName.add(peopleName);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0); // one word per tuple
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void cleanup() {
            // On shutdown, write the counts of the key person names to result.txt
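            // Note: Storm guarantees cleanup() only in local mode; on a cluster the
            // worker process may be killed without cleanup() ever running.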
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(out)))) {
                for (Map.Entry<String, Integer> keyWord : counts.entrySet()) {
                    if (keyName.contains(keyWord.getKey())) {
                        bw.write(keyWord.getKey() + " : " + keyWord.getValue() + "\r\n");
                        bw.flush();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt: appends the running word counts to a local tmp file.
     * 
     * @author Administrator
     *
     */
    public static class SaveOutput extends BaseRichBolt {
        String temp;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            temp = "D:/out/tmp" + System.currentTimeMillis();

            // If the tmp file already exists, delete it so a fresh one gets written
            File tempFile = new File(temp);
            if (tempFile.exists()) {
                tempFile.delete();
            }
        }

        @Override
        public void execute(Tuple input) {
            // Take the word and its running count from the count bolt
            String name = input.getStringByField("word");
            Integer counts = input.getIntegerByField("count");

            // Append this intermediate count to the tmp file
            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(temp, true)))) {
                bw.write(name + " : " + counts + "\r\n");
                bw.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: declares no output fields

        }
    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder(); // build a new topology

        builder.setSpout("spout", new WordReaderSpout(), 1); // the data source

        // feed the spout's lines into the split bolt
        builder.setBolt("split", new SplitWordBolt(), 10).shuffleGrouping("spout");

        // count the split words; fieldsGrouping routes each word to a single task
        builder.setBolt("count", new WordCountBolt(), 10).fieldsGrouping("split", new Fields("word"));

        // persist the running results
        builder.setBolt("save", new SaveOutput(), 10).allGrouping("count");

        Config conf = new Config();
        conf.setDebug(true);

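        // Cap every component at a single task for this local run; the parallelism
        // hints of 10 above are effectively reduced to 1.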
        conf.setMaxTaskParallelism(1);

        // with arguments, submit to the cluster; otherwise run locally
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-split", conf, builder.createTopology());
            Thread.sleep(300000); // let it run for 5 minutes, then shut down
            cluster.shutdown();
        }
    }

}

  Right-click the Java file above, choose Run As -> Java Application, and it runs. Because the data flows as a stream it takes a while, so the topology is set to stop itself after 5 minutes. While it runs you can watch D:\out\tmpXXX.txt being appended continuously; once it finishes, open D:\out\result.txt to see how often the main characters appear. To run on a cluster instead, start zookeeper and storm first, package the code above together with the referenced lib jars into a jar, run storm jar on the command line, and check the topology's status at localhost:8088.
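  For the cluster run, the submit command looks roughly like the line below; the jar name is only an assumed example, and the last argument becomes the topology name (main() passes args[0] to StormSubmitter):

storm jar storm-wordsplit.jar com.wulinfeng.storm.wordsplit.WordSplit word-split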

