使用storm分別進行計數和詞頻統計


計數

直接上代碼

public class LocalStormSumTopology {

    public static void main(String[] agrs) {

        //Topology是通過build模式創建出來的
        //storm中的所有作業都是通過topology來指定的
        TopologyBuilder builder = new TopologyBuilder();

        //在設置bolt到topology時,需要設置該bolt的上游的spout或者bolt的id,這樣topology才知道該bolt的執行順序,有點類似於單向鏈表結構,
        //每一個環節持有上一個環節的引用,在bolt這里是持有上一個環節的id,這樣同樣可以定位到上一個環節
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("TotalBolt", new TotalBolt()).shuffleGrouping("DataSourceSpout");


        //啟動一個本地的Storm集群,不需要搭真正的集群,本地集群使用LocalCluster來提交topology,如果是在生產環境上提交topology,那么使用
        //這個類StormSubmitter來代替LocalCluster來提交topology
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalStormSumTopology", new Config(), builder.createTopology());
    }


    private static final String NUM = "num";

    /**
     * 發送數據源的spout類,一般是繼承BaseRichSpout這個類
     */
    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector mCollector;

        int num;

        /**
         * 在storm開始的開始工作前回調一次,在這里做初始化
         *
         * @param conf      配置參數
         * @param context   上下文
         * @param collector 數據發射器,用來將數據發送到bolt中,類似於rxjava的數據發射器
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.mCollector = collector;
        }


        /**
         * 這是一個死循環方法,會自動循環調用,這個方法用來發送數據到下游
         */
        public void nextTuple() {

            //將數據發射到bolt中,一般使用Values這個類,傳入的是可變參數,底層封裝成ArrayList
            mCollector.emit(new Values(++num));

            System.out.println("從spout發射出的數據:" + num);

            Utils.sleep(1000);
        }

        /**
         * 聲明從spout中發射的數據的字段名,在bolt階段可以通過這里預設置的字段名進行取值,類似於安卓中的使用sp傳輸,
         * 字段名和發送出來的數據一一對應,這樣如果下游需要接收多個數據發射源,那么可以通過該字段名來做區別
         *
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
            //一般使用Fields來進行封裝字段名fields底層封裝了ArrayList<String>
            declarer.declare(new Fields(NUM));
        }

        @Override
        public void close() {
            this.mCollector = null;
        }
    }

    public static class TotalBolt extends BaseRichBolt {

        private int sum = 0;

        /**
         * 初始化方法,跟spout中的open方法類似,只會調用一次,在這里做初始化
         *
         * @param stormConf
         * @param context
         * @param collector
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        /**
         * 每從上游接收到一個數據,就調用該方法回調過來
         *
         * @param input 用來提取上一個流程傳過來的數據
         */
        public void execute(Tuple input) {
            
            //通過在上游設置的字段名來獲取數據
            Integer integerByField = input.getIntegerByField(NUM);
            sum += integerByField;
            System.out.println("累加的結果是:" + sum);
        }

        /**
         * 為往下游發送的數據加上字段名,方面區別數據的來源
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }

    }
}

詞頻統計

直接上代碼

public class LocalWorldCountStormTopology {


    public static void main(String[] agrs) {

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("DataSourceSpout");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWorldCountStormTopology", new Config(), builder.createTopology());
    }

    /**
     * 輸出每一行文本的spout
     */

    public static class DataSourceSpout extends BaseRichSpout {

        private SpoutOutputCollector mCollector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.mCollector = collector;
        }

        public void nextTuple() {

            //通過這個方法,可以獲取到某一個文件夾下所有符合規定后綴的文件,並且可以設置是否遞歸獲取
            Collection<File> files = FileUtils.listFiles(new File("/Users/teng/Downloads"), new String[]{"txt"}, true);

            try {
                for (File file : files) {
                    
                    //因為下一步還需要做切割,因此需要先將文件一行一行取出來,放在String集合中
                    List<String> lines = FileUtils.readLines(file);
                    for (String line : lines) {
                        //使用,進行分割
                        String[] split = line.split(",");
                        //發射單詞出去
                        for (String s : split) {
                            mCollector.emit(new Values(s));
                        }
                    }
                //執行完成一次之后,需要修改文件名,這樣就不用一直執行
                FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            //定義數據的字段名
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * 統計詞頻的bolt
     */
    public static class CountBolt extends BaseRichBolt {

        private Map<String, Integer> map = new HashMap<String, Integer>();

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        public void execute(Tuple input) {
            String word = input.getStringByField("word");

            Integer num = map.get(word);
            if (num == null) {
                num = 1;
            } else {
                num++;
            }

            map.put(word, num);

            System.out.println("~~~~~~~~~");
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> entry : entries) {
                System.out.println(entry.getKey() + "出現的次數為:" + entry.getValue());
            }

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM