5 Integrating Kafka with Storm


 This post mainly covers:

   .Integrating Kafka with Storm

   .The storm-kafka project

   .What are the concrete use cases for Storm + Kafka?


   To integrate Kafka with Storm, the jar storm-kafka-0.9.2-incubating.jar must be added to the project.

At its core, this is simply Storm pulling data out of Kafka.
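If the project is built with Maven, the jar can be pulled in as a dependency instead of being copied by hand. The coordinates below are an assumption inferred from the jar's file name (storm-kafka shipped with Apache Storm as of 0.9.2-incubating):

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
```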


The storm-kafka project

In the storm-kafka project, we write the following classes ourselves:

KafkaTopo.java, WordSpliter.java, and WriterBolt.java


To use a different topic, simply change the topic name from wordcount to, say, sufei.


KafkaTopo.java

 

package cn.itcast.storm.topology;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.storm.bolt.WordSpliter;
import cn.itcast.storm.bolt.WriterBolt;
import cn.itcast.storm.spout.MessageScheme;

public class KafkaTopo {

    public static void main(String[] args) throws Exception {

        String topic = "wordcount";
        String zkRoot = "/kafka-storm";
        String spoutId = "KafkaSpout";
        BrokerHosts brokerHosts = new ZkHosts("weekend01:2181,weekend02:2181,weekend03:2181");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.forceFromStart = true;
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        // Set up a spout that reads data from the Kafka message queue and emits it to the
        // downstream bolts. This is not a custom spout: it is the KafkaSpout that ships with storm-kafka.
        builder.setSpout(spoutId, new KafkaSpout(spoutConfig));
        builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping(spoutId);
        builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("word-spilter", new Fields("word"));
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0);
        conf.setDebug(false);

        // LocalCluster runs the topology in a local simulator, convenient for development and debugging
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordCount", conf, builder.createTopology());

        // To submit the topology to a Storm cluster instead:
        // StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
    }
}
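KafkaTopo imports cn.itcast.storm.spout.MessageScheme, but the post never lists that class. Presumably it implements backtype.storm.spout.Scheme, decoding each raw Kafka message as a UTF-8 string and declaring a single output field; that reading, and every name below, are assumptions. The decoding core can be sketched in plain, standalone Java:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Sketch of what MessageScheme likely does (it is not shown in the post):
// the real class would implement backtype.storm.spout.Scheme, returning this
// list from deserialize(byte[]) and new Fields("msg") from getOutputFields().
public class MessageSchemeSketch {

    // Equivalent of Scheme.deserialize(byte[] ser): decode the raw Kafka
    // bytes as one UTF-8 text line and wrap it as a single-value tuple.
    public static List<Object> deserialize(byte[] ser) {
        return Arrays.<Object>asList(new String(ser, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(deserialize("hello storm".getBytes(StandardCharsets.UTF_8)));
    }
}
```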


WordSpliter.java

 

package cn.itcast.storm.bolt;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -5653803832498574866L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
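The bolt's per-line logic is easy to verify outside Storm. This standalone sketch repeats the same steps (split on spaces, trim, drop blanks, lowercase), substituting a plain isEmpty() check for commons-lang's StringUtils.isNotBlank:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone rerun of WordSpliter's per-line logic.
public class WordSpliterDemo {

    public static List<String> split(String line) {
        List<String> out = new ArrayList<String>();
        for (String word : line.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {          // stands in for StringUtils.isNotBlank
                out.add(word.toLowerCase());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("Hello  Storm  KAFKA ")); // prints [hello, storm, kafka]
    }
}
```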


WriterBolt.java

 

package cn.itcast.storm.bolt;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Writes the incoming data to a file.
 */
public class WriterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = -6586283337287975719L;

    private FileWriter writer = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            // Each bolt instance writes to its own uniquely named file
            writer = new FileWriter("c:\\storm-kafka\\" + "wordcount" + UUID.randomUUID().toString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: emits nothing downstream
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
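In KafkaTopo, WriterBolt runs with parallelism 4 and receives tuples via fieldsGrouping on the "word" field, so every occurrence of the same word goes to the same task, and therefore the same output file. Conceptually the routing is a hash of the grouped value modulo the task count; Storm's internal hashing differs in detail, so this is only an illustrative sketch:

```java
// Illustrative sketch of fields grouping: tuples with equal "word" values
// always map to the same task index, so each of the 4 WriterBolt files
// receives a consistent subset of the words.
public class FieldsGroupingDemo {

    public static int taskFor(String word, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hashes
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        System.out.println(taskFor("storm", 4) == taskFor("storm", 4)); // same word, same task
    }
}
```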


What are the concrete use cases for Storm + Kafka?

      Real-time trajectory analysis of mobile phone positions as they move between base stations.

      Storm can do real-time analysis on its own, but without a message queue in front of it, any messages produced while Storm is down are simply lost. With Storm + Kafka, the messages from that downtime window are buffered and can still be processed once Storm comes back.

      As a first approximation, think of the Kafka side of Storm + Kafka as a cache, except one that can be partitioned; the business logic is still handled by Storm.
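The buffering argument can be illustrated with a plain in-memory queue standing in for Kafka (a real broker also persists messages to disk and spreads them over partitions):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy stand-in for Kafka-as-buffer: messages produced while the consumer
// is offline wait in the queue instead of being lost.
public class BufferDemo {

    // Produce n messages while the consumer is "down", then bring the
    // consumer back and count how many it can still read.
    public static int produceThenDrain(int n) throws InterruptedException {
        BlockingQueue<String> kafkaStandIn = new ArrayBlockingQueue<String>(n);

        for (int i = 0; i < n; i++) {        // producer keeps writing
            kafkaStandIn.put("msg-" + i);
        }

        int drained = 0;                     // consumer comes back later
        while (kafkaStandIn.poll() != null) {
            drained++;
        }
        return drained;                      // nothing from the window is lost
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceThenDrain(5)); // prints 5
    }
}
```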

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM