Description
A word count program that integrates Kafka, Storm, and HBase.
Data source: Kafka, topic "logs" (note that the example topology below actually reads from a topic named "test")
Word counting: Storm
Storage: the counting results are written to HBase
1. Analysis
1.1 Storm topology
In the topology, a KafkaSpout receives data from Kafka; each received message is one line of text, i.e. a sentence.
SentenceSplitBolt splits each sentence into individual words, CountBolt counts how many times each word has occurred, and finally an HBase bolt stores the result in HBase.
Kafka -> KafkaSpout -> SentenceSplitBolt -> CountBolt -> HBase bolt
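For example, if the line "hello storm hello" arrives on the topic, SentenceSplitBolt emits the three word tuples ("hello"), ("storm"), ("hello"); CountBolt then emits ("hello", "1"), ("storm", "1") and ("hello", "2"); and the HBase bolt upserts one row per word in the wordcount table, so the row "hello" ends up with count "2".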
2. Implementation
Test environment
Two servers: hadoop1 and hadoop2
| Software | Nodes |
| --- | --- |
| CentOS-6.4 | hadoop1, hadoop2 |
| Hadoop-2.5-cdh-5.3.6 | hadoop1 |
| kafka-2.10-0.8.1.1 | hadoop2 |
| hbase-0.98.6-cdh-5.3.6 | hadoop2 (HMaster), hadoop1 (RegionServer) |
| storm-0.9.6 | hadoop2 |
| zookeeper-3.4.5-cdh5.3.6 | hadoop2 |
SentenceSplitBolt
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SentenceSplitBolt extends BaseRichBolt {

    static final Logger LOGGER = LoggerFactory.getLogger(SentenceSplitBolt.class);

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // KafkaSpout's StringScheme declares "str" as the field name of the message
        String sentence = input.getStringByField("str");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word)); // emit the words one by one
        }
        // acknowledge that the tuple was processed successfully
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
CountBolt
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CountBolt extends BaseRichBolt {

    static final Logger LOGGER = LoggerFactory.getLogger(CountBolt.class);

    private OutputCollector collector;

    // per-task in-memory counts; fieldsGrouping("word") in the topology
    // guarantees that the same word always reaches the same CountBolt task
    private Map<String, Integer> wordMap = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = wordMap.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        wordMap.put(word, count);
        // emit the count as a String so the value is easy to read in Hue
        collector.emit(new Values(word, String.valueOf(count)));
        // acknowledge the input tuple so the spout does not replay it
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
WCTopology
import java.util.Map;
import java.util.UUID;

import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class WCTopology {

    static Logger logger = LoggerFactory.getLogger(WCTopology.class);

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();

        // read the topic "test" through the ZooKeeper running on hadoop2
        SpoutConfig spoutConf = new SpoutConfig(new ZkHosts("hadoop2"), "test", "/test", UUID.randomUUID().toString());
        spoutConf.forceFromStart = true;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

        builder.setSpout("spout", kafkaSpout, 5);
        builder.setBolt("split", new SentenceSplitBolt(), 8).shuffleGrouping("spout");
        // fieldsGrouping routes the same word to the same CountBolt task
        builder.setBolt("count", new CountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        // "word" becomes the row key, "count" a column in the family "result"
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("count"))
                .withColumnFamily("result");

        Map<String, Object> map = Maps.newTreeMap();
        map.put("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        map.put("hbase.zookeeper.quorum", "hadoop2:2181");

        // hbase-bolt
        HBaseBolt hBaseBolt = new HBaseBolt("wordcount", mapper).withConfigKey("hbase.conf");
        builder.setBolt("hbase", hBaseBolt, 6).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("hbase.conf", map);
        // set the remote nimbus host
        // conf.put(Config.NIMBUS_HOST, "hadoop2");
        // conf.put(Config.NIMBUS_THRIFT_PORT, 6627);

        if (args != null && args.length > 0) {
            // cluster mode
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // local mode: run for a while, then shut down
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10 * 60 * 1000);
            cluster.shutdown();
        }
    }
}
PrepareHbase: used to create the wordcount table in HBase
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class PrepareHbase {
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // create the table "wordcount" with a single column family "result"
        HTableDescriptor tableDescriptor = new HTableDescriptor("wordcount");
        tableDescriptor.addFamily(new HColumnDescriptor("result"));
        admin.createTable(tableDescriptor);
        admin.close();
    }
}
3. Testing
- Run PrepareHbase to create the wordcount table
- Run WCTopology
- Start a kafka-console-producer and type a few sentences to generate test data
- In Hue, look at the wordcount table and note how many times "storm" has been counted
- Type "storm" into the kafka-console-producer again and check that the count for "storm" has increased (a small verification program is sketched below)
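If Hue is not available, the stored counts can also be inspected with a small HBase client program. The following is a minimal sketch only, assuming the HBase 0.98 client API and the same connection settings as PrepareHbase; the class name CheckWordCount is illustrative and not part of the original project.
CheckWordCount
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// illustrative helper (not part of the original project): prints every row of the wordcount table
public class CheckWordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop1:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop2:2181");
        // scan the whole wordcount table and print word -> count
        HTable table = new HTable(conf, "wordcount");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            String word = Bytes.toString(result.getRow());
            String count = Bytes.toString(result.getValue(Bytes.toBytes("result"), Bytes.toBytes("count")));
            System.out.println(word + " = " + count);
        }
        scanner.close();
        table.close();
    }
}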
4. Summary
Storm is a real-time stream processing system. In this experiment, Storm processes messages coming from Kafka and the processed results are saved to HBase.