PySpark操作HBase時設置scan參數


 在用PySpark操作HBase時默認是scan操作,通常情況下我們希望加上rowkey指定范圍,即只獲取一部分數據參加運算。翻遍了spark的python相關文檔,搜遍了googlestackoverflow也沒有具體的解決方案。既然java和scala都支持,python肯定也支持的。

翻了一下hbase源碼

org.apache.hadoop.hbase.mapreduce.TableInputFormat

setConf方法里原來是根據特定的字符串對scan進行配置,那么在Python里對conf就可以進行相應的設置,這些設置主要包括:

hbase.mapreduce.scan.row.start
hbase.mapreduce.scan.row.stop
hbase.mapreduce.scan.column.family
hbase.mapreduce.scan.columns
hbase.mapreduce.scan.timestamp
hbase.mapreduce.scan.timerange.start
hbase.mapreduce.scan.timerange.end
hbase.mapreduce.scan.maxversions
hbase.mapreduce.scan.cacheblocks
hbase.mapreduce.scan.cachedrows
hbase.mapreduce.scan.batchsize

首先創建測試表

hbase> create 'test', 'f1'
hbase> put 'test', 'row1', 'f1', 'value1'
hbase> put 'test', 'row2', 'f1', 'value2'
hbase> put 'test', 'row3', 'f1', 'value3'
hbase> put 'test', 'row4', 'f1', 'value4'

  

然后,設置scan范圍的示例代碼如下

sc = SparkContext(appName=settings.APP_NAME)
conf = {
        "hbase.zookeeper.quorum": settings.HBASE_HOST,
        "hbase.mapreduce.inputtable": "test",
        "hbase.mapreduce.scan.row.start": "row2"
    }
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.valux.converters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.valux.converters.HBaseResultToStringConverter",
    conf=conf)
result = rdd.collect()
for (k, v) in result
    print k, v

 

org.valux.converters.ImmutableBytesWritableToStringConverterorg.valux.converters.HBaseResultToStringConverter 是我自己實現的兩個轉換類,也可以用spark默認自帶的converter,具體可以參考hbase_inputformat.py,不過提交時請帶上相應的jar包

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM