I. Overview of Real-Time Computation
1. Comparison: offline computation vs. real-time computation
Offline computation: MapReduce, batch processing (Sqoop ---> HDFS ---> MapReduce ---> HDFS)
Real-time computation: Storm and Spark Streaming, processing data as it arrives (Flume ---> Kafka ---> stream processing ---> Redis)
2. Common real-time (streaming) computation engines
(1) Apache Storm
(2) Spark Streaming
(3) Apache Flink: supports both stream processing and batch processing
II. Storm Architecture

III. Installing and Configuring Apache Storm
1. Prerequisite: install ZooKeeper (the ensemble used for Hadoop HA)
tar -zxvf apache-storm-1.0.3.tar.gz -C ~/training/
Set the environment variables:
STORM_HOME=/root/training/apache-storm-1.0.3
export STORM_HOME
PATH=$STORM_HOME/bin:$PATH
export PATH
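After editing the profile, reload it and verify the installation (a quick check; the profile file name depends on where the variables were added):
source ~/.bash_profile
storm version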
Configuration file: conf/storm.yaml
Note: in storm.yaml there must be a space after "-" and a space after ":".
2. Storm pseudo-distributed mode (bigdata11)
storm.zookeeper.servers:
- "bigdata11"
Master node (nimbus):
nimbus.seeds: ["bigdata11"]
Number of worker slots on each slave node:
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
Directory where uploaded topologies are saved:
storm.local.dir: "/root/training/apache-storm-1.0.3/tmp"
Start Storm (on bigdata11):
Master node: storm nimbus &
Slave node: storm supervisor &
UI: storm ui & ---> http://ip:8080
Log viewer: storm logviewer &
3. Storm fully distributed mode (bigdata12, bigdata13, bigdata14)
(*) Configure on bigdata12
storm.zookeeper.servers:
- "bigdata12"
- "bigdata13"
- "bigdata14"
nimbus.seeds: ["bigdata12"]
storm.local.dir: "/root/training/apache-storm-1.0.3/tmp"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
(*) Copy to the other nodes
scp -r apache-storm-1.0.3/ root@bigdata13:/root/training
scp -r apache-storm-1.0.3/ root@bigdata14:/root/training
(*) Start
bigdata12: storm nimbus &
storm ui &
storm logviewer &
bigdata13: storm supervisor &
storm logviewer &
bigdata14: storm supervisor &
storm logviewer &
4. Storm HA (bigdata12, bigdata13, bigdata14)
Modify on every machine:
nimbus.seeds: ["bigdata12", "bigdata13"]
On bigdata13, start an additional nimbus ----> it runs as a standby (not the leader).
A separate UI can also be started on it, as shown below.
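For example, the extra daemons on bigdata13 are started with the same commands used earlier, just run on the standby node:
storm nimbus &
storm ui &
The Storm UI lists this nimbus as not being the leader until the leader on bigdata12 goes down.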
IV. WordCount Data Flow

Java implementation:
WordCountSpout.java
package demo;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Purpose: collect data and send it to the next bolt component.
 */
public class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    // sample data
    private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};

    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // called by the Storm framework to receive data produced by an external system
        // collect once every three seconds
        Utils.sleep(3000);
        // pick a random string to simulate collected data
        int random = new Random().nextInt(3); // random number in [0, 3)
        System.out.println("Collected data: " + data[random]);
        // emit the data to the next component
        this.collector.emit(new Values(data[random]));
    }

    /*
     * SpoutOutputCollector: the spout's output stream
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        // spout initialization
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output schema
        declarer.declare(new Fields("sentence"));
    }
}
WordCountSplitBolt.java
package demo;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * First bolt component: splits a sentence into words.
 */
public class WordCountSplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // process the data sent by the previous component
        String line = tuple.getStringByField("sentence");
        // split into words
        String[] words = line.split(" ");
        // emit each word with a count of 1
        for (String word : words) {
            this.collector.emit(new Values(word, 1));
        }
    }

    // OutputCollector: the bolt's output stream
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // bolt initialization
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output schema
        declarer.declare(new Fields("word", "count"));
    }
}
WordCountTotalBolt.java
package demo;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Second bolt component: counts the words.
 */
public class WordCountTotalBolt extends BaseRichBolt {

    private OutputCollector collector;

    // running totals: word -> count
    private Map<String, Integer> result = new HashMap<>();

    @Override
    public void execute(Tuple tuple) {
        // get the data: the word and its frequency (1)
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        if (result.containsKey(word)) {
            // word already exists: add to its total
            int total = result.get(word);
            result.put(word, total + count);
        } else {
            // first occurrence of the word
            result.put(word, count);
        }

        // print the current result
        System.out.println("Current result: " + result);
        // send to the next component
        this.collector.emit(new Values(word, result.get(word)));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        // bolt initialization
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output schema
        declarer.declare(new Fields("word", "total"));
    }
}
WordCountTopology.java
package demo;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

public class WordCountTopology {

    public static void main(String[] args) throws Exception {
        // run HDFS operations as the root user
        System.setProperty("HADOOP_USER_NAME", "root");

        // create a topology: Topology = spout + bolt(s)
        TopologyBuilder builder = new TopologyBuilder();

        // first component: the spout
        builder.setSpout("mywordcount_spout", new WordCountSpout());
        //builder.setSpout("mywordcount_spout", createKafkaSpout());

        // second component: bolt that splits the sentences into words
        builder.setBolt("mywordcount_split", new WordCountSplitBolt())
               .shuffleGrouping("mywordcount_spout");

        // third component: bolt that counts the words
        builder.setBolt("mywordcount_total", new WordCountTotalBolt())
               .fieldsGrouping("mywordcount_split", new Fields("word"));

        // fourth component (optional): bolt that writes the result to Redis
        //builder.setBolt("mywordcount_redis", createRedisBolt()).shuffleGrouping("mywordcount_total");

        // fourth component (optional): bolt that writes the result to HDFS
        //builder.setBolt("mywordcount_hdfs", createHDFSBolt()).shuffleGrouping("mywordcount_total");

        // fourth component (optional): bolt that writes the result to HBase
        //builder.setBolt("mywordcount_hbase", new WordCountHBaseBolt()).shuffleGrouping("mywordcount_total");

        // create the topology
        StormTopology topology = builder.createTopology();

        // configuration
        Config conf = new Config();

        // submit the topology
        // option 1: local mode (run directly in Eclipse)
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mywordcount", conf, topology);

        // option 2: cluster mode: storm jar temp/storm.jar demo.WordCountTopology MyStormWordCount
        //StormSubmitter.submitTopology(args[0], conf, topology);
    }

    private static IRichBolt createHDFSBolt() {
        // create an HDFS bolt that writes the result to HDFS
        HdfsBolt bolt = new HdfsBolt();
        // HDFS location: NameNode address
        bolt.withFsUrl("hdfs://192.168.153.11:9000");
        // HDFS directory where the data is saved
        bolt.withFileNameFormat(new DefaultFileNameFormat().withPath("/stormresult"));
        // separator between key and value, e.g. Beijing|10
        bolt.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"));
        // file rotation policy: start a new file every 5 MB
        bolt.withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
        // sync policy: sync with HDFS every 1024 tuples
        bolt.withSyncPolicy(new CountSyncPolicy(1024));
        return bolt;
    }

    private static IRichBolt createRedisBolt() {
        // create a Redis bolt that writes the result to Redis
        // build a Redis (Jedis) connection pool
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("192.168.153.11");
        builder.setPort(6379);
        JedisPoolConfig poolConfig = builder.build();

        // storeMapper: how the data is stored in Redis
        return new RedisStoreBolt(poolConfig, new RedisStoreMapper() {

            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                // store the data in a Redis hash named "wordcount"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "wordcount");
            }

            @Override
            public String getValueFromTuple(ITuple tuple) {
                // value received from the previous component
                return String.valueOf(tuple.getIntegerByField("total"));
            }

            @Override
            public String getKeyFromTuple(ITuple tuple) {
                // key received from the previous component
                return tuple.getStringByField("word");
            }
        });
    }
}
Redis integration result:
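The stored counts can be verified with redis-cli (the host/port and the hash name "wordcount" follow the RedisStoreBolt configuration above):
redis-cli -h 192.168.153.11 -p 6379 HGETALL wordcount
This prints the hash's field/value pairs, i.e. each word and its current total.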

HDFS integration result:
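The files written by the HDFS bolt can be inspected from the command line (the directory /stormresult is the one set in createHDFSBolt(); the file names themselves are generated by the bolt):
hdfs dfs -ls /stormresult
hdfs dfs -cat /stormresult/*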

HBase integration:
WordCountHBaseBolt.java
package demo;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Create the HBase table first: create 'result','info'
 */
public class WordCountHBaseBolt extends BaseRichBolt {

    // HBase client
    private HTable htable;

    @Override
    public void execute(Tuple tuple) {
        // get the data processed by the previous component
        String word = tuple.getStringByField("word");
        int total = tuple.getIntegerByField("total");

        // create a Put object; row key: the word
        Put put = new Put(Bytes.toBytes(word));
        // column family: info, column: word, value: the word
        put.add(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
        // column family: info, column: total, value: the count
        put.add(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));

        try {
            htable.put(put);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        // initialization: connect to HBase
        // (the ZooKeeper quorum address below is an assumption; adjust it to your environment)
        try {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.153.11");
            htable = new HTable(conf, "result");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // last bolt in the topology: nothing is emitted
    }
}

Open the HBase command line with hbase shell.
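A minimal hbase shell session for this example (the table name "result" and column family "info" follow the comment in WordCountHBaseBolt; create the table before submitting the topology):
hbase shell
create 'result','info'
scan 'result'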
V. Storm Topology Submission Process

1. The client submits the topology (example commands are shown after this list).
2. Nimbus creates a local directory for the topology (under storm.local.dir).
3. Nimbus assigns the tasks and writes the assignments to ZooKeeper.
4. Each supervisor fetches its assignments from ZooKeeper and starts the corresponding workers to run the tasks.
5. The workers store their execution heartbeats in ZooKeeper.
6. Nimbus monitors the execution of the topology.
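From the client side, submitting and managing a topology looks roughly like this (the jar path and topology name reuse the example from WordCountTopology; adjust them to your build):
storm jar temp/storm.jar demo.WordCountTopology MyStormWordCount
storm list
storm kill MyStormWordCount
storm list shows the running topologies; storm kill stops the named topology.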
VI. Storm's Internal Communication Mechanism
Task execution: executors running inside a worker (a sketch of how this parallelism is configured follows).
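A minimal sketch of how workers, executors, and tasks are declared inside the main() of the WordCountTopology above (the component names reuse that example; the parallelism numbers here are illustrative assumptions, not values from the original notes):

    Config conf = new Config();
    conf.setNumWorkers(2); // 2 worker processes (JVMs) for this topology

    TopologyBuilder builder = new TopologyBuilder();
    // the parallelism hint is the number of executors (threads) for the component
    builder.setSpout("mywordcount_spout", new WordCountSpout(), 1);
    builder.setBolt("mywordcount_split", new WordCountSplitBolt(), 2)
           .setNumTasks(4) // 4 task instances spread over the 2 executors
           .shuffleGrouping("mywordcount_spout");
    builder.setBolt("mywordcount_total", new WordCountTotalBolt(), 2)
           .fieldsGrouping("mywordcount_split", new Fields("word"));

Each executor is a thread inside a worker process; executors within the same worker exchange tuples through in-memory queues, while executors in different workers communicate over the network.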

