WordCount: based on Kafka + Storm + HBase


Description

A WordCount program that integrates Kafka, Storm, and HBase.
Data source: Kafka, topic "logs" (the code below uses topic "test")
Word counting: Storm
Storage: the counting results are stored in HBase

1. Analysis

1.1 Storm topology

In the topology, a KafkaSpout receives data from Kafka; each received message is a sentence (one line of text).
A SentenceSplitBolt splits each sentence into individual words, a CountBolt counts how many times each word has appeared, and finally an HBase bolt stores the results in HBase.

Kafka -> KafkaSpout -> SentenceSplitBolt -> CountBolt -> HBase bolt

2. Implementation

Test environment

Two servers: hadoop1 and hadoop2

CentOS-6.4                hadoop1, hadoop2
Hadoop-2.5-cdh-5.3.6      hadoop1
kafka-2.10-0.8.1.1        hadoop2
hbase-0.98.6-cdh-5.3.6    hadoop2 (HMaster), hadoop1 (RegionServer)
storm-0.9.6               hadoop2
zookeeper-3.4.5-cdh5.3.6  hadoop2

SentenceSplitBolt

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SentenceSplitBolt extends BaseRichBolt {
	static final Logger LOGGER = LoggerFactory.getLogger(SentenceSplitBolt.class);
	
	private OutputCollector collector;
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {			
		// KafkaSpout uses "str" as the field name of the emitted data
		String sentence = input.getStringByField("str");
		String[] words = sentence.split(" ");
		
		if (words.length > 0) {
			for (String word : words) {				
				collector.emit(new Values(word));			// emit one word at a time
			}
		}
		
		// ack: the input tuple has been processed successfully
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}
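
Note that the emit above is unanchored: if a tuple fails further down the topology, the KafkaSpout will not replay the original message. If at-least-once processing is wanted, the word tuples can be anchored to the input tuple. A minimal sketch of such an execute() variant (same class, using the standard OutputCollector.emit(anchor, values) overload):

	@Override
	public void execute(Tuple input) {
		String sentence = input.getStringByField("str");
		for (String word : sentence.split(" ")) {
			// anchor the word tuple to the input tuple so that a failure
			// downstream causes the KafkaSpout to replay the original message
			collector.emit(input, new Values(word));
		}
		collector.ack(input);
	}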

CountBolt

import java.util.Hashtable;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CountBolt extends BaseRichBolt {
	static final Logger LOGGER = LoggerFactory.getLogger(CountBolt.class);
	private OutputCollector collector;
	private Map<String, Integer> wordMap = new Hashtable<String, Integer>();
	
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;			
	}

	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		if (!wordMap.containsKey(word)) {
			wordMap.put(word, 0);
		}
		
		int count = wordMap.get(word);
		count++;
		wordMap.put(word, count);

		// for easier testing, the count is converted to a String so that the
		// value can be read as plain text when browsing the HBase table in Hue
		collector.emit(new Values(word, String.valueOf(count)));
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word", "count"));
	}
}

WCTopohogy

import java.util.Map;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;

public class WCTopohogy {
	static Logger logger = LoggerFactory.getLogger(WCTopohogy.class);
	
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
		TopologyBuilder builder = new TopologyBuilder();

		SpoutConfig spoutConf = new SpoutConfig(new ZkHosts("hadoop2"), "test", "/test", UUID.randomUUID().toString());
		spoutConf.forceFromStart = true;
		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
		
		KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
		
	    builder.setSpout("spout", kafkaSpout, 5);

	    builder.setBolt("split", new SentenceSplitBolt(), 8).shuffleGrouping("spout");
	    builder.setBolt("count", new CountBolt(), 12).fieldsGrouping("split", new Fields("word"));	    
	    
	    SimpleHBaseMapper mapper = new SimpleHBaseMapper();
	    mapper.withColumnFamily("result");
	    mapper.withColumnFields(new Fields("count"));
	    mapper.withRowKeyField("word");
	    
	    Map<String, Object> map = Maps.newTreeMap();
	    map.put("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
	    map.put("hbase.zookeeper.quorum", "hadoop2:2181");
	    
	    // hbase-bolt
	    HBaseBolt hBaseBolt = new HBaseBolt("wordcount", mapper).withConfigKey("hbase.conf");
	    builder.setBolt("hbase", hBaseBolt, 6).shuffleGrouping("count");
	    
	    Config conf = new Config();
	    conf.setDebug(true);
	    conf.put("hbase.conf", map);

	   // set the remote nimbus host (needed when submitting from a machine outside the cluster)
	   // conf.put(Config.NIMBUS_HOST, "hadoop2");
	   //  conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
	    
	    // cluster mode: submit the topology to the Storm cluster
	    if (args != null && args.length > 0) {
	      conf.setNumWorkers(3);

	      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
	    }
	    else {		// local mode
	      conf.setMaxTaskParallelism(3);
	      LocalCluster cluster = new LocalCluster();
	      cluster.submitTopology("word-count", conf, builder.createTopology());

	      // keep the local cluster running for a while, then shut it down
	      Thread.sleep(600000);
	      cluster.shutdown();
	    }
	}
}
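
One thing to be aware of in the SpoutConfig above: the consumer id is a random UUID and forceFromStart is true, so every run stores its offsets under a new ZooKeeper node and re-reads the topic from the beginning. To resume from the last committed offset across restarts, a fixed id can be used instead. A minimal sketch, assuming the same storm-kafka version and field names as in the code above (the id "wordcount-spout" is only an example):

		SpoutConfig resumeConf = new SpoutConfig(new ZkHosts("hadoop2"), "test", "/test", "wordcount-spout");
		resumeConf.forceFromStart = false;   // start from the offset committed in ZooKeeper, if any
		resumeConf.scheme = new SchemeAsMultiScheme(new StringScheme());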

PrepareHbase: creates the wordcount table in HBase

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class PrepareHbase {
	public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {	
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "hadoop2:2181");
		
		HBaseAdmin admin = new HBaseAdmin(conf);
		HTableDescriptor tableDescriptor = new HTableDescriptor("wordcount");
		tableDescriptor.addFamily(new HColumnDescriptor("result"));
		admin.createTable(tableDescriptor);
		admin.close();		// release the connection
	}
}

3. Testing

  1. Run PrepareHbase to create the wordcount table
  2. Run WCTopohogy to start the topology

Start kafka-console-producer and type some sentences into the topic as test input (a programmatic alternative is sketched below).
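
If the console producer is not convenient, a small standalone producer can be used instead. This is only a sketch based on the Kafka 0.8 producer API; the broker address hadoop2:9092 and the topic name "test" are assumptions that must match the actual setup:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestSentenceProducer {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("metadata.broker.list", "hadoop2:9092");               // assumed broker address
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
		// send a few test sentences to the topic read by the KafkaSpout
		producer.send(new KeyedMessage<String, String>("test", "storm kafka hbase"));
		producer.send(new KeyedMessage<String, String>("test", "storm word count"));
		producer.close();
	}
}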

In Hue, check how many times the word "storm" has appeared.

Type "storm" into kafka-console-producer again and check that the count for "storm" increases.
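
The result can also be read back programmatically instead of through Hue. A minimal sketch using the HBase 0.98 client API; it assumes SimpleHBaseMapper wrote the value into family "result" under qualifier "count" (the field name configured in the topology):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckWordCount {
	public static void main(String[] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "hadoop2:2181");

		// the row key is the word; family "result", qualifier "count"
		HTable table = new HTable(conf, "wordcount");
		Result result = table.get(new Get(Bytes.toBytes("storm")));
		byte[] value = result.getValue(Bytes.toBytes("result"), Bytes.toBytes("count"));
		System.out.println("storm -> " + (value == null ? "not found" : Bytes.toString(value)));
		table.close();
	}
}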

4. Summary

Storm is a real-time stream processing system. In this experiment, Storm processes messages coming from Kafka and stores the processed results in HBase.

