Spark: Reading HBase from Spark


To load the HBase jars when running in spark-shell:
export SPARK_CLASSPATH=/opt/hbase-1.2.4/lib/*
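On Spark 2.x, SPARK_CLASSPATH is deprecated; as a minimal sketch (assuming the same /opt/hbase-1.2.4/lib path), the jars can instead be passed on the command line, since --jars expects a comma-separated list:

spark-shell --jars "$(echo /opt/hbase-1.2.4/lib/*.jar | tr ' ' ',')"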

Read the value of a column in an HBase column family:

import org.apache.spark._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val tableName = "table"   // HBase table name
val columnFamily = "col2" // column family to read
val columnFamily_column = "c1"  // column qualifier under that family

@transient val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val hBaseRDD = sc.newAPIHadoopRDD(
    conf, 
    classOf[TableInputFormat], 
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
    classOf[org.apache.hadoop.hbase.client.Result])
    
val rdd = hBaseRDD.map{ s => 
  Bytes.toString(s._2.getValue(
      Bytes.toBytes(columnFamily), 
      Bytes.toBytes(columnFamily_column)))
}

// If the column family has no column qualifier, pass null as the second argument:
// val rdd = hBaseRDD.map { s =>
//   Bytes.toString(s._2.getValue(
//     Bytes.toBytes(columnFamily),
//     null))
// }
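To sanity-check the RDD from the shell (a minimal sketch; the output depends on your table contents):

rdd.take(10).foreach(println)

TableInputFormat can also restrict the scan server-side. As an optional tweak, setting SCAN_COLUMNS on the configuration before calling newAPIHadoopRDD limits the scan to just the column read above (the format is "family:qualifier"):

conf.set(TableInputFormat.SCAN_COLUMNS, s"$columnFamily:$columnFamily_column")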

Build a DataFrame from an HBase table:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

def getHBaseBUYERValue(result: Result, cols: Array[String]): Row = {
  var values = ""
  for (str <- cols) {
    // Read the cell under family "info"; null if the qualifier is absent
    var value = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(str)))
    if (value == null)
      value = ","       // keep an empty slot so the columns stay aligned
    else
      value = "," + value
    values += value
  }
  // Drop the leading empty field; limit -1 preserves trailing empty fields
  Row.fromSeq(values.split(",", -1).tail)
}

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "buyer")
val hBaseRDD1 = sc.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
val buyer_rowRDD = hBaseRDD1.map { s =>
  getHBaseBUYERValue(s._2, Array("BUY_ID", "NAME"))
}

val buyer_field1 = new StructField("BUY_ID", StringType, true)
val buyer_field2 = new StructField("NAME", StringType, true)
val buyer_schema = StructType(Array(buyer_field1, buyer_field2))
val buyer_table = spark.createDataFrame(buyer_rowRDD, buyer_schema)
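The resulting DataFrame can be queried like any other table; a minimal usage sketch (the temp view name buyer_view is made up here):

buyer_table.printSchema()
buyer_table.createOrReplaceTempView("buyer_view")
spark.sql("SELECT BUY_ID, NAME FROM buyer_view LIMIT 10").show()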

