1. Cluster Storm version:
Output of the storm version command:
Storm 0.10.0.2.3.0.0-2557
URL git@github.com:hortonworks/storm.git -r 38fad7c05bd00ac4ca61b68abf7411d9abc6189c
Branch (no branch)
Compiled by jenkins on 2015-07-14T14:45Z
From source with checksum 43c4b3baaad6a0bca88145356d46327
Local Storm version: apache-storm-0.10.1. Note that this does not match the cluster version.
storm-hbase jar version:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>0.10.0</version>
</dependency>
HBase version:
[root@node3 tmp]# hbase version
2016-07-08 14:02:14,137 INFO [main] util.VersionInfo: HBase 1.1.1.2.3.0.0-2557
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: Source code repository git://ip-10-0-0-89.ec2.internal/grid/0/jenkins/workspace/HDP-dal-centos6/bigtop/build/hbase/rpm/BUILD/hbase-1.1.1.2.3.0.0 revision=6a55f21850cfccf19fa651b9e2c74c7f99bbd4f9
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: Compiled by jenkins on Tue Jul 14 09:41:13 EDT 2015
2016-07-08 14:02:14,140 INFO [main] util.VersionInfo: From source with checksum 8f076e3255b10e166a73c2436c2b1706
2. Testing writes to HBase in local mode. The topology definition code is shown below; it uses the bundled HBaseBolt class.
public class PersistTopology {
    private static final String KAFKA_SPOUT = "KAFKA_SPOUT";
    private static final String HBASE_BOLT = "HBASE_BOLT";

    public static void main(String[] args) throws Exception {
        /* define spout */
        KafkaSpout kafkaSpout = new KafkaSpout();
        System.setProperty("hadoop.home.dir", "E:\\eclipse\\");

        /* define HBase bolt */
        HBaseMapper mapper = new MyHBaseMapper();
        HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper).withConfigKey("hbase.conf");

        /* define topology */
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT, kafkaSpout);
        builder.setBolt(HBASE_BOLT, hbaseBolt, 2).shuffleGrouping(KAFKA_SPOUT);

        Config conf = new Config();
        conf.setDebug(true);
        Map<String, Object> hbConf = new HashMap<String, Object>();
        conf.put("hbase.conf", hbConf);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(60000000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
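The MyHBaseMapper used above is a custom class whose source is not shown here. For reference, a minimal sketch of an HBaseMapper implementation, assuming (hypothetically) that each tuple carries two string fields and that the target table has a column family named cf:

import backtype.storm.tuple.Tuple;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.hbase.bolt.mapper.HBaseMapper;
import org.apache.storm.hbase.common.ColumnList;

public class MyHBaseMapper implements HBaseMapper {
    @Override
    public byte[] rowKey(Tuple tuple) {
        // use the first tuple field as the HBase row key
        return Bytes.toBytes(tuple.getString(0));
    }

    @Override
    public ColumnList columns(Tuple tuple) {
        // write the second tuple field into column family "cf", qualifier "val"
        ColumnList cols = new ColumnList();
        cols.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("val"), Bytes.toBytes(tuple.getString(1)));
        return cols;
    }
}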
Test result: a little over 5.2 million records took several hours to write, averaging only a few hundred records per second.
Root cause: reading the source of the HBaseBolt class shows that this version does not batch writes. As shown below, execute is called for every incoming tuple and the resulting mutations are immediately sent with batchMutate:
@Override
public void execute(Tuple tuple) {
    byte[] rowKey = this.mapper.rowKey(tuple);
    ColumnList cols = this.mapper.columns(tuple);
    List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
            writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);

    try {
        this.hBaseClient.batchMutate(mutations);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
        return;
    }

    this.collector.ack(tuple);
}
3. Downloading the apache-storm-1.0.1 source shows that the execute implementation has changed to genuinely batched writes:
@Override
public void execute(Tuple tuple) {
    boolean flush = false;
    try {
        if (TupleUtils.isTick(tuple)) {
            LOG.debug("TICK received! current batch status [{}/{}]", tupleBatch.size(), batchSize);
            collector.ack(tuple);
            flush = true;
        } else {
            byte[] rowKey = this.mapper.rowKey(tuple);
            ColumnList cols = this.mapper.columns(tuple);
            List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols,
                    writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
            batchMutations.addAll(mutations);
            tupleBatch.add(tuple);
            if (tupleBatch.size() >= batchSize) {
                flush = true;
            }
        }

        if (flush && !tupleBatch.isEmpty()) {
            this.hBaseClient.batchMutate(batchMutations);
            LOG.debug("acknowledging tuples after batchMutate");
            for (Tuple t : tupleBatch) {
                collector.ack(t);
            }
            tupleBatch.clear();
            batchMutations.clear();
        }
    } catch (Exception e) {
        this.collector.reportError(e);
        for (Tuple t : tupleBatch) {
            collector.fail(t);
        }
        tupleBatch.clear();
        batchMutations.clear();
    }
}
batchSize is configurable; once the number of buffered tuples reaches this value, the whole batch is written to HBase. In addition, the if (TupleUtils.isTick(tuple)) branch handles a timer-like mechanism: Storm periodically delivers a tick tuple to the bolt, which guarantees that buffered data is flushed in time even when fewer than batchSize tuples have accumulated. The tick interval can be set either in code or in storm.yaml (see also the bolt-level sketch after the configuration examples below). In code:
conf.put("hbase.conf", hbConf); //conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); // conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
In the configuration file it looks like this (the code setting should take precedence, though I have not verified that):
[root@node1 conf]# more storm.yaml | grep tuple
topology.tick.tuple.freq.secs : 1
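For reference, the 1.0.1 HBaseBolt also appears to expose the batch size and flush interval directly on the bolt. A sketch based on my reading of that source (treat the method names as an assumption, not a verified API for the backported 0.10 code):

HBaseBolt hbaseBolt = new HBaseBolt("testhbasebolt", mapper)
        .withConfigKey("hbase.conf")
        .withBatchSize(5000)         // flush once 5000 tuples are buffered
        .withFlushIntervalSecs(1);   // also flush at least once per second via tick tuples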
Instead of upgrading Storm, I copied the optimized code from the newer version into the current one and tested it on the cluster.
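One detail when backporting: the 1.0.1 code calls TupleUtils.isTick. If that helper is not available in the older code base (I did not check), an equivalent check against the system tick stream can be written as follows, a sketch using the backtype.storm packages of 0.10:

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public static boolean isTickTuple(Tuple tuple) {
    // tick tuples are emitted by the system component on the system tick stream
    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}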
Test result: with the same amount of data, throughput improved dramatically: data that previously took more than five hours to write finished in roughly twenty minutes.