Storm HBase 集成


原文鏈接:http://storm.apache.org/releases/1.1.0/storm-hbase.html

Storm/Trident 和 Apache HBase 的集成

用法

和HBase集成的重要API是org.apache.storm.hbase.bolt.mapper.HBaseMapper接口

public interface HBaseMapper extends Serializable {
    byte[] rowKey(Tuple tuple);

    ColumnList columns(Tuple tuple);
}

rowKey()方法是簡單明了的:輸入一個Storm tuple,返回代表rowkey的字節數組。

columns()方法定義了要寫入HBase行的內容。ColumnList類允許你添加 standard HBase columns和HBase counter columns(這兩種列是什么???)

要添加一個standard column,使用addColumn()方法:

ColumnList cols = new ColumnList();
cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));

要添加一個counter column,使用addCounter()方法:

ColumnList cols = new ColumnList();
cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));

當遠程HBase啟用了安全認證,一個Kerberos keytab和相應的principle名需要被提供給storm-hbase連接器。

特別地,傳遞給Topology的Config對象應該包含{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", “$principal”)}。例如:

Config config = new Config();
...
config.put("storm.keytab.file", "$keytab");
config.put("storm.kerberos.principal", "$principle");
StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());

使用授權的token與啟用安全認證的 HBase 集成

如果你的Topology要和啟用安全認證的HBase交互,你的bolts/states需要被HBase認證。解決辦法是需要所有潛在的worker主機擁有"sotrm.keytab.file"。

如果你在一個集群上有多個Topology,每一個都使用不同的hbase用戶,你必須創建多個keytab並把它分發到所有worker主機。

以上做法的替代方法是:

管理員可以配置nimbus能自動地代表提交Topology的用戶獲取授權token。這樣,nimbus需要以以下配置啟動:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] 
nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.) 
hbase.kerberos.principal: "superuser@EXAMPLE.com" 
nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, 
if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is atleast 1 hour less then that.)

你的Topology配置應該包括:topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]

如果nimbus沒有以上配置,你需要添加以上配置然后重啟HBase。

確保hbase配置文件(core-site.xml,hdfs-site.xml 和 hbase-site.xml)和包含storm-hbase jar的依賴文件都在nimbus的classpath里。

Nimbus會使用在配置里指定的keytab和principal進行HBase認證。在每一個Topology提交時,nimbus都要模擬提交的Topology用戶,並且代表提交的Topology用戶獲取授權token。

如果Topology以 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 這樣的配置啟動,nimbus會為了你的Topology把授權token推給所有的worker,

連接hbase的 bolt/state 會以這些token進行認證。

當nimbus模擬提交的Topology用戶的用戶時,你需要確保在 storm.kerberos.principal 中指定的user擁有代表其他user獲取token的權限。

要達到這個要求,你需要按照列在這個link中的配置方向進行配置 http://hbase.apache.org/book/security.html#security.rest.gateway

你可以閱讀關於建立安全認證的HBase的內容 http://hbase.apache.org/book/security.html

SimpleHBaseMapper

storm-hbase有一個含有通用目的HBaseMapper的實現——SimpleHBaseMapper,它可以把Storm tuple映射到標准HBase column和counter column

要使用SimpleHBaseMapper,你需要告訴它哪些field要被映射成什么類型的column。

以下代碼創建了一個SimpleHBaseMapper實例:

1. 使用 word tuple值作為一個 row key

2. 為tuple域word 添加一個標准的HBase列

3. 為tuple域count 添加一個HBase counter列

4. 把值寫到 cf 列族中

SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

HBaseBolt

要使用HBaseBolt,需要提供 輸出的表的名字 和 一個HBaseMapper的實現 進行構造

HBaseBolt hbase = new HBaseBolt("WordCount", mapper);

HBaseBolt會授權 mapper實例 明確如何把tuple數據持久化到HBase中。

HBaseValueMapper

這個類運行你把 HBase的查詢結果轉化成storm Values實例,這個Values實例將會被HBaseLookupBolt發射出去。

public interface HBaseValueMapper extends Serializable {
    public List<Values> toTuples(Result result) throws Exception;
    void declareOutputFields(OutputFieldsDeclarer declarer);
}

toTuples方法接收一個 HBase Result,輸出一個包含Values的List。每一個由這個方法返回的值都會被HBaseLookupBolt發射。

declareOutputFields應該被用來聲明HBaseLookupBolt的輸出域 outputFields。

在 src/test/java 路徑下有example

HBaseProjectionCriteria

這個類允許你指定HBase Get方法的投影條件。

這是lookupBolt可選的參數,如果你不指定這個實例的話,所有的列都會由HBaseLookupBolt返回。

public class HBaseProjectionCriteria implements Serializable {
    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
}

addColumnFamily 接收列族 columnFamily。指定這個參數意味着這個列族的所有列都將被包含在投影里

addColumn 接收一個 ColumnMetaData實例。指定這個參數意味着只有這個列族的這個列會成為投影的一部分。

以下代碼會創建一個HBaseProjectionCriteria實例用來指定投影的條件:

1. 包含列族 cf 的count column

2. 包含列族 cf2 的所有列

HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
    .addColumnFamily("cf2");

HBaseLookupBolt

要使用HBaseLookupBolt,通過 要輸出到的表名、一個HBaseMapper的實例、一個HBaseRowToStormValueMapper的實例去構建。

你可以指定或不指定HBaseProjectionCriteria。

HBaseLookupBolt會使用HBaseMapper的實例mapper獲取用來查詢的rowKey,

會使用HBaseProjectionCriteria的實例指定哪些列會被包含在結果中,

會使用HBaseRowToStormValueMapper的實例獲取要被bolt發送的value。

你可以參照 在 src/test/java下的example:topology LookupWordCount.java

示例: 持久化 Word Count

在src/test/java下有一個可以運行的例子

Setup

以下步驟假定你正在本地運行HBase,或在classpath中有hbase-site.xml指出你的HBase集群

使用 hbase shell命令創建一個schema:

> create 'WordCount', 'cf'

執行

運行org.apache.storm.hbase.topology.PersistenWordCount類(它會運行這個Topology 10秒,然后退出)

在這個Topology運行過程中或運行之后,運行org.apache.storm.hbase.topology.WordCountClient類去查看存儲在HBase的計數值。你會看到以下類似的內容:

Word: 'apple', Count: 6867
Word: 'orange', Count: 6645
Word: 'pineapple', Count: 6954
Word: 'banana', Count: 6787
Word: 'watermelon', Count: 6806

為了引用,列出這個Topology示例

public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String HBASE_BOLT = "HBASE_BOLT";


    public static void main(String[] args) throws Exception {
        Config config = new Config();

        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();

        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withColumnFamily("cf");

        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);


        // wordSpout ==> countBolt ==> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));


        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            System.exit(0);
        } else {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM