Method 1:
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter._

val sparkConf = new SparkConf().setAppName("HbaseTest").setMaster("local[1]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
// Spark_HbaseUtil is a project-local helper that reads connection properties.
conf.set("hbase.zookeeper.quorum", Spark_HbaseUtil.getProperties("bootstrap.servers"))
val tableName = "sinldo:hos_index"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// If the start rowKey equals the stop rowKey, the scan is an exact lookup of a single row.
// lastRowKey is assumed to hold the last rowkey of the previous page.
val startRowkey = lastRowKey
// Assemble the scan; startRowkey and stopRowkey can be passed in as parameters.
val scan = new Scan(Bytes.toBytes(startRowkey))
// Whether scanned blocks are added to the block cache (false is usually preferable for large scans).
scan.setCacheBlocks(true)
// Number of rows fetched per RPC round trip.
scan.setCaching(9)
val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
val filter = new PageFilter(10)
// Attach the pagination filter to the scan so it takes effect.
filterList.addFilter(filter)
scan.setFilter(filterList)
// Serialize the Scan into the configuration so TableInputFormat can pick it up.
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
conf.set(TableInputFormat.SCAN, scanToString)
// Read the table as an RDD of (rowkey, Result) pairs.
val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] =
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Get the row count
val count = hBaseRDD.count()
println(count)
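To keep paging forward, the next call needs a fresh lastRowKey. A minimal sketch, assuming the hBaseRDD built above; the names pageRowKeys and nextStartRowKey are introduced here for illustration and are not from the original:

// Rowkeys of the current page, decoded from the ImmutableBytesWritable keys.
val pageRowKeys: Array[String] =
  hBaseRDD.map { case (key, _) => Bytes.toString(key.copyBytes()) }.collect()
// The last rowkey of this page becomes the start key of the next page.
val nextStartRowKey: Option[String] = pageRowKeys.lastOption
nextStartRowKey.foreach(k => println(s"next page starts from: $k"))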
Method 2: replace the part of Method 1 that sets the start rowKey with the following:
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.BinaryComparator
val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
val filter = new PageFilter(10)
// "022e" is the start rowKey; GREATER makes the bound exclusive, so that row itself is skipped.
val rowFilter2: Filter = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes("022e")))
filterList.addFilter(filter)
filterList.addFilter(rowFilter2)
scan.setFilter(filterList)
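The same assembly can be wrapped in a small helper so each page only needs the previous page's last rowkey. A minimal sketch against the same HBase 1.x filter API used above; buildPageScan and pageSize are illustrative names, not from the original:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.util.Bytes

// Illustrative helper: build a paged Scan that starts strictly after startRowKey.
def buildPageScan(startRowKey: String, pageSize: Int): Scan = {
  val scan = new Scan()
  val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  // GREATER (not GREATER_OR_EQUAL) skips the previous page's last row,
  // so consecutive pages do not overlap.
  filterList.addFilter(new RowFilter(
    CompareFilter.CompareOp.GREATER,
    new BinaryComparator(Bytes.toBytes(startRowKey))))
  // Cap the number of rows returned for this page.
  filterList.addFilter(new PageFilter(pageSize))
  scan.setFilter(filterList)
  scan
}

Note that PageFilter is applied independently on each region server, so a page can come back with slightly more than pageSize rows when the table spans multiple regions; the caller typically trims the excess after collecting.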