I. The Storm Programming Model
- Topology: the name for a real-time application running in Storm (a processing graph).
- Spout: the component that brings the source data stream into a topology.
- Normally a spout reads data from an external data source and turns it into the topology's internal source stream.
- Bolt: the component that receives data and processes it; this is where users put their own logic.
- Tuple: the basic unit of message passing; one message (a named list of values) is one Tuple.
- Stream: represents the flow of data, i.e. an unbounded sequence of tuples.
- Stream Grouping: the stream partitioning strategy (a wiring sketch follows this list).
- Shuffle Grouping: random grouping; tuples are distributed as evenly as possible among the downstream bolt's tasks.
- Fields Grouping: grouping by field; tuples are partitioned by the value of a field, and tuples with the same field value are sent to the same task.
- All Grouping: broadcast; every task receives a copy of every tuple.
- Global Grouping: global grouping; all tuples are routed to a single task of the bolt, which can be used to build a transactional topology.
- None Grouping: no grouping preference (currently behaves like shuffle grouping).
- Direct Grouping: direct grouping; the emitting component decides exactly which task of the consumer receives the tuple.
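As an illustration, the grouping is chosen when a bolt is wired to its upstream component on the TopologyBuilder. The sketch below reuses the component names of the word-count example developed later in this article; the parallelism hints are only illustrative:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("mywordcountspout", new WordCountSpout());

    // Shuffle Grouping: tuples are spread randomly and roughly evenly over the 4 tasks
    builder.setBolt("mywordcountsplit", new WordCountSplitBolt(), 4)
           .shuffleGrouping("mywordcountspout");

    // Fields Grouping: tuples with the same "word" value always reach the same task
    builder.setBolt("mywordcounttotal", new WordCountTotalBolt(), 4)
           .fieldsGrouping("mywordcountsplit", new Fields("word"));

    // The remaining strategies are declared the same way on the returned InputDeclarer:
    //   .allGrouping("mywordcounttotal")     // broadcast to every task
    //   .globalGrouping("mywordcounttotal")  // route every tuple to a single task
    //   .noneGrouping("mywordcounttotal")    // no preference (currently behaves like shuffle)
    //   .directGrouping("mywordcounttotal")  // the emitter chooses the target task (emitDirect)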
II. A Typical Streaming-Computation Architecture
- Flume collects the data.
- Kafka buffers the data temporarily.
- Storm computes over the data (a sketch of consuming from Kafka with a KafkaSpout follows this list).
- Redis is an in-memory database used to store the results.
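In this kind of pipeline Storm usually consumes from Kafka through the storm-kafka module rather than through a hand-written spout. The following is only a minimal sketch of that wiring; it assumes the storm-kafka dependency is on the classpath, and the ZooKeeper address, topic name, ZK root, and consumer id are placeholder values, not settings from this article:

    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;

    // The spout discovers Kafka brokers through the ZooKeeper ensemble Kafka registers in
    ZkHosts hosts = new ZkHosts("bigdata111:2181");                    // placeholder ZK address
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",        // placeholder topic
            "/kafka-wordcount", "wordcount-spout");                    // ZK root and consumer id
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  // deliver each message as a string

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaspout", new KafkaSpout(spoutConfig));
    // downstream bolts then split, count, and finally write the results to Redis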
0. The project is managed with Maven; the following dependencies need to be added to pom.xml
pom.xml
<!-- apache storm core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-rename-hack</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-redis</artifactId>
<version>1.0.3</version>
</dependency>
The complete pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wulei</groupId>
    <artifactId>Bigdata</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <!-- MySQL connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.43</version>
        </dependency>

        <!-- Hadoop 2.7.3 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.3.1</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- Apache Storm core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-rename-hack</artifactId>
            <version>1.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.0.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.bigdata.storm.WordCountTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1. Spout component: create a Spout (WordCountSpout) that collects data and serves as the data source for the whole Topology
WordCountSpout.java

package com.bigdata.storm;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

// Spout component: collects the data
public class WordCountSpout extends BaseRichSpout {
    // Simulated input data
    private String[] data = {"I love Beijing", "I love China", "Beijing is the capital of China"};

    // collector: this spout's output collector, used to send the collected data to the next component
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // Called repeatedly by the Storm engine; collect one record every 3 seconds
        Utils.sleep(3000);

        // Pick a random sentence (random index in [0, 3))
        int random = (new Random()).nextInt(3);
        String value = data[random];

        // Log the collected record
        System.out.println("Collected data: " + value);

        // Send it to the next component
        this.collector.emit(new Values(value));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Initialize the collector that sends the collected data to the next component
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Declare the schema (structure) of the tuples sent to the next component
        declare.declare(new Fields("sentence"));
    }
}
2. Bolt component 1: create a Bolt (WordCountSplitBolt) that splits the sentences into words
WordCountSplitBolt.java

package com.bigdata.storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// First bolt component: word splitting
public class WordCountSplitBolt extends BaseRichBolt {

    // collector: this bolt's output collector, used to send processed data to the next bolt
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Process the data sent by the upstream component, e.g. "I love Beijing"
        String value = tuple.getStringByField("sentence");

        // Split the sentence into words
        String[] words = value.split(" ");

        // Emit each word with an initial count of 1
        for (String w : words) {
            collector.emit(new Values(w, 1));
        }
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Initialize the collector that sends processed data to the next bolt
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // Declare the schema (structure) of the tuples sent to the next component
        declare.declare(new Fields("word", "count"));
    }
}
3. Bolt component 2: create a Bolt (WordCountTotalBolt) that counts the words
WordCountTotalBolt.java

package com.bigdata.storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Second bolt component: word counting
public class WordCountTotalBolt extends BaseRichBolt {
    // Keep the running totals in a Map
    private Map<String, Integer> result = new HashMap<>();

    // collector: this bolt's output collector, used to send processed data to the next bolt
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Take the word and its count out of the tuple
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        // Accumulate
        if (result.containsKey(word)) {
            // The word already exists: add to its total
            int total = result.get(word);
            result.put(word, total + count);
        } else {
            // A new word
            result.put(word, count);
        }

        // Print the current totals
        System.out.println("Current result: " + result);

        // Send the word and its total frequency to the next component
        this.collector.emit(new Values(word, result.get(word)));
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word", "total"));
    }
}
4. Topology main program: WordCountTopology
WordCountTopology.java

package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Set the topology's spout component
        builder.setSpout("mywordcountspout", new WordCountSpout());

        // Set the topology's first bolt component (shuffle grouping)
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
               .shuffleGrouping("mywordcountspout");

        // Set the topology's second bolt component (fields grouping on "word")
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
               .fieldsGrouping("mywordcountsplit", new Fields("word"));

        // Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        // The topology can be run in two ways: 1. local mode  2. cluster mode
        // 1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        // 2. Cluster mode: package the jar and submit it to Storm
        // StormSubmitter.submitTopology(args[0], conf, job);
    }
}
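In local mode the topology above keeps running until the JVM is killed. A common pattern, shown here only as a sketch to append to the local-mode branch of main() (Utils is org.apache.storm.utils.Utils), is to let it run for a while and then shut the local cluster down:

    // Let the local topology process data for a while, then stop it
    Utils.sleep(30000);                        // run for roughly 30 seconds
    localcluster.killTopology("MyWordCount");  // stop the topology
    localcluster.shutdown();                   // tear down the in-process cluster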
5. Run the main program locally (Run As > Java Application) and watch the console output.
6. Run the program jar on the Storm cluster
For cluster mode, comment out the LocalCluster code in WordCountTopology.java and submit through this line instead: StormSubmitter.submitTopology(args[0], conf, job);
Then package the project as a jar (it must contain at least the four Java files above), with WordCountTopology set as the main class, and copy the jar to any directory on a Linux machine in the Storm environment:
- Contents of mystorm.jar:
- Upload it to the Linux machine in the Storm environment
[root@bigdata111 mapreducejar]# pwd
/root/mapreducejar
[root@bigdata111 mapreducejar]# ls mystorm.jar
mystorm.jar
- Run the jar
Submit-command format: storm jar [path to jar] [package.TopologyClassName] [topology name]
[root@bigdata111 mapreducejar]# storm jar mystorm.jar com.bigdata.storm.WordCountTopology MyWordCountAlia
- Check the running topology in the Storm web UI
- View the topology details (see the Storm setup article: "[Big Data Real-Time Computing Framework] The Storm Framework")
III. How a topology is submitted to the Storm cluster
IV. Storm's internal communication mechanism
V. Storm integrations
Storm ships integrations with many products, commonly Redis, HDFS, HBase, JDBC, JMS, Hive, and Kafka, together with integration APIs for each. The integration modules are located at:
[root@bigdata111 external]# pwd
/root/training/apache-storm-1.0.3/external
[root@bigdata111 external]# ls
flux storm-eventhubs storm-jdbc storm-mongodb
sql storm-hbase storm-jms storm-mqtt
storm-cassandra storm-hdfs storm-kafka storm-redis
storm-elasticsearch storm-hive storm-kafka-client storm-solr
1. Integration with Redis
- Start Redis: bin/redis-server conf/redis.conf
- Rewrite the Topology main program (for the Redis integration, running in local mode is sufficient)

package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

// Main program
public class WordCountTopology {

    public static void main(String[] args) throws Exception {
        // Create the topology builder
        TopologyBuilder builder = new TopologyBuilder();

        // Set the topology's spout component
        builder.setSpout("mywordcountspout", new WordCountSpout());

        // Set the topology's first bolt component (shuffle grouping)
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
               .shuffleGrouping("mywordcountspout");

        // Set the topology's second bolt component (fields grouping on "word")
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
               .fieldsGrouping("mywordcountsplit", new Fields("word"));

        // Set the topology's third bolt component: write the results to Redis
        builder.setBolt("mywordcountredisbolt", createRedisBolt())
               .shuffleGrouping("mywordcounttotal");

        // Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        // The topology can be run in two ways: 1. local mode  2. cluster mode
        // 1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        // 2. Cluster mode
        // StormSubmitter.submitTopology(args[0], conf, job);
    }

    private static IRichBolt createRedisBolt() {
        // Build a RedisStoreBolt backed by a Jedis connection pool
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("10.30.30.146");
        builder.setPort(6379);
        JedisPoolConfig pool = builder.build();

        // The RedisStoreMapper defines how tuples map onto the Redis data structure
        return new RedisStoreBolt(pool, new RedisStoreMapper() {

            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                // The Redis data type used to store the results: a hash named "result"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
                        "result");
            }

            @Override
            public String getValueFromTuple(ITuple tuple) {
                return String.valueOf(tuple.getIntegerByField("total"));
            }

            @Override
            public String getKeyFromTuple(ITuple tuple) {
                return tuple.getStringByField("word");
            }
        });
    }
}
- Before running the program, connect with redis-cli and check whether the result hash contains any data: hgetall result returns nothing.
- After running the program, check again: even when running locally, the counts are stored in Redis and keep growing (a Jedis read-back sketch follows).
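For reference, the stored hash can also be read back from Java with the Jedis client that storm-redis pulls in transitively; this is just a sketch, assuming the same Redis host and port as above:

    import java.util.Map;
    import redis.clients.jedis.Jedis;

    // Read the word counts that the topology stored in the "result" hash
    Jedis jedis = new Jedis("10.30.30.146", 6379);
    Map<String, String> counts = jedis.hgetAll("result");
    counts.forEach((word, total) -> System.out.println(word + " = " + total));
    jedis.close();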
2. Integration with HBase
- Start HBase (make sure ZooKeeper is already running, i.e. the QuorumPeerMain process exists): start-hbase.sh
- Start Hadoop: start-all.sh
Check the processes and make sure the Hadoop, HBase, and ZooKeeper processes have all started.
- Log in to the HBase shell and create the table: create 'result','info'
- Create a new Java class that implements the HBase bolt:
WordCountHBaseBolt.java

package com.bigdata.storm;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Write the results to HBase
public class WordCountHBaseBolt extends BaseRichBolt {

    // HBase table client
    private HTable table;

    @Override
    public void execute(Tuple tuple) {
        // Take the word and its total out of the tuple
        String word = tuple.getStringByField("word");
        int total = tuple.getIntegerByField("total");

        // Build a Put: row key = word, column family "info", columns "word" and "total"
        Put put = new Put(Bytes.toBytes(word));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));

        // Insert the row
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Create the HBase client and point it at ZooKeeper
        Configuration hbaseConf = new Configuration();
        hbaseConf.set("hbase.zookeeper.quorum", "10.30.30.146");

        try {
            table = new HTable(hbaseConf, "result");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // This is the last bolt in the topology; it does not emit any tuples
    }
}
- Rewrite the Topology main class
WordCountTopology.java

package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Set the topology's spout component
        builder.setSpout("mywordcountspout", new WordCountSpout());

        // Set the topology's first bolt component (shuffle grouping)
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt()).shuffleGrouping("mywordcountspout");

        // Set the topology's second bolt component (fields grouping on "word")
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt()).fieldsGrouping("mywordcountsplit",
                new Fields("word"));

        // Third bolt component writing to Redis (disabled in this version)
        // builder.setBolt("mywordcountredisbolt", createRedisBolt()).shuffleGrouping("mywordcounttotal");

        // Set the topology's third bolt component: write the results to HBase
        builder.setBolt("mywordcountredisbolt", new WordCountHBaseBolt()).shuffleGrouping("mywordcounttotal");

        // Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        // The topology can be run in two ways: 1. local mode  2. cluster mode
        // 1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        // 2. Cluster mode: package the jar and submit it to Storm
        // StormSubmitter.submitTopology(args[0], conf, job);
    }

    private static IRichBolt createRedisBolt() {
        // Build a RedisStoreBolt backed by a Jedis connection pool
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("10.30.30.146");
        builder.setPort(6379);
        JedisPoolConfig pool = builder.build();

        // The RedisStoreMapper defines how tuples map onto the Redis data structure
        return new RedisStoreBolt(pool, new RedisStoreMapper() {

            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                // The Redis data type used to store the results: a hash named "result"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "result");
            }

            @Override
            public String getValueFromTuple(ITuple tuple) {
                return String.valueOf(tuple.getIntegerByField("total"));
            }

            @Override
            public String getKeyFromTuple(ITuple tuple) {
                return tuple.getStringByField("word");
            }
        });
    }
}
- Run the program and check the result: the result table now holds the info:word and info:total columns for each word (an alternative using the storm-hbase HBaseBolt is sketched below).
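As an aside, the storm-hbase module already declared in the pom (its scope would need to change from test) ships a ready-made HBaseBolt that could replace the hand-written WordCountHBaseBolt. The following is only a sketch under that assumption; the hbase.conf map contents and component names are placeholders for this cluster:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.Config;
    import org.apache.storm.hbase.bolt.HBaseBolt;
    import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
    import org.apache.storm.tuple.Fields;

    // Map the "word"/"total" tuple onto the result table: row key = word, column family = info
    SimpleHBaseMapper mapper = new SimpleHBaseMapper()
            .withRowKeyField("word")
            .withColumnFields(new Fields("word", "total"))
            .withColumnFamily("info");

    // The bolt reads its HBase client settings from the map stored under the "hbase.conf" config key
    HBaseBolt hbaseBolt = new HBaseBolt("result", mapper).withConfigKey("hbase.conf");

    Map<String, Object> hbaseClientConf = new HashMap<>();
    hbaseClientConf.put("hbase.zookeeper.quorum", "10.30.30.146");  // placeholder, same ZK as above
    Config conf = new Config();
    conf.put("hbase.conf", hbaseClientConf);

    // Wire it in place of WordCountHBaseBolt when building the topology
    builder.setBolt("myhbasebolt", hbaseBolt).fieldsGrouping("mywordcounttotal", new Fields("word"));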