一、需求:統計網站訪問量(實時統計)
技術選型:需求的特點是數據量大、需要做計算、要求實時,因此選用實時流式計算框架 Storm。拓撲分為三步:1)spout:數據源,從本地文件接入數據;2)splitbolt:業務邏輯處理,按 Tab 切分每行數據,取出所需字段(代碼中取的是第二個字段 session_id),並做局部累加;3)bolt:對各線程的累加次數做全局求和。
1、PvCountSpout類
package com.demo.pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Spout that reads a local log file line by line and emits each line as a
 * single-field tuple ({@code "logs"}).
 *
 * <p>Tuples are emitted without a message id, so {@link #ack(Object)} and
 * {@link #fail(Object)} are intentionally no-ops.
 */
public class PvCountSpout implements IRichSpout {

    /** Path of the log file that feeds the topology. */
    private static final String LOG_FILE = "e:/weblog.log";

    /** Pause after each emitted line, in milliseconds, to throttle the stream. */
    private static final long EMIT_DELAY_MS = 500;

    private SpoutOutputCollector collector;
    private BufferedReader br;

    /**
     * Emits at most one log line per invocation.
     *
     * <p>The method returns after a single line instead of draining the file
     * in a loop, so the calling thread is not held for the whole file. When
     * the reader is at end-of-file nothing is emitted; a later call will pick
     * up lines appended to the file in the meantime.
     */
    @Override
    public void nextTuple() {
        if (br == null) {
            // Reader already closed (or never opened): nothing to emit.
            return;
        }
        try {
            String line = br.readLine();
            if (line == null) {
                return;
            }
            // Send the raw line downstream to the split bolt.
            collector.emit(new Values(line));
            Thread.sleep(EMIT_DELAY_MS);
        } catch (IOException e) {
            // Best effort: report the read failure and let the next call retry.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the collector and opens the log file for reading.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit tuples
     * @throws IllegalStateException if the log file does not exist; failing
     *         here replaces the NullPointerException that would otherwise
     *         occur on the first {@link #nextTuple()} call
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(LOG_FILE)));
        } catch (FileNotFoundException e) {
            throw new IllegalStateException("Cannot open log file " + LOG_FILE, e);
        }
    }

    /** Declares the single output field {@code "logs"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("logs"));
    }

    /** Callback for a successfully processed tuple; unused because tuples carry no message id. */
    @Override
    public void ack(Object msgId) {
    }

    /** Called when the spout is activated out of a deactivated state. */
    @Override
    public void activate() {
    }

    /**
     * Called before the spout shuts down; releases the file reader.
     * The original author notes this callback is not guaranteed to run
     * (for example on {@code kill -9} or {@code storm kill}).
     */
    @Override
    public void close() {
        if (br == null) {
            return;
        }
        try {
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            br = null;
        }
    }

    /** Called when the spout is deactivated; nextTuple is not called while deactivated. */
    @Override
    public void deactivate() {
    }

    /** Callback for a failed tuple; unused because tuples carry no message id. */
    @Override
    public void fail(Object msgId) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
2、PvCountSplitBolt類
package com.demo.pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that splits each incoming log line on tabs and keeps a local running
 * count of lines that carry a session id field.
 *
 * <p>For every counted line it emits {@code (threadid, pvnum)}: the id of the
 * executing thread and this instance's running count.
 */
public class PvCountSplitBolt implements IRichBolt {

    /** Zero-based position of the session id within a tab-separated log line. */
    private static final int SESSION_ID_INDEX = 1;

    private OutputCollector collector;

    /** Number of lines counted so far by this bolt instance. */
    private int pvnum = 0;

    /** Called when the bolt is about to shut down; not guaranteed to run. Nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Counts one page view per well-formed log line and emits the running total.
     *
     * <p>Lines with no session id field (fewer than two tab-separated fields,
     * e.g. a blank line) are skipped instead of raising
     * {@code ArrayIndexOutOfBoundsException}. Every input tuple is acked.
     *
     * @param input tuple with a string field named {@code "logs"}
     */
    @Override
    public void execute(Tuple input) {
        // 1. Read the raw log line.
        String line = input.getStringByField("logs");

        // 2. Split it into tab-separated fields.
        String[] fields = line.split("\t");

        // 3. Local accumulation, only when the session id field is present.
        if (fields.length > SESSION_ID_INDEX) {
            pvnum++;
            collector.emit(new Values(Thread.currentThread().getId(), pvnum));
        }

        // An IRichBolt must ack (or fail) every tuple it receives.
        collector.ack(input);
    }

    /** Stores the collector used for emitting and acking. */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /** Declares the output fields {@code "threadid"} and {@code "pvnum"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadid", "pvnum"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
3、PvCountSumBolt類
package com.demo.pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Terminal bolt that combines the per-thread running counts into a global
 * total and prints it to standard error.
 *
 * <p>It keeps the latest {@code pvnum} seen for each {@code threadid}; the
 * global total is the sum of those latest values.
 */
public class PvCountSumBolt implements IRichBolt {

    /** Latest running count reported for each upstream thread id. */
    private Map<Long, Integer> hashMap = new HashMap<>();

    private OutputCollector collector;

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /**
     * Records the newest count for the reporting thread, then prints the sum
     * of the newest counts of all threads. The input tuple is acked.
     *
     * @param input tuple with a Long field {@code "threadid"} and an Integer
     *              field {@code "pvnum"}
     */
    @Override
    public void execute(Tuple input) {
        // 1. Read the per-thread running count.
        Long threadid = input.getLongByField("threadid");
        Integer pvnum = input.getIntegerByField("pvnum");

        // 2. Overwrite the previous count for this thread with the newest one.
        hashMap.put(threadid, pvnum);

        // 3. Sum the newest count of every thread.
        int sumnum = 0;
        for (int count : hashMap.values()) {
            sumnum += count;
        }
        System.err.println(Thread.currentThread().getName() + "總訪問量為->" + sumnum);

        // An IRichBolt must ack (or fail) every tuple it receives.
        collector.ack(input);
    }

    /** Stores the collector so that input tuples can be acked. */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /** This bolt emits nothing, so no output fields are declared. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
4、PvCountDriver類
package com.demo.pvcount; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class PvCountDriver { public static void main(String[] args) { // 1.hadoop->Job storm->topology 創建拓撲 TopologyBuilder builder = new TopologyBuilder(); // 2.指定設置 builder.setSpout("PvCountSpout", new PvCountSpout(), 1); builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4) .fieldsGrouping("PvCountSpout", new Fields("logs")); builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1).fieldsGrouping("PvCountSplitBolt", new Fields("pvnum")); // 3.創建配置信息 Config conf = new Config(); conf.setNumWorkers(2); // 4.提交任務 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("pvcounttopology", conf, builder.createTopology()); } }
5、PvCountDriver_Shuffle類
package com.demo.pvcount; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.topology.TopologyBuilder; public class PvCountDriver_Shuffle { public static void main(String[] args) { // 1.hadoop->Job storm->topology 創建拓撲 TopologyBuilder builder = new TopologyBuilder(); // 2.指定設置 builder.setSpout("PvCountSpout", new PvCountSpout(), 1); builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4) .shuffleGrouping("PvCountSpout"); builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 2).shuffleGrouping("PvCountSplitBolt"); // 3.創建配置信息 Config conf = new Config(); conf.setNumWorkers(2); // 4.提交任務 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("pvcounttopology", conf, builder.createTopology()); } }
6、weblog.log文件(每行一條記錄,字段之間以 Tab 分隔,依次為:域名、session_id、訪問時間;以下為節選)
storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 10:40:49 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 08:40:50 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 10:40:49 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 09:40:49 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 10:40:49 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 12:40:49 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 08:40:52 storm.apache.org CCYH6Y4V4SCVXTG6DPB4VH9U123 2018-08-07 08:40:50 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 09:40:49...
...
... storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:53 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 12:40:49 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:51 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 10:40:49 storm.apache.org HUNTERH6YCGFJYERTT834R52FDXV9U34 2018-08-07 08:40:53 storm.apache.org BBYH61456DEL89RG5VV9UYU7 2018-08-07 08:40:50 storm.apache.org EEH6Y21245GHI899OFG4V9U567 2018-08-07 08:40:53 storm.apache.org VVVYH6Y4V4SFXZWWEQRQWEQ 2018-08-07 10:40:49
7、運行(4)中的main方法,控制台顯示如下圖:
此時在weblog.log文件中增加幾條數據,則總訪問量相應增加幾條。
至此,簡單實現了網站訪問量實時統計。