網站訪問量實時統計


一、需求:統計網站訪問量(實時統計)

技術選型依據:數據量大、需要做計算、要求實時

因此選用實時流式計算框架:Storm

1)spout
數據源:逐行讀取本地日誌文件(e:/weblog.log)並發送給下游
本地文件

2)splitbolt
業務邏輯處理
按制表符切分每行數據
取出 session_id(第二列),在每個任務內局部累加訪問次數

3)sumbolt
彙總各任務的局部計數,全局累加求和得到總訪問量

1、PvCountSpout類

package com.demo.pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Spout that reads a local log file line by line and emits each line as a one-field tuple
 * named {@code "logs"}.
 *
 * <p>{@link #nextTuple()} emits at most one line per call and then returns, instead of looping
 * over the whole file inside a single call. After end of file the same reader is tried again on
 * later calls, so lines appended to the file afterwards are still emitted.
 */
public class PvCountSpout implements IRichSpout {

    /** File read when the no-argument constructor is used (the path this demo has always used). */
    private static final String DEFAULT_LOG_PATH = "e:/weblog.log";
    /** Pause after each emitted line so the demo output is paced (same 500 ms as before). */
    private static final long EMIT_DELAY_MS = 500;
    /** Short pause when there is nothing to emit, to avoid spinning on an exhausted reader. */
    private static final long IDLE_SLEEP_MS = 50;

    private final String logPath;
    private SpoutOutputCollector collector;
    /** Opened in open(); never serialized together with the spout instance. */
    private transient BufferedReader br;

    /** Creates a spout that reads {@code e:/weblog.log}. */
    public PvCountSpout() {
        this(DEFAULT_LOG_PATH);
    }

    /**
     * Creates a spout that reads the given file.
     *
     * @param logPath path of the log file to read
     * @throws NullPointerException if {@code logPath} is null
     */
    public PvCountSpout(String logPath) {
        this.logPath = java.util.Objects.requireNonNull(logPath, "logPath");
    }

    /** Emits the next line of the file, if one is available, then returns. */
    @Override
    public void nextTuple() {
        if (br == null) {
            // open() could not open the file (the error was printed there); emit nothing.
            sleepQuietly(IDLE_SLEEP_MS);
            return;
        }
        String line;
        try {
            line = br.readLine();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        if (line == null) {
            // End of file for now; a later call reads again and picks up appended lines.
            sleepQuietly(IDLE_SLEEP_MS);
            return;
        }
        // Send the raw line to the split bolt.
        collector.emit(new Values(line));
        // Throttle the demo output.
        sleepQuietly(EMIT_DELAY_MS);
    }

    /**
     * Stores the collector and opens the log file as UTF-8 text.
     * If the file is missing the error is printed and the spout simply emits nothing.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            br = new BufferedReader(new InputStreamReader(
                    new FileInputStream(logPath), java.nio.charset.StandardCharsets.UTF_8));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Declares the single output field {@code "logs"} carrying one raw log line. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("logs"));
    }

    /** Callback for a successfully processed tuple; nothing to do here. */
    @Override
    public void ack(Object msgId) {
    }

    /** Called when the spout is re-activated after having been deactivated; nothing to do. */
    @Override
    public void activate() {
    }

    /**
     * Called before the spout shuts down; releases the file handle.
     * Not guaranteed to be called (for example when the process is killed with kill -9).
     */
    @Override
    public void close() {
        if (br == null) {
            return;
        }
        try {
            br.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            br = null;
        }
    }

    /** While the spout is deactivated, nextTuple() is not called; nothing to do here. */
    @Override
    public void deactivate() {
    }

    /** Callback for a failed tuple; nothing to do here. */
    @Override
    public void fail(Object msgId) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Sleeps for the given time, restoring the interrupt flag instead of swallowing it. */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

2、PvCountSplitBolt類

package com.demo.pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that parses tab-separated log lines and keeps a running page-view count per task.
 *
 * <p>A line counts as one page view when it has a non-empty second column (the session id).
 * After each counted line the bolt emits {@code (threadid, pvnum)}, where {@code pvnum} is this
 * task's running total. Malformed lines (no second column) are skipped instead of throwing.
 */
public class PvCountSplitBolt implements IRichBolt {

    private OutputCollector collector;

    /**
     * Key identifying this counter downstream. It is still emitted in the {@code "threadid"}
     * field (as a Long) for compatibility, but holds the Storm task id: each task owns exactly
     * one {@code pvnum}, and task ids stay unique across workers, unlike JVM thread ids.
     */
    private long counterKey;

    /** Running number of page views counted by this task. */
    private int pvnum = 0;

    /** Called once at startup: keeps the collector and records this task's id. */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counterKey = context.getThisTaskId();
    }

    /** Counts one page view per well-formed line and emits the updated running total. */
    @Override
    public void execute(Tuple input) {
        // 1. Get the raw log line.
        String line = input.getStringByField("logs");

        // 2. Split into columns: url, session_id, timestamp.
        String[] fields = line.split("\t");

        // 3. Local (per-task) accumulation; skip lines that have no session id column.
        if (fields.length > 1 && !fields[1].isEmpty()) {
            pvnum++;
            collector.emit(new Values(counterKey, pvnum));
        }

        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    /** Called when the bolt is about to shut down (not guaranteed); nothing to clean up. */
    @Override
    public void cleanup() {
    }

    /** Declares the output fields: counter key and that counter's running total. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadid", "pvnum"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

3、PvCountSumBolt類

package com.demo.pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Final aggregation bolt: combines the running per-counter totals emitted upstream into one
 * page-view total and prints it to stderr.
 *
 * <p>Each upstream counter reports a running total, so only the latest value per key is kept,
 * and the total is the sum of those latest values. The printed number is a true global total
 * only if every partial count is routed to this same task.
 */
public class PvCountSumBolt implements IRichBolt {

    /** Latest running count reported per upstream counter, keyed by the "threadid" field. */
    private final Map<Long, Integer> hashMap = new HashMap<>();
    private OutputCollector collector;

    /** Keeps the collector so processed tuples can be acked. */
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /** Records the latest count for the sending counter and prints the new overall total. */
    @Override
    public void execute(Tuple input) {
        // 1. Read the counter key and its running total.
        Long threadid = input.getLongByField("threadid");
        Integer pvnum = input.getIntegerByField("pvnum");

        // 2. Replace that counter's previous value with the latest one.
        hashMap.put(threadid, pvnum);

        // 3. Recompute the total from scratch; long avoids int overflow when adding counters.
        long sumnum = 0;
        for (Integer count : hashMap.values()) {
            sumnum += count;
        }

        System.err.println(Thread.currentThread().getName() + "總訪問量為->" + sumnum);

        // IRichBolt does not ack automatically.
        collector.ack(input);
    }

    /** Called when the bolt is about to shut down (not guaranteed); nothing to clean up. */
    @Override
    public void cleanup() {
    }

    /** This bolt is a sink and declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4、PvCountDriver類

package com.demo.pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Local-mode entry point for the page-view topology:
 * PvCountSpout -> PvCountSplitBolt (per-task partial counts) -> PvCountSumBolt (global total).
 */
public class PvCountDriver {
    public static void main(String[] args) {
        // 1. Create the topology (Storm's counterpart of a Hadoop Job).
        TopologyBuilder builder = new TopologyBuilder();

        // 2. Wire the components.
        builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
        // Executors can never exceed tasks, so ask for 4 executors running the 4 tasks.
        builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 4).setNumTasks(4)
                .fieldsGrouping("PvCountSpout", new Fields("logs"));
        // A global sum needs every partial count at the same task, so use globalGrouping
        // rather than partitioning the partial counts by their pvnum value.
        builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1)
                .globalGrouping("PvCountSplitBolt");

        // 3. Configuration.
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 4. Submit to an in-process local cluster.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
    }
}

5、PvCountDriver_Shuffle類

package com.demo.pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Local-mode variant of the page-view topology that distributes log lines to the split bolts
 * with shuffle grouping. The final sum still needs all partial counts at one task, so that
 * edge uses global grouping into a single sum task.
 */
public class PvCountDriver_Shuffle {
    public static void main(String[] args) {
        // 1. Create the topology (Storm's counterpart of a Hadoop Job).
        TopologyBuilder builder = new TopologyBuilder();

        // 2. Wire the components.
        builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
        // Executors can never exceed tasks, so ask for 4 executors running the 4 tasks.
        builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 4).setNumTasks(4)
                .shuffleGrouping("PvCountSpout");
        // With two sum tasks fed by shuffle grouping, each would print only a partial,
        // inconsistent "total"; a single task receiving every partial count prints the real one.
        builder.setBolt("PvCountSumBolt", new PvCountSumBolt(), 1)
                .globalGrouping("PvCountSplitBolt");

        // 3. Configuration.
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 4. Submit to an in-process local cluster.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("pvcounttopology", conf, builder.createTopology());
    }
}

6、weblog.log文件

storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 10:40:49
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:50
storm.apache.org    BBYH61456DEL89RG5VV9UYU7    2018-08-07 10:40:49
storm.apache.org    EEH6Y21245GHI899OFG4V9U567    2018-08-07 09:40:49
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 10:40:49
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 12:40:49
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 08:40:52
storm.apache.org    CCYH6Y4V4SCVXTG6DPB4VH9U123    2018-08-07 08:40:50
storm.apache.org    VVVYH6Y4V4SFXZWWEQRQWEQ    2018-08-07 09:40:49...
...
...
storm.apache.org	EEH6Y21245GHI899OFG4V9U567	2018-08-07 08:40:53
storm.apache.org	BBYH61456DEL89RG5VV9UYU7	2018-08-07 12:40:49
storm.apache.org	EEH6Y21245GHI899OFG4V9U567	2018-08-07 08:40:51
storm.apache.org	EEH6Y21245GHI899OFG4V9U567	2018-08-07 10:40:49
storm.apache.org	HUNTERH6YCGFJYERTT834R52FDXV9U34	2018-08-07 08:40:53
storm.apache.org	BBYH61456DEL89RG5VV9UYU7	2018-08-07 08:40:50
storm.apache.org	EEH6Y21245GHI899OFG4V9U567	2018-08-07 08:40:53
storm.apache.org	VVVYH6Y4V4SFXZWWEQRQWEQ	2018-08-07 10:40:49

7、運行(4)中的 main 方法,控制台會持續打印當前的總訪問量(原文此處為截圖,未能保留)。

此時向 weblog.log 文件末尾追加幾條數據,總訪問量會相應增加同樣的條數。

至此,簡單實現了網站訪問量實時統計。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM