Big Data Notes (22): Storm, a Real-Time Big Data Computation Framework


I. Real-Time Computation Overview

1. Comparison: offline computation vs. real-time computation
Offline computation: MapReduce, batch processing (Sqoop --> HDFS --> MapReduce --> HDFS)
Real-time computation: Storm and Spark Streaming, processing data as it arrives (Flume ---> Kafka ---> stream processing ---> Redis)

2. Common real-time (stream) computation frameworks
(1) Apache Storm
(2) Spark Streaming
(3) Apache Flink: supports both stream processing and offline (batch) processing

II. Storm Architecture

Storm follows a master/slave architecture: a Nimbus master node distributes and monitors work, Supervisor nodes run the Worker processes that execute the tasks, and ZooKeeper coordinates the cluster state between them.

III. Installing and Configuring Apache Storm
1. Prerequisite: install ZooKeeper (the same ZooKeeper used for Hadoop HA)

tar -zxvf apache-storm-1.0.3.tar.gz -C ~/training/
Set the environment variables:

STORM_HOME=/root/training/apache-storm-1.0.3
export STORM_HOME

PATH=$STORM_HOME/bin:$PATH
export PATH


Configuration file: conf/storm.yaml
Note: in storm.yaml there must be a space after every "-"
and a space after every ":".

2. Storm pseudo-distributed mode (bigdata11)

ZooKeeper servers:
storm.zookeeper.servers:
     - "bigdata11"

Master node (Nimbus) information:
nimbus.seeds: ["bigdata11"]

Worker slots on each slave node:
supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703

Directory where submitted topologies are stored:
storm.local.dir: "/root/training/apache-storm-1.0.3/tmp"

Start Storm on bigdata11:
Master node: storm nimbus &
Slave node:  storm supervisor &
Web UI:      storm ui & ---> http://ip:8080
Log viewer:  storm logviewer &
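
To confirm the daemons are up, check the Java processes with jps. A rough sketch of the expected output (the process names can differ slightly between Storm versions, so treat this only as a guide):

jps
# typical processes after starting everything on bigdata11:
# nimbus
# supervisor
# core             <- the storm ui process
# logviewer
# QuorumPeerMain   <- ZooKeeper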

3. Storm fully distributed mode (bigdata12, bigdata13, bigdata14)
(*) Configure on bigdata12
storm.zookeeper.servers:
- "bigdata12"
- "bigdata13"
- "bigdata14"

nimbus.seeds: ["bigdata12"]
storm.local.dir: "/root/training/apache-storm-1.0.3/tmp"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

(*) Copy the installation to the other nodes
scp -r apache-storm-1.0.3/ root@bigdata13:/root/training
scp -r apache-storm-1.0.3/ root@bigdata14:/root/training


(*) Start
bigdata12: storm nimbus &
storm ui &
storm logviewer &

bigdata13: storm supervisor &
storm logviewer &

bigdata14: storm supervisor &
storm logviewer &


4. Storm HA (bigdata12, bigdata13, bigdata14)
Modify on every machine:
nimbus.seeds: ["bigdata12", "bigdata13"]

On bigdata13, start an additional nimbus ----> it runs as "Not a Leader" (standby)
A separate UI instance can also be started on bigdata13

 

IV. WordCount Data Flow

Data flows through the topology as: WordCountSpout (emits whole sentences) ---> WordCountSplitBolt (splits each sentence into words; connected with shuffleGrouping) ---> WordCountTotalBolt (keeps a running count per word; connected with fieldsGrouping on "word").

Java implementation:
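
The classes below assume a Maven project. A minimal sketch of the dependencies they use (the coordinates are the standard Apache artifacts; the HBase client version is an assumption and should match your cluster):

<dependencies>
    <!-- Storm core API; use scope "provided" when submitting to a real cluster -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.3</version>
    </dependency>
    <!-- Used by the optional Redis and HDFS bolts in WordCountTopology -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>1.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>1.0.3</version>
    </dependency>
    <!-- Used by WordCountHBaseBolt; version is an assumption, match your HBase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.4</version>
    </dependency>
</dependencies>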

WordCountSpout.java

package demo;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout component: simulates collecting data and sends it to the next Bolt component.
 *
 */
public class WordCountSpout extends BaseRichSpout{

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    //Sample sentences simulating data collected from an external source
    private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};
    
    private SpoutOutputCollector collector;
    
    @Override
    public void nextTuple() {
        // Called repeatedly by the Storm framework to receive data produced by an external system.
        // Collect once every three seconds.
        Utils.sleep(3000);
        
        // Pick a random sentence to represent the collected data
        int random = new Random().nextInt(3); // random int in [0, 3)
        
        // Send the collected data to the next component
        System.out.println("Collected data: " + data[random]);
        this.collector.emit(new Values(data[random]));
    }

    /* 
     * SpoutOutputCollector is the spout's output stream.
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        // Initialization method of the spout component: save the output collector
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output schema: one field named "sentence"
        declarer.declare(new Fields("sentence"));
    }

}

WordCountSplitBolt.java

package demo;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * First Bolt component: splits each sentence into words.
 *
 */
public class WordCountSplitBolt extends BaseRichBolt{

    private OutputCollector collector;
    @Override
    public void execute(Tuple tuple) {
        //Process the data sent by the previous component
        //Get the sentence
        String line = tuple.getStringByField("sentence");
        //Split into words
        String[] words = line.split(" ");
        
        //Emit each word with an initial count of 1
        for (String word : words) {
            this.collector.emit(new Values(word,1));
        }
    }

    //OutputCollector: the output stream of the bolt component
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // Initialize the bolt component: save the output collector
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output schema: a word and its count
        declarer.declare(new Fields("word","count"));
    }

}

WordCountTotalBolt.java

package demo;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Second Bolt component: counts the occurrences of each word.
 *
 */
public class WordCountTotalBolt extends BaseRichBolt{

    private OutputCollector collector;
    
    private Map<String, Integer> result = new HashMap<>();
    
    @Override
    public void execute(Tuple tuple) {
        //Get the data: the word and its frequency (always 1 from the split bolt)
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        
        if (result.containsKey(word)) {
            //The word already exists: add to its running total
            int total = result.get(word);
            result.put(word, total + count);
        }else {
            //First occurrence of this word
            result.put(word, count);
        }
        
        //Print the running result
        System.out.println("Current result: " + result);
        //Send the word and its updated total to the next component
        this.collector.emit(new Values(word, result.get(word)));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // Initialize the bolt: save the output collector
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the output schema: a word and its running total
        declarer.declare(new Fields("word","total"));
    }

}

WordCountTopology.java

package demo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

public class WordCountTopology {

    public static void main(String[] args) throws Exception {
        //Run as the root user so the topology can write to HDFS
        System.setProperty("HADOOP_USER_NAME", "root");
        //Create a job: Topology = spout + bolt(s)
        
        TopologyBuilder builder = new TopologyBuilder();
        
        //First component of the topology: the spout
        builder.setSpout("mywordcount_spout", new WordCountSpout());
        //builder.setSpout("mywordcount_spout", createKafkaSpout());
        
        //Second component: bolt that splits sentences into words
        builder.setBolt("mywordcount_split", new WordCountSplitBolt()).shuffleGrouping("mywordcount_spout");
        
        //Third component: bolt that counts the words
        builder.setBolt("mywordcount_total", new WordCountTotalBolt()).fieldsGrouping("mywordcount_split", new Fields("word"));
        
        //Optional fourth bolt: write the results to Redis
        //builder.setBolt("mywordcount_redis", createRedisBolt()).shuffleGrouping("mywordcount_total");
        
        //Optional fourth bolt: write the results to HDFS
        //builder.setBolt("mywordcount_hdfs", createHDFSBolt()).shuffleGrouping("mywordcount_total");

        //Optional fourth bolt: write the results to HBase
        //builder.setBolt("mywordcount_hbase", new WordCountHBaseBolt()).shuffleGrouping("mywordcount_total");        
        
        //Create the topology
        StormTopology topology = builder.createTopology();
        
        //Configuration parameters
        Config conf = new Config();
        
        //Submit the topology
        //Option 1: local mode (run directly in Eclipse)
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", conf, topology);
        
        //Option 2: cluster mode: storm jar temp/storm.jar demo.WordCountTopology MyStormWordCount
        //StormSubmitter.submitTopology(args[0], conf, topology);
    }

    private static IRichBolt createHDFSBolt() {
        // Create an HDFS bolt that writes the results into HDFS
        HdfsBolt bolt = new HdfsBolt();
        
        //HDFS location: the NameNode address
        bolt.withFsUrl("hdfs://192.168.153.11:9000");
        
        //Directory in HDFS where the data is saved
        bolt.withFileNameFormat(new DefaultFileNameFormat().withPath("/stormresult"));
        
        //Delimiter between key and value, e.g. Beijing|10
        bolt.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"));
        
        //File rotation policy: start a new file every 5 MB
        bolt.withRotationPolicy(new FileSizeRotationPolicy(5.0f,Units.MB));
        
        //Sync policy: flush to HDFS every 1024 tuples
        bolt.withSyncPolicy(new CountSyncPolicy(1024));
        
        return bolt;
    }

    private static IRichBolt createRedisBolt() {
        // Create a Redis bolt that writes the results into Redis
        //Build a Jedis connection pool
        
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("192.168.153.11");
        builder.setPort(6379);
        JedisPoolConfig poolConfig = builder.build();
    
        //RedisStoreMapper: defines how the tuples are stored in Redis
        return new RedisStoreBolt(poolConfig, new RedisStoreMapper() {
            
            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                // Declare the Redis data type: a hash named "wordcount"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,"wordcount");
            }
            
            @Override
            public String getValueFromTuple(ITuple tuple) {
                // Value received from the previous component: the running total
                return String.valueOf(tuple.getIntegerByField("total"));
            }
            
            @Override
            public String getKeyFromTuple(ITuple tuple) {
                // Key received from the previous component: the word
                return tuple.getStringByField("word");
            }
        });
    }

}
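
In local mode the topology keeps running until the JVM is stopped. A minimal sketch of letting it run for a while and then shutting it down cleanly (an alternative ending for main, assuming org.apache.storm.utils.Utils is imported):

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", conf, topology);
        Utils.sleep(30000);                   // let the topology run for 30 seconds
        cluster.killTopology("mywordcount");  // stop the running topology
        cluster.shutdown();                   // shut down the in-process cluster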

Redis integration result:

HDFS integration:

HBase integration:

WordCountHBaseBolt.java

package demo;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Bolt component that writes the word counts into HBase.
 * Create the HBase table first: create 'result','info'
 */
public class WordCountHBaseBolt extends BaseRichBolt {

    //HBase client for the "result" table
    private HTable htable;
    
    @Override
    public void execute(Tuple tuple) {
        //Get the data processed by the previous component
        String word = tuple.getStringByField("word");
        int total = tuple.getIntegerByField("total");
        
        //Create a Put object; the row key is the word
        Put put = new Put(Bytes.toBytes(word));
        //Column family: info, column: word, value: the word
        put.add(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
        //Column family: info, column: total, value: the running total
        put.add(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));
        try {
            htable.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        // Initialization: set up the HBase client.
        // Assumption: HBase's ZooKeeper runs on 192.168.153.11, the same host used in the HDFS/Redis examples above.
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.153.11");
            htable = new HTable(conf, "result");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // This is the last bolt in the topology, so no output fields are declared
        
    }


}

 

Open the HBase command line with hbase shell to check the results:
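
A few hbase shell commands to create the table and inspect what the bolt writes (a sketch; "Beijing" is one of the words from the sample sentences):

hbase shell
create 'result','info'        # create the table before running the topology
scan 'result'                 # list all word counts written by the bolt
get 'result','Beijing'        # look up the count for a single word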

 

V. Storm Job Submission Process

1. The client submits the topology (see the command sketch after this list).

2. A local directory is created for the job (under storm.local.dir, where submitted topologies are stored).

3. Nimbus assigns the tasks and writes the assignment to ZooKeeper.

4. Each Supervisor fetches its assigned tasks from ZooKeeper and starts the corresponding Workers to execute them.

5. The Workers write their execution heartbeats into ZooKeeper.

6. Nimbus monitors the execution of the job through those heartbeats.
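
A sketch of the client-side commands involved (the jar path and topology name follow the cluster-mode example in WordCountTopology above):

storm jar temp/storm.jar demo.WordCountTopology MyStormWordCount   # submit the topology to the cluster
storm list                                                         # show the running topologies
storm kill MyStormWordCount                                        # stop the topology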

VI. Storm Internal Communication

Task execution: the Executors inside a Worker process (each Worker runs one or more Executor threads, and each Executor executes the tasks of a spout or bolt), as illustrated in the sketch below.
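
A rough illustration of how workers and executors are configured for the WordCount topology above (the parallelism numbers are arbitrary assumptions):

        Config conf = new Config();
        conf.setNumWorkers(2); // 2 Worker processes; each occupies one supervisor slot (e.g. port 6700/6701)

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("mywordcount_spout", new WordCountSpout(), 1);   // 1 executor for the spout
        builder.setBolt("mywordcount_split", new WordCountSplitBolt(), 4) // 4 executors for the split bolt
               .shuffleGrouping("mywordcount_spout");
        builder.setBolt("mywordcount_total", new WordCountTotalBolt(), 2) // 2 executors for the count bolt
               .fieldsGrouping("mywordcount_split", new Fields("word"));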

 

