Spark實現分頁查詢HBase


第一種:

 

 

 

  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.hadoop.hbase.protobuf.ProtobufUtil
  import org.apache.hadoop.hbase.util.{Base64, Bytes}
  import org.apache.spark.rdd.RDD
  import org.apache.hadoop.hbase.HBaseConfiguration
 import org.apache.hadoop.hbase.client.Result
 import org.apache.hadoop.hbase.client.Scan
 import org.apache.hadoop.hbase.filter._
 import org.apache.hadoop.hbase.util.Bytes    
 // Paged HBase read through Spark: scan the table from a given start row key,
 // limit the result with a PageFilter, and count the rows that come back.

 // Spark runs locally with a single worker thread.
 val sparkConf = new SparkConf().setAppName("HbaseTest").setMaster("local[1]")
 val sc = new SparkContext(sparkConf)

 // HBase client configuration; the ZooKeeper quorum comes from the project's property helper.
 // NOTE(review): "bootstrap.servers" is conventionally a Kafka property name — confirm that it
 // really holds the ZooKeeper quorum for this cluster.
 val conf = HBaseConfiguration.create()
 conf.set("hbase.zookeeper.quorum", Spark_HbaseUtil.getProperties("bootstrap.servers"))
 val tableName = "sinldo:hos_index"
 conf.set(TableInputFormat.INPUT_TABLE, tableName)

 // The page begins at `lastRowKey`, which must be provided by the surrounding code.
 // (Original author's note: using the same start and stop row key selects exactly one row.)
 // NOTE(review): a Scan start row is normally inclusive, so the row at `lastRowKey` is
 // presumably returned again as the first row of this page — confirm this is intended.
 val startRowkey = lastRowKey
 // Build the scan; the start (and, if needed, stop) row key could be turned into parameters.
 val scan = new Scan(Bytes.toBytes(startRowkey))
 // Controls whether blocks read by this scan are kept in the region server block cache.
 // It does not limit which rows are scanned.
 scan.setCacheBlocks(true)
 // Number of rows requested per scanner round trip.
 scan.setCaching(9)

 // Page-size limit. The PageFilter was previously constructed but never attached to the scan,
 // so no paging took place; it is now added to the filter list and set on the scan before
 // the scan is serialized into the job configuration.
 // NOTE(review): per the HBase documentation PageFilter is applied separately on each region
 // server, so more than 10 rows may come back when the range spans several regions; apply
 // a final limit on the driver side if an exact page size is required.
 val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
 val filter = new PageFilter(10)
 filterList.addFilter(filter)
 scan.setFilter(filterList)

 // TableInputFormat expects the scan as a Base64-encoded protobuf string.
 val proto = ProtobufUtil.toScan(scan)
 val scanToString = Base64.encodeBytes(proto.toByteArray)
 conf.set(TableInputFormat.SCAN, scanToString)

 val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] =
   sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

 // Number of rows returned by the scan.
 val count = hBaseRDD.count()
 println(count)

第二種:將第一種的設置開始RowKey的地方換成

    import org.apache.hadoop.hbase.filter.RowFilter
    import org.apache.hadoop.hbase.filter.BinaryComparator;
     // Row-key condition: keep only rows whose key is strictly greater than the given key,
     // i.e. the key acts as the (exclusive) start of the page.
     // NOTE(review): the original comment named "002e" as the start row key while the code
     // uses "022e" — confirm which value is intended.
     val rowFilter2: Filter = new RowFilter(
       CompareFilter.CompareOp.GREATER,
       new BinaryComparator(Bytes.toBytes("022e"))
     )
     // Page-size limit of 10 rows.
     val filter = new PageFilter(10)

     // A row must pass every filter in the list; the filters are added in the same order
     // as before (page filter first, then the row-key filter).
     val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
     filterList.addFilter(filter)
     filterList.addFilter(rowFilter2)

     // Attach the combined filter to the scan defined in the first example.
     scan.setFilter(filterList)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM