Using Flume + Kafka + Storm Together in a Pseudo-Distributed Environment


Contents:
  I. What are Flume, Kafka, and Storm, and how are they installed?
  II. How do Flume, Kafka, and Storm work together?
    1) What is the underlying principle?
    2) Integrating Flume and Kafka
    3) Integrating Kafka and Storm
    4) Integrating Flume, Kafka, and Storm
 
  I. What are Flume, Kafka, and Storm, and how are they installed?
  For an introduction to Flume, see 《Flume1.5.0的安裝、部署、簡單應用》.
  For an introduction to Kafka, see 《kafka2.9.2的分布式集群安裝和demo(java api)測試》.
  For an introduction to Storm, see 《ubuntu12.04+storm0.9.2分布式集群的搭建》.
  The examples that follow use the configurations from those three articles.
 
  II. How do Flume, Kafka, and Storm work together?
    1) What is the underlying principle?
  If you have read the Flume, Kafka, and Storm introductions carefully, you already know how each of them sends and receives messages.
  In the examples that follow, we do two main things: rewrite Flume's sink so that it calls a Kafka producer to publish messages, and write a Storm spout (implementing the IRichSpout interface) that calls a Kafka consumer to receive them. The messages then pass through a few custom bolts, which print the transformed content.
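  To make the moving parts concrete, the end-to-end data flow of the examples below looks roughly like this (the topic name comes from the code later in this article):

  syslog via nc -> Flume syslogtcp source -> memory channel -> custom KafkaSink
    -> Kafka topic "idoall_testTopic" -> custom KafkaSpouttest -> Bolt1 -> Bolt2 -> console output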
 
    2) Integrating Flume and Kafka
     # Copy the Kafka jars that Flume needs into Flume's lib directory.
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
     # Write the sink class below, export it from Eclipse as a jar, and put the jar into the flume-1.5.0-bin/lib directory. The project needs to reference flume-ng-configuration-1.5.0.jar, flume-ng-sdk-1.5.0.jar, flume-ng-core-1.5.0.jar, zkclient-0.3.jar, and commons-logging-1.1.1.jar. All of these jars can be found under the Flume directory; if you cannot locate one, search for it with the find command.
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {
    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    public void configure(Context context) {
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                // Nothing in the channel; roll back and back off.
                tx.rollback();
                return Status.BACKOFF;
            }
            // Forward the Flume event body to Kafka as a string message.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Flume sent message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
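     # The process() above takes a single event per transaction, which is simple but slow under load. As a minimal sketch of my own (not from the original article), assuming the same Kafka 0.8 producer API (whose javaapi Producer also accepts a List of KeyedMessage) plus java.util.List and java.util.ArrayList imports, process() could drain a small batch per transaction instead; the batch size of 100 is an arbitrary assumption:

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
            // Drain up to 100 events from the channel inside one transaction.
            for (int i = 0; i < 100; i++) {
                Event e = channel.take();
                if (e == null) {
                    break;
                }
                batch.add(new KeyedMessage<String, String>(topic, new String(e.getBody())));
            }
            if (batch.isEmpty()) {
                tx.rollback();
                return Status.BACKOFF;
            }
            producer.send(batch); // the 0.8 javaapi producer has a List overload
            tx.commit();
            return Status.READY;
        } catch (Exception ex) {
            logger.error("Flume KafkaSinkException:", ex);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }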
     # On m1, configure the Flume agent that talks to Kafka
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
     # Start Kafka on each of m1, m2, s1, and s2 (if you are unsure how, see 《kafka2.9.2的分布式集群安裝和demo(java api)測試》, which covers Kafka's installation, configuration, and startup), then start a message consumer on s1.
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
     # Start Flume on m1
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
     # Open another window on m1 and test sending syslog messages to Flume
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
     # The last lines in the Flume window on m1 show that Flume has sent the message to Kafka
# (startup lines identical to those shown above are omitted)
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: Flume sent message to Kafka: hello idoall.org syslog
     # The Kafka consumer opened earlier on s1 also receives the message sent from Flume, which shows that Flume and Kafka are now working together.
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [idoall_testTopic,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client  on partition [idoall_testTopic,1] failed due to Topic idoall_testTopic either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log idoall_testTopic-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [idoall_testTopic,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [idoall_testTopic,1] on broker 3: No checkpointed highwatermark is found for partition [idoall_testTopic,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
    3) Integrating Kafka and Storm
     # We start by writing the code in Eclipse. Before writing any code, configure Maven; the pom.xml is as follows:
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>idoall.cloud</groupId>
  <artifactId>idoall.cloud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>idoall.cloud</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>github-releases</id>
      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
    </repository>
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.sksamuel.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0-beta1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.14</version>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <!-- keep storm out of the jar-with-dependencies -->
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>commons-collections</groupId>
      <artifactId>commons-collections</artifactId>
      <version>3.2.1</version>
    </dependency>
  </dependencies>
</project>
     # Write KafkaSpouttest.java
package idoall.cloud.storm;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpouttest() {
    }

    public KafkaSpouttest(String topic) {
        this.topic = topic;
    }

    public void nextTuple() {
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void ack(Object msgId) {
    }

    public void activate() {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);

        System.out.println("*********Results********topic:" + topic);

        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String str = formatter.format(curDate);

            System.out.println("Storm received message from Kafka ------->" + value);

            collector.emit(new Values(value, 1, str), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // consumer group id
        props.put("group.id", "1");
        // Kafka keeps a group's consumed offsets in ZooKeeper, but they are not
        // updated there in real time; they are committed at this interval.
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void close() {
    }

    public void deactivate() {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "id", "time"));
    }

    public Map<String, Object> getComponentConfiguration() {
        System.out.println("getComponentConfiguration called");
        topic = "idoall_testTopic";
        return null;
    }
}
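     # Note that this spout consumes inside activate(), where the while (it.hasNext()) loop blocks the calling thread indefinitely; Storm normally expects emission to happen from nextTuple(). As a minimal sketch of my own (not from the original article), the two methods below could replace activate() and nextTuple() in the class above: consumption runs on a background thread feeding an in-memory queue, and nextTuple() drains it. The queue capacity of 1000 is an arbitrary assumption.

    private final java.util.concurrent.LinkedBlockingQueue<String> queue =
            new java.util.concurrent.LinkedBlockingQueue<String>(1000);

    public void activate() {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        final KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicMap).get(topic).get(0);
        // Consume on a background thread so activate() returns immediately.
        new Thread(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    queue.offer(new String(it.next().message()));
                }
            }
        }).start();
    }

    public void nextTuple() {
        String value = queue.poll(); // non-blocking; Storm calls nextTuple() in a loop
        if (value == null) {
            return;
        }
        String str = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS").format(new Date());
        collector.emit(new Values(value, 1, str), value);
    }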
     # Write KafkaTopologytest.java
package idoall.cloud.storm;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaSpouttest(""), 1);
        builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
        builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1", new Fields("word"));

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

        Utils.sleep(1000 * 60 * 5); // local cluster test ...
        cluster.shutdown();
    }

    public static class Bolt1 extends BaseBasicBolt {

        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                String msg = input.getString(0);
                int id = input.getInteger(1);
                String time = input.getString(2);
                msg = msg + "bolt1";
                System.out.println("First transformation of the message-------[arg0]:" + msg + "---[arg1]:" + id + "---[arg2]:" + time + "------->" + msg);
                if (msg != null) {
                    collector.emit(new Values(msg));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class Bolt2 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String msg = tuple.getString(0);
            msg = msg + "bolt2";
            System.out.println("Second transformation of the message---------->" + msg);
            collector.emit(new Values(msg, 1));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}
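     # The main() above runs the topology in a LocalCluster for five minutes, which is only meant for testing. As a minimal sketch (my own assumption, using the standard storm 0.9.x StormSubmitter API rather than anything from the original article), the LocalCluster block could be replaced with a real cluster submission like this when running under the storm jar command shown in section 4:

        // Replace the LocalCluster block in main() with something like:
        try {
            backtype.storm.StormSubmitter.submitTopology(
                    "my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }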
     # Test the combination of Kafka and Storm
  Open two windows (or use two machines; in the example below I use m2 and s1). Run a Kafka producer on m2 and a Kafka consumer on s1 (if you opened one earlier you can reuse it) to first verify that Kafka itself works on its own.
  As shown below, I run the producer on m2 and type "hello welcome idoall.org"; the consumer on s1 receives the same message. This shows that Kafka is running normally and message delivery works.
 
  Message sent from m2:
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
  Message received on s1:
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello welcome idoall.org
     # Now run KafkaTopologytest.java in Eclipse. The console receives the message we just sent from Kafka on m2, which shows that Kafka and Storm are connected as well.
# There is a lot of output; only the important part is shown:
*********Results********topic:idoall_testTopic
Storm received message from Kafka ------->hello welcome idoall.org
5268 [Thread-24-spout] INFO backtype.storm.daemon.task - Emitting: spout default [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
First transformation of the message-------[arg0]:hello welcome idoall.orgbolt1---[arg1]:1---[arg2]:2014年08月19日 11:21:15 051------->hello welcome idoall.orgbolt1
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2000523200413433507=6673316475127546409}, [hello welcome idoall.org, 1, 2014年08月19日 11:21:15 051]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 default [hello welcome idoall.orgbolt1]
5269 [Thread-18-bolt1] INFO backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2000523200413433507 4983764025617316501]
5269 [Thread-20-bolt2] INFO backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2000523200413433507=1852530874180384956}, [hello welcome idoall.orgbolt1]
Second transformation of the message---------->hello welcome idoall.orgbolt1bolt2
5270 [Thread-20-bolt2] INFO backtype.storm.daemon.task - Emitting: bolt2 default [hello welcome idoall.orgbolt1bolt2, 1]
    4) Integrating Flume, Kafka, and Storm
  The two examples above show that Flume and Kafka are already deployed and communicating, and that Kafka and Storm can talk to each other as well. All that remains is to package the Storm code as a jar and deploy it to Storm, which completes the three-way pipeline.
  If you are not familiar with installing, configuring, and deploying Storm, see 《ubuntu12.04+storm0.9.2分布式集群的搭建》.
 
     # Copy the Kafka-related jars into Storm's lib directory. (As explained above, integrating Kafka and Storm mainly means rewriting Storm's spout to call Kafka's consumer to receive and print messages, so these jars are required.)
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/zookeeper-3.4.5/dist-maven/zookeeper-3.4.5.jar /home/hadoop/storm-0.9.2-incubating/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/zkclient-0.3.jar /home/hadoop/storm-0.9.2-incubating/lib
     # Start the Storm nimbus on m1
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm nimbus &
     # Start the Storm supervisors on s1 and s2
root@s1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm supervisor &
     # Start the Storm UI on m1
root@m1:/home/hadoop# /home/hadoop/storm-0.9.2-incubating/bin/storm ui &
     # Package the project in Eclipse into a jar, copy it to any directory, and run it with storm
root@m1:/home/hadoop/storm-0.9.2-incubating# ll
total 25768
drwxr-xr-x 11 root   root       4096 Aug 19 11:53 ./
drwxr-xr-x 46 hadoop hadoop     4096 Aug 17 15:06 ../
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 bin/
-rw-r--r--  1    502 staff     34239 Jun 13 08:46 CHANGELOG.md
drwxr-xr-x  2 root   root       4096 Aug  2 12:31 conf/
-rw-r--r--  1    502 staff       538 Mar 13 11:17 DISCLAIMER
drwxr-xr-x  3    502 staff      4096 May  6 03:13 examples/
drwxr-xr-x  3 root   root       4096 Aug  1 14:38 external/
-rw-r--r--  1 root   root   26252342 Aug 19 11:36 idoall.cloud.jar
drwxr-xr-x  3 root   root       4096 Aug  2 12:51 ldir/
drwxr-xr-x  2 root   root       4096 Aug 19 11:53 lib/
-rw-r--r--  1    502 staff     22822 Jun 12 04:07 LICENSE
drwxr-xr-x  2 root   root       4096 Aug  1 14:38 logback/
drwxr-xr-x  2 root   root       4096 Aug  1 15:07 logs/
-rw-r--r--  1    502 staff       981 Jun 11 01:10 NOTICE
drwxr-xr-x  5 root   root       4096 Aug  1 14:38 public/
-rw-r--r--  1    502 staff      7445 Jun 10 02:24 README.markdown
-rw-r--r--  1    502 staff        17 Jun 17 00:22 RELEASE
-rw-r--r--  1    502 staff      3581 May 30 00:20 SECURITY.md
root@m1:/home/hadoop/storm-0.9.2-incubating# /home/hadoop/storm-0.9.2-incubating/bin/storm jar idoall.cloud.jar idoall.cloud.storm.KafkaTopologytest
     # Send a message through Flume and check whether Storm receives it

  Message sent through Flume:
root@m1:/home/hadoop# echo "flume->kafka->storm message" | nc localhost 5140
root@m1:/home/hadoop#
  Output shown by Storm:
# There is a lot of output; only the important part is shown
Storm received message from Kafka ------->flume->kafka->storm message
174218 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout default [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
174220 [Thread-10-bolt1] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: default, id: {-2345821945306343027=-7738131487327750388}, [flume->kafka->storm message, 1, 2014年08月19日 12:06:39 360]
First transformation of the message-------[arg0]:flume->kafka->storm messagebolt1---[arg1]:1---[arg2]:2014年08月19日 12:06:39 360------->flume->kafka->storm messagebolt1
174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 default [flume->kafka->storm messagebolt1]
174221 [Thread-10-bolt1] INFO  backtype.storm.daemon.task - Emitting: bolt1 __ack_ack [-2345821945306343027 -2191137958679040397]
174222 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: __ack_ack, id: {}, [-2345821945306343027 -2191137958679040397]
174222 [Thread-12-bolt2] INFO  backtype.storm.daemon.executor - Processing received message source: bolt1:3, stream: default, id: {-2345821945306343027=8433871885621516671}, [flume->kafka->storm messagebolt1]
Second transformation of the message---------->flume->kafka->storm messagebolt1bolt2
174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 default [flume->kafka->storm messagebolt1bolt2, 1]
174223 [Thread-12-bolt2] INFO  backtype.storm.daemon.task - Emitting: bolt2 __ack_ack [-2345821945306343027 8433871885621516671]
174224 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: bolt2:4, stream: __ack_ack, id: {}, [-2345821945306343027 8433871885621516671]
174228 [Thread-16-spout] INFO  backtype.storm.daemon.task - Emitting: spout __ack_init [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO  backtype.storm.daemon.executor - Processing received message source: spout:6, stream: __ack_init, id: {}, [-2345821945306343027 -7738131487327750388 6]
174228 [Thread-20-__acker] INFO  backtype.storm.daemon.task - Emitting direct: 6; __acker __ack_ack [-2345821945306343027]
    With the examples above, we have completed the communication path between Flume, Kafka, and Storm. Combined with the earlier articles 《Flume1.5.0的安裝、部署、簡單應用(含分布式、與hadoop2.2.0、hbase0.96的案例)》 and 《Golang、Php、Python、Java基於Thrift0.9.1實現跨語言調用》, this setup should solve most of the problems you will run into with real-time computation over big data and with cross-language calls. I hope this recent series of articles has been helpful to you.
 
---------------------------------------
Author: 迦壹
Reprint notice: Reprinting is permitted, but you must clearly indicate the original source and author with a hyperlink and include this copyright notice. Thank you!
---------------------------------------

