[Big Data Real-Time Computing Frameworks] Storm: Developing Your First WordCount Real-Time Computation Program


I. The Storm Programming Model

  

  • Topology: the name of a real-time application running in Storm (a "topology").
  • Spout: the component that pulls the source data stream into a topology.
    •   Typically a spout reads data from an external data source and converts it into the topology's internal source stream.
  • Bolt: the component that receives data and processes it; this is where you implement your own logic.
  • Tuple: the basic unit of message passing; one tuple can be thought of as one group of messages.
  • Stream: represents the flow of data.
  • Stream Grouping: the strategy for partitioning data between components (see the sketch after this list).
    • Shuffle Grouping: random grouping; tuples are spread as evenly as possible across the downstream bolt's tasks.
    • Fields Grouping: grouping by field; tuples are partitioned by the value of a field, and tuples with the same field value are sent to the same task.
    • All Grouping: broadcast; every task of the downstream bolt receives a copy of each tuple.
    • Global Grouping: global grouping; all tuples go to a single task of the bolt, which can be used to build transactional topologies.
    • None Grouping: no grouping (currently behaves like shuffle grouping).
    • Direct Grouping: direct grouping; the emitting component decides which task of the consumer receives the tuple.
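Below is a minimal sketch, not part of the original example, of how these grouping strategies are declared against TopologyBuilder. The component names are placeholders, and the spout/bolt classes are the ones built in the sections that follow (imports needed: org.apache.storm.topology.TopologyBuilder and org.apache.storm.tuple.Fields).

//Sketch only: grouping declarations on TopologyBuilder
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new WordCountSpout());

//Shuffle grouping: tuples are distributed randomly and evenly across the bolt's tasks
builder.setBolt("split-bolt", new WordCountSplitBolt(), 4)
       .shuffleGrouping("sentence-spout");

//Fields grouping: tuples with the same "word" value always reach the same task
builder.setBolt("count-bolt", new WordCountTotalBolt(), 4)
       .fieldsGrouping("split-bolt", new Fields("word"));

//All grouping (broadcast): every task of the bolt gets every tuple
//builder.setBolt("audit-bolt", new SomeBolt()).allGrouping("split-bolt");

//Global grouping: every tuple goes to a single task of the bolt
//builder.setBolt("report-bolt", new SomeBolt()).globalGrouping("count-bolt");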

II. Typical Architecture of a Streaming Computation Pipeline

  • Flume collects the data.
  • Kafka buffers the data temporarily.
  • Storm computes over the data.
  • Redis, an in-memory database, stores the results.

0. Manage the project with Maven: dependencies to add to pom.xml

pom.xml

<!-- apache storm core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.3</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-rename-hack</artifactId>
    <version>1.0.3</version>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.0.3</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>1.0.3</version>
</dependency>

The complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wulei</groupId>
    <artifactId>Bigdata</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <!-- MySQL connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.43</version>
        </dependency>

        <!-- Hadoop 2.7.3 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- HBase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.3.1</version>
            <type>pom</type>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- apache storm core -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-rename-hack</artifactId>
            <version>1.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>1.0.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>1.0.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.bigdata.storm.WordCountTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

 

1. Spout component: create the WordCountSpout component to collect data and act as the data source for the whole topology

WordCountSpout.java

package com.bigdata.storm;

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

//Data collection: the Spout component
public class WordCountSpout extends BaseRichSpout{
    //Simulate some input data
    private String[] data = {"I love Beijing","I love China","Beijing is the capital of China"};

    //collector: this spout's collector, used to send the collected data to the next component
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        //Collect one record every 3 seconds
        Utils.sleep(3000);

        //Called by the Storm engine; processes each collected record

        //Generate a random number below 3
        int random = (new Random()).nextInt(3);
        String value = data[random];

        //Print it
        System.out.println("Collected data: " + value);

        //Send it to the next component
        this.collector.emit(new Values(value));
    }

    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        //collector: this spout's collector, used to send the collected data to the next component
        //Initialize the collector in open()
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        //Declare the schema (structure) of the tuples sent to the next component
        declare.declare(new Fields("sentence"));
    }
}

2. Bolt component 1: create the WordCountSplitBolt component to split sentences into words

WordCountSplitBolt.java

package com.bigdata.storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//First bolt component: word splitting
public class WordCountSplitBolt extends BaseRichBolt{

    //collector: this bolt's collector, used to send the processed data to the next bolt
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        //Process the data sent by the previous component, e.g. "I love Beijing"
        String value = tuple.getStringByField("sentence");

        //Split into words
        String[] words = value.split(" ");

        //Emit each word with a count of 1
        for(String w:words){
            collector.emit(new Values(w,1));
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        //Initialization
        //collector: this bolt's collector, used to send the processed data to the next bolt
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        //Declare the schema (structure) of the tuples sent to the next component
        declare.declare(new Fields("word","count"));
    }
}

3. Bolt component 2: create the WordCountTotalBolt component to count the words

WordCountTotalBolt.java

package com.bigdata.storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

//Second bolt component: word counting
public class WordCountTotalBolt extends BaseRichBolt{
    //Store the running totals in a Map
    private Map<String, Integer> result = new HashMap<>();

    //collector: this bolt's collector, used to send the processed data to the next bolt
    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        //Read the incoming data
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        //Accumulate
        if(result.containsKey(word)){
            //The word already exists: add to its total
            int total = result.get(word);
            result.put(word, total+count);
        }else{
            //A new word
            result.put(word, count);
        }

        //Print the current totals
        System.out.println("Word counts so far: " + result);

        //Emit to the next component: (word, running total)
        this.collector.emit(new Values(word,result.get(word)));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word","total"));
    }
}

4. Topology main program: WordCountTopology

WordCountTopology.java

package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        //Set the spout component of the topology
        builder.setSpout("mywordcountspout", new WordCountSpout());

        //Set the first bolt component
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
               .shuffleGrouping("mywordcountspout");//shuffle grouping

        //Set the second bolt component
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
               .fieldsGrouping("mywordcountsplit", new Fields("word"));

        //Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        //A topology can run in two modes: 1. local mode, 2. cluster mode
        //1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        //2. Cluster mode: package as a jar and submit it to Storm
//        StormSubmitter.submitTopology(args[0], conf, job);
    }

}
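One note on local mode: LocalCluster keeps the topology running until the JVM is killed. If you want the local test to stop on its own, a small optional addition (not in the original example) at the end of main() is to sleep for a while and then shut the cluster down; it requires import org.apache.storm.utils.Utils.

//Optional: let the local topology run for a while, then stop it cleanly
//("MyWordCount" is the topology name submitted above)
Utils.sleep(30000);                        //keep the topology alive for 30 seconds
localcluster.killTopology("MyWordCount");  //kill the submitted topology
localcluster.shutdown();                   //shut down the in-process cluster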

5. Run the main program locally: the output of running it as a Java application

 

6. Run the program jar on a Storm cluster

When running on the cluster, comment out the LocalCluster lines in WordCountTopology.java and submit through this line instead: StormSubmitter.submitTopology(args[0], conf, job); (a sketch of an argument-based alternative follows below).
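A common alternative, not used in the original post, is to pick the mode from the command-line arguments so the same main() works both locally and on the cluster without commenting code in and out. A minimal sketch using the same builder, conf, and job variables as above:

if (args != null && args.length > 0) {
    //Cluster mode: the first argument becomes the topology name (args[0])
    StormSubmitter.submitTopology(args[0], conf, job);
} else {
    //Local mode: run inside an in-process LocalCluster for testing
    LocalCluster localcluster = new LocalCluster();
    localcluster.submitTopology("MyWordCount", conf, job);
}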

Then package the project as a jar containing at least the four Java files above, with WordCountTopology as the main class; with the pom above, mvn package runs the maven-shade-plugin and sets Main-Class to com.bigdata.storm.WordCountTopology. Copy the resulting jar to any Linux directory in the Storm environment:

  • Contents of mystorm.jar:

  • Upload it to a Linux host in the Storm environment:

[root@bigdata111 mapreducejar]# pwd
/root/mapreducejar
[root@bigdata111 mapreducejar]# ls mystorm.jar
mystorm.jar

  • Run the jar

Command format for submitting the topology: storm jar <path to jar> <package.TopologyClass> <topology name>

[root@bigdata111 mapreducejar]# storm jar mystorm.jar com.bigdata.storm.WordCountTopology MyWordCountAlia

 

  • Check the running topology in the Storm web UI

 

III. Storm Cluster Task Submission Process

IV. Storm's Internal Communication Mechanism

V. Storm Integrations

Storm provides integrations with many external systems, commonly Redis, HDFS, HBase, JDBC, JMS, Hive, Kafka, and so on, along with the corresponding integration APIs. The integration modules live under the external directory of the Storm distribution:

[root@bigdata111 external]# pwd
/root/training/apache-storm-1.0.3/external
[root@bigdata111 external]# ls
flux storm-eventhubs storm-jdbc storm-mongodb
sql storm-hbase storm-jms storm-mqtt
storm-cassandra storm-hdfs storm-kafka storm-redis
storm-elasticsearch storm-hive storm-kafka-client storm-solr

1. Integrating with Redis

  • Start Redis: bin/redis-server conf/redis.conf

  • Rewrite the topology main program (for the Redis integration, running in local mode is enough):
package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

//Main program
public class WordCountTopology {

    public static void main(String[] args) throws Exception{
        //Create the topology builder
        TopologyBuilder builder = new TopologyBuilder();

        //Set the spout component of the topology
        builder.setSpout("mywordcountspout", new WordCountSpout());

        //Set the first bolt component
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt())
               .shuffleGrouping("mywordcountspout");//shuffle grouping

        //Set the second bolt component
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt())
               .fieldsGrouping("mywordcountsplit", new Fields("word"));

        //Set the third bolt component, which writes the results to Redis
        builder.setBolt("mywordcountredisbolt", createRedisBolt())
               .shuffleGrouping("mywordcounttotal");

        //Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        //A topology can run in two modes: 1. local mode, 2. cluster mode
        //1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        //2. Cluster mode
//        StormSubmitter.submitTopology(args[0], conf, job);
    }

    private static IRichBolt createRedisBolt() {
        //Create a RedisBolt
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("10.30.30.146");
        builder.setPort(6379);
        JedisPoolConfig pool = builder.build();

        //RedisStoreMapper defines how tuples are mapped to Redis data
        return new RedisStoreBolt(pool, new RedisStoreMapper() {

            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                //The Redis data type used to store the results: a hash named "result"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH,
                                                   "result");
            }

            @Override
            public String getValueFromTuple(ITuple tuple) {
                return String.valueOf(tuple.getIntegerByField("total"));
            }

            @Override
            public String getKeyFromTuple(ITuple tuple) {
                return tuple.getStringByField("word");
            }
        });
    }
}
  • Before running the program, connect with redis-cli and check whether the result hash holds any data yet: hgetall result returns nothing.

  • After running the program, check again: with the topology running locally, the Redis database now holds our word counts, and the values keep growing (a quick Java check is sketched below).
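For reference, a minimal sketch (not part of the original post, and only a hypothetical helper class) that reads the counters back using the Jedis client pulled in by storm-redis; the host, port, and hash name match the configuration above.

import java.util.Map;

import redis.clients.jedis.Jedis;

//Hypothetical helper for verifying the results written by the topology
public class CheckRedisResult {
    public static void main(String[] args) {
        //Connect to the same Redis instance configured in createRedisBolt()
        try (Jedis jedis = new Jedis("10.30.30.146", 6379)) {
            //"result" is the hash name declared in RedisDataTypeDescription above
            Map<String, String> counts = jedis.hgetAll("result");
            counts.forEach((word, total) -> System.out.println(word + " = " + total));
        }
    }
}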

 

2. Integrating with HBase

  • Start HBase (make sure ZooKeeper is already running, i.e. the QuorumPeerMain process exists): start-hbase.sh

  • Start Hadoop: start-all.sh

Check the processes and make sure the related Hadoop, HBase, and ZooKeeper processes are all up.

 

 

  • Log in to the HBase shell and create the table: create 'result','info'

 

  • Create a new Java class that implements the HBase bolt:

WordCountHBaseBolt.java

package com.bigdata.storm;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

//Write the results to HBase
public class WordCountHBaseBolt extends BaseRichBolt {

    //Client handle for the HBase table
    private HTable table;

    @Override
    public void execute(Tuple tuple) {
        //Read the incoming data
        String word = tuple.getStringByField("word");
        int total = tuple.getIntegerByField("total");

        //Build a Put object (row key = the word)
        Put put = new Put(Bytes.toBytes(word));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));

        //Insert the row
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        //Create the HBase client
        //Point it at the ZooKeeper quorum
        Configuration conf =  new Configuration();
        conf.set("hbase.zookeeper.quorum", "10.30.30.146");

        try {
            table = new HTable(conf, "result");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        //This is the last bolt in the topology; it declares no output fields
    }

}
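Note that HTable(conf, tableName) and Put.add(...) are deprecated in the HBase 1.x client API. A minimal sketch (my own variant, not from the original post) of the same open/write/close logic using the non-deprecated Connection/Table API and HBaseConfiguration:

//Sketch of the same write path with the non-deprecated HBase 1.x client API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseWriterSketch {
    private Connection connection;
    private Table table;

    //Equivalent of prepare(): build the configuration and open the table
    public void open() throws Exception {
        Configuration conf = HBaseConfiguration.create();   //loads hbase-default.xml/hbase-site.xml
        conf.set("hbase.zookeeper.quorum", "10.30.30.146");
        connection = ConnectionFactory.createConnection(conf);
        table = connection.getTable(TableName.valueOf("result"));
    }

    //Equivalent of execute(): write one (word, total) pair
    public void write(String word, int total) throws Exception {
        Put put = new Put(Bytes.toBytes(word));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));
        table.put(put);
    }

    //Call from cleanup() to release the resources
    public void close() throws Exception {
        table.close();
        connection.close();
    }
}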
  • Rewrite the topology main class:

WordCountTopology.java

package com.bigdata.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Set the spout component of the topology
        builder.setSpout("mywordcountspout", new WordCountSpout());

        // Set the first bolt component
        builder.setBolt("mywordcountsplit", new WordCountSplitBolt()).shuffleGrouping("mywordcountspout");// shuffle grouping

        // Set the second bolt component
        builder.setBolt("mywordcounttotal", new WordCountTotalBolt()).fieldsGrouping("mywordcountsplit",
                new Fields("word"));
//        // Set the third bolt component, which writes the results to Redis
//        builder.setBolt("mywordcountredisbolt", createRedisBolt()).shuffleGrouping("mywordcounttotal");

        // Set the third bolt component, which writes the results to HBase
        builder.setBolt("mywordcounthbasebolt", new WordCountHBaseBolt()).shuffleGrouping("mywordcounttotal");

        // Create the topology
        StormTopology job = builder.createTopology();

        Config conf = new Config();

        // A topology can run in two modes: 1. local mode, 2. cluster mode
        // 1. Local mode
        LocalCluster localcluster = new LocalCluster();
        localcluster.submitTopology("MyWordCount", conf, job);

        // 2. Cluster mode: package as a jar and submit it to Storm
        // StormSubmitter.submitTopology(args[0], conf, job);
    }

    private static IRichBolt createRedisBolt() {
        // Create a RedisBolt
        JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
        builder.setHost("10.30.30.146");
        builder.setPort(6379);
        JedisPoolConfig pool = builder.build();

        // RedisStoreMapper defines how tuples are mapped to Redis data
        return new RedisStoreBolt(pool, new RedisStoreMapper() {

            @Override
            public RedisDataTypeDescription getDataTypeDescription() {
                // The Redis data type used to store the results: a hash named "result"
                return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, "result");
            }

            @Override
            public String getValueFromTuple(ITuple tuple) {
                return String.valueOf(tuple.getIntegerByField("total"));
            }

            @Override
            public String getKeyFromTuple(ITuple tuple) {
                return tuple.getStringByField("word");
            }
        });
    }

}
  • Run the program and check the result: the result table now holds values in the word and total columns (a read-back sketch follows below):
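To verify the contents outside the HBase shell, here is a minimal read-back sketch (my own, hypothetical helper class, not from the original post) that scans the result table and prints the stored word/total pairs; the quorum address and column family match the bolt above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

//Hypothetical verification helper: scan the "result" table written by WordCountHBaseBolt
public class CheckHBaseResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "10.30.30.146");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("result"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result row : scanner) {
                String word  = Bytes.toString(row.getValue(Bytes.toBytes("info"), Bytes.toBytes("word")));
                String total = Bytes.toString(row.getValue(Bytes.toBytes("info"), Bytes.toBytes("total")));
                System.out.println(word + " = " + total);
            }
        }
    }
}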

 

3. Integrating with JDBC: to be continued

4. Integrating with HDFS: to be continued

5. Integrating with Apache Kafka: to be continued

6. Integrating with Hive: to be continued

7. Integrating with JMS: to be continued

