原文鏈接:http://storm.apache.org/releases/1.1.0/storm-hbase.html
Storm/Trident 和 Apache HBase 的集成
用法
和HBase集成的重要API是org.apache.storm.hbase.bolt.mapper.HBaseMapper接口
public interface HBaseMapper extends Serializable { byte[] rowKey(Tuple tuple); ColumnList columns(Tuple tuple); }
rowKey()方法是簡單明了的:輸入一個Storm tuple,返回代表rowkey的字節數組。
columns()方法定義了要寫入HBase行的內容。ColumnList類允許你添加 standard HBase columns和HBase counter columns(這兩種列是什么???)
要添加一個standard column,使用addColumn()方法:
ColumnList cols = new ColumnList(); cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
要添加一個counter column,使用addCounter()方法:
ColumnList cols = new ColumnList(); cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
當遠程HBase啟用了安全認證,一個Kerberos keytab和相應的principle名需要被提供給storm-hbase連接器。
特別地,傳遞給Topology的Config對象應該包含{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", “$principal”)}。例如:
Config config = new Config(); ... config.put("storm.keytab.file", "$keytab"); config.put("storm.kerberos.principal", "$principle"); StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
使用授權的token與啟用安全認證的 HBase 集成
如果你的Topology要和啟用安全認證的HBase交互,你的bolts/states需要被HBase認證。解決辦法是需要所有潛在的worker主機擁有"sotrm.keytab.file"。
如果你在一個集群上有多個Topology,每一個都使用不同的hbase用戶,你必須創建多個keytab並把它分發到所有worker主機。
以上做法的替代方法是:
管理員可以配置nimbus能自動地代表提交Topology的用戶獲取授權token。這樣,nimbus需要以以下配置啟動:
nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.) hbase.kerberos.principal: "superuser@EXAMPLE.com" nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed,
if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is atleast 1 hour less then that.)
你的Topology配置應該包括:topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]
如果nimbus沒有以上配置,你需要添加以上配置然后重啟HBase。
確保hbase配置文件(core-site.xml,hdfs-site.xml 和 hbase-site.xml)和包含storm-hbase jar的依賴文件都在nimbus的classpath里。
Nimbus會使用在配置里指定的keytab和principal進行HBase認證。在每一個Topology提交時,nimbus都要模擬提交的Topology用戶,並且代表提交的Topology用戶獲取授權token。
如果Topology以 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 這樣的配置啟動,nimbus會為了你的Topology把授權token推給所有的worker,
連接hbase的 bolt/state 會以這些token進行認證。
當nimbus模擬提交的Topology用戶的用戶時,你需要確保在 storm.kerberos.principal 中指定的user擁有代表其他user獲取token的權限。
要達到這個要求,你需要按照列在這個link中的配置方向進行配置 http://hbase.apache.org/book/security.html#security.rest.gateway
你可以閱讀關於建立安全認證的HBase的內容 http://hbase.apache.org/book/security.html
SimpleHBaseMapper
storm-hbase有一個含有通用目的HBaseMapper的實現——SimpleHBaseMapper,它可以把Storm tuple映射到標准HBase column和counter column
要使用SimpleHBaseMapper,你需要告訴它哪些field要被映射成什么類型的column。
以下代碼創建了一個SimpleHBaseMapper實例:
1. 使用 word tuple值作為一個 row key
2. 為tuple域word 添加一個標准的HBase列
3. 為tuple域count 添加一個HBase counter列
4. 把值寫到 cf 列族中
SimpleHBaseMapper mapper = new SimpleHBaseMapper() .withRowKeyField("word") .withColumnFields(new Fields("word")) .withCounterFields(new Fields("count")) .withColumnFamily("cf");
HBaseBolt
要使用HBaseBolt,需要提供 輸出的表的名字 和 一個HBaseMapper的實現 進行構造
HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
HBaseBolt會授權 mapper實例 明確如何把tuple數據持久化到HBase中。
HBaseValueMapper
這個類運行你把 HBase的查詢結果轉化成storm Values實例,這個Values實例將會被HBaseLookupBolt發射出去。
public interface HBaseValueMapper extends Serializable { public List<Values> toTuples(Result result) throws Exception; void declareOutputFields(OutputFieldsDeclarer declarer); }
toTuples方法接收一個 HBase Result,輸出一個包含Values的List。每一個由這個方法返回的值都會被HBaseLookupBolt發射。
declareOutputFields應該被用來聲明HBaseLookupBolt的輸出域 outputFields。
在 src/test/java 路徑下有example
HBaseProjectionCriteria
這個類允許你指定HBase Get方法的投影條件。
這是lookupBolt可選的參數,如果你不指定這個實例的話,所有的列都會由HBaseLookupBolt返回。
public class HBaseProjectionCriteria implements Serializable { public HBaseProjectionCriteria addColumnFamily(String columnFamily); public HBaseProjectionCriteria addColumn(ColumnMetaData column); }
addColumnFamily 接收列族 columnFamily。指定這個參數意味着這個列族的所有列都將被包含在投影里
addColumn 接收一個 ColumnMetaData實例。指定這個參數意味着只有這個列族的這個列會成為投影的一部分。
以下代碼會創建一個HBaseProjectionCriteria實例用來指定投影的條件:
1. 包含列族 cf 的count column
2. 包含列族 cf2 的所有列
HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria() .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count")) .addColumnFamily("cf2");
HBaseLookupBolt
要使用HBaseLookupBolt,通過 要輸出到的表名、一個HBaseMapper的實例、一個HBaseRowToStormValueMapper的實例去構建。
你可以指定或不指定HBaseProjectionCriteria。
HBaseLookupBolt會使用HBaseMapper的實例mapper獲取用來查詢的rowKey,
會使用HBaseProjectionCriteria的實例指定哪些列會被包含在結果中,
會使用HBaseRowToStormValueMapper的實例獲取要被bolt發送的value。
你可以參照 在 src/test/java下的
example:topology LookupWordCount.java
示例: 持久化 Word Count
在src/test/java下有一個可以運行的例子
Setup
以下步驟假定你正在本地運行HBase,或在classpath中有hbase-site.xml指出你的HBase集群
使用 hbase shell命令創建一個schema:
> create 'WordCount', 'cf'
執行
運行org.apache.storm.hbase.topology.PersistenWordCount類(它會運行這個Topology 10秒,然后退出)
在這個Topology運行過程中或運行之后,運行org.apache.storm.hbase.topology.WordCountClient類去查看存儲在HBase的計數值。你會看到以下類似的內容:
Word: 'apple', Count: 6867 Word: 'orange', Count: 6645 Word: 'pineapple', Count: 6954 Word: 'banana', Count: 6787 Word: 'watermelon', Count: 6806
為了引用,列出這個Topology示例
public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; private static final String HBASE_BOLT = "HBASE_BOLT"; public static void main(String[] args) throws Exception { Config config = new Config(); WordSpout spout = new WordSpout(); WordCounter bolt = new WordCounter(); SimpleHBaseMapper mapper = new SimpleHBaseMapper() .withRowKeyField("word") .withColumnFields(new Fields("word")) .withCounterFields(new Fields("count")) .withColumnFamily("cf"); HBaseBolt hbase = new HBaseBolt("WordCount", mapper); // wordSpout ==> countBolt ==> HBaseBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 1); builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT); builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word")); if (args.length == 0) { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", config, builder.createTopology()); Thread.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); System.exit(0); } else { config.setNumWorkers(3); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } } }